|
4 | 4 |
|
5 | 5 | use Amp\CancellationToken; |
6 | 6 | use Amp\CancelledException; |
7 | | -use Amp\Deferred; |
8 | 7 | use Amp\Http; |
9 | 8 | use Amp\Http\Rfc7230; |
10 | 9 | use Amp\Http\Status; |
|
18 | 17 | use Amp\Websocket\Rfc6455Client; |
19 | 18 | use Amp\Websocket\Rfc7692CompressionFactory; |
20 | 19 | use League\Uri; |
21 | | -use function Amp\asyncCall; |
22 | 20 | use function Amp\call; |
23 | 21 |
|
24 | 22 | class Rfc6455Connector implements Connector |
@@ -78,48 +76,39 @@ public function connect( |
78 | 76 | throw new ConnectionException('Connecting to the websocket failed', 0, $exception); |
79 | 77 | } |
80 | 78 |
|
81 | | - $deferred = new Deferred; |
82 | | - $id = $cancellationToken->subscribe([$deferred, 'fail']); |
| 79 | + $id = $cancellationToken->subscribe([$socket, 'close']); |
83 | 80 |
|
84 | | - asyncCall(function () use ($socket, $handshake, $deferred) { |
85 | | - try { |
86 | | - $key = Websocket\generateKey(); |
87 | | - yield $socket->write($this->generateRequest($handshake, $key)); |
| 81 | + try { |
| 82 | + $key = Websocket\generateKey(); |
| 83 | + yield $socket->write($this->generateRequest($handshake, $key)); |
88 | 84 |
|
89 | | - $buffer = ''; |
| 85 | + $buffer = ''; |
90 | 86 |
|
91 | | - while (($chunk = yield $socket->read()) !== null) { |
92 | | - $buffer .= $chunk; |
| 87 | + while (($chunk = yield $socket->read()) !== null) { |
| 88 | + $buffer .= $chunk; |
93 | 89 |
|
94 | | - if ($position = \strpos($buffer, "\r\n\r\n")) { |
95 | | - $headerBuffer = \substr($buffer, 0, $position + 4); |
96 | | - $buffer = \substr($buffer, $position + 4); |
| 90 | + if ($position = \strpos($buffer, "\r\n\r\n")) { |
| 91 | + $headerBuffer = \substr($buffer, 0, $position + 4); |
| 92 | + $buffer = \substr($buffer, $position + 4); |
97 | 93 |
|
98 | | - $headers = $this->handleResponse($headerBuffer, $key); |
| 94 | + $headers = $this->handleResponse($headerBuffer, $key); |
99 | 95 |
|
100 | | - if ($buffer !== '') { |
101 | | - $socket = new ClientSocket($socket, $buffer); |
102 | | - } |
| 96 | + $socket = new ClientSocket($socket, $buffer); |
103 | 97 |
|
104 | | - $deferred->resolve($this->createConnection($socket, $handshake->getOptions(), $headers)); |
105 | | - return; |
106 | | - } |
| 98 | + return $this->createConnection($socket, $handshake->getOptions(), $headers); |
107 | 99 | } |
108 | | - } catch (ConnectionException $exception) { |
109 | | - $deferred->fail($exception); |
110 | | - } catch (\Throwable $exception) { |
111 | | - $deferred->fail(new ConnectionException('Performing the websocket handshake failed', 0, $exception)); |
112 | 100 | } |
113 | | - }); |
114 | | - |
115 | | - try { |
116 | | - return yield $deferred->promise(); |
117 | | - } catch (\Throwable $exception) { |
118 | | - $socket->close(); // Close socket in case operation did not fail but was cancelled. |
| 101 | + } catch (ConnectionException $exception) { |
119 | 102 | throw $exception; |
| 103 | + } catch (\Throwable $exception) { |
| 104 | + throw new ConnectionException('The websocket handshake failed', 0, $exception); |
120 | 105 | } finally { |
121 | 106 | $cancellationToken->unsubscribe($id); |
122 | 107 | } |
| 108 | + |
| 109 | + $cancellationToken->throwIfRequested(); // Connection may have closed due to cancellation request. |
| 110 | + |
| 111 | + throw new ConnectionException('The socket closed without a response'); |
123 | 112 | }); |
124 | 113 | } |
125 | 114 |
|
|
0 commit comments