44
55use Amp \Cache \LocalCache ;
66use Amp \Cancellation ;
7- use Amp \CancelledException ;
7+ use Amp \DeferredFuture ;
88use Amp \ForbidCloning ;
99use Amp \ForbidSerialization ;
1010use Amp \NullCancellation ;
@@ -25,14 +25,15 @@ final class SocketIpcHub implements IpcHub
2525 /** @var non-empty-string */
2626 private readonly string $ uri ;
2727
28- /** @var array<string, EventLoop\Suspension > */
28+ /** @var array<string, DeferredFuture > */
2929 private array $ waitingByKey = [];
3030
3131 /** @var \Closure(): void */
3232 private readonly \Closure $ accept ;
3333
3434 private bool $ queued = false ;
3535
36+ /** @var LocalCache<ResourceSocket> */
3637 private LocalCache $ clientsByKey ;
3738
3839 /**
@@ -68,8 +69,8 @@ public function __construct(
6869 if (!$ client ) {
6970 $ queued = false ;
7071 $ exception = new Socket \SocketException ('IPC socket closed before the client connected ' );
71- foreach ($ waitingByKey as $ suspension ) {
72- $ suspension -> throw ($ exception );
72+ foreach ($ waitingByKey as $ deferred ) {
73+ $ deferred -> error ($ exception );
7374 }
7475 return ;
7576 }
@@ -82,7 +83,7 @@ public function __construct(
8283 }
8384
8485 if (isset ($ waitingByKey [$ received ])) {
85- $ waitingByKey [$ received ]->resume ($ client );
86+ $ waitingByKey [$ received ]->complete ($ client );
8687 unset($ waitingByKey [$ received ]);
8788 } else {
8889 $ clientsByKey ->set ($ received , $ client );
@@ -112,8 +113,8 @@ public function close(): void
112113 }
113114
114115 $ exception = new Socket \SocketException ('IPC socket closed before the client connected ' );
115- foreach ($ this ->waitingByKey as $ suspension ) {
116- $ suspension -> throw ($ exception );
116+ foreach ($ this ->waitingByKey as $ deferred ) {
117+ $ deferred -> error ($ exception );
117118 }
118119 }
119120
@@ -165,19 +166,19 @@ public function accept(string $key, ?Cancellation $cancellation = null): Resourc
165166 $ this ->queued = true ;
166167 }
167168
168- $ cancellation = $ cancellation ?? new NullCancellation ();
169- $ cancellation ->throwIfRequested ();
169+ $ cancellation ??= new NullCancellation ();
170170
171- $ this ->waitingByKey [$ key ] = $ suspension = EventLoop::getSuspension ();
172- $ cancellationId = $ cancellation ->subscribe (function (CancelledException $ exception ) use ($ suspension ) {
173- $ suspension ->throw ($ exception );
171+ $ this ->waitingByKey [$ key ] = $ deferred = new DeferredFuture ();
172+
173+ $ waitingByKey = &$ this ->waitingByKey ;
174+ $ cancellationId = $ cancellation ->subscribe (static function () use (&$ waitingByKey , $ key ): void {
175+ unset($ waitingByKey [$ key ]);
174176 });
175177
176178 try {
177- $ client = $ suspension -> suspend ( );
179+ $ client = $ deferred -> getFuture ()-> await ( $ cancellation );
178180 } finally {
179181 $ cancellation ->unsubscribe ($ cancellationId );
180- unset($ this ->waitingByKey [$ key ]);
181182 }
182183
183184 return $ client ;
0 commit comments