Skip to content

Commit f91c44b

Browse files
committed
Allow calling Context::join() immediately
1 parent 0aa54e1 commit f91c44b

File tree

8 files changed

+102
-89
lines changed

8 files changed

+102
-89
lines changed

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
"amphp/serialization": "^1",
3535
"amphp/socket": "^2",
3636
"amphp/sync": "^2",
37-
"revolt/event-loop": "^1 || ^0.2.1"
37+
"revolt/event-loop": "^1"
3838
},
3939
"require-dev": {
4040
"phpunit/phpunit": "^9",

src/Context/Context.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
interface Context extends Channel
1515
{
1616
/**
17-
* @return TResult The data returned from the context.
17+
* @return TResult The data returned from the context. This method may be called at any time to await the result or
18+
* an exception will be thrown if the context is closed or throws an exception or exits with a non-zero code.
1819
*
1920
* @throws ContextException If the context exited with an uncaught exception or non-zero code.
2021
*/

src/Context/Internal/AbstractContext.php

Lines changed: 45 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@
33
namespace Amp\Parallel\Context\Internal;
44

55
use Amp\Cancellation;
6-
use Amp\CancelledException;
76
use Amp\ForbidCloning;
87
use Amp\ForbidSerialization;
8+
use Amp\Future;
99
use Amp\Parallel\Context\Context;
1010
use Amp\Parallel\Context\ContextException;
1111
use Amp\Sync\Channel;
1212
use Amp\Sync\ChannelException;
13-
use Amp\TimeoutCancellation;
13+
use function Amp\async;
1414
use function Amp\Parallel\Context\flattenArgument;
1515

1616
/**
@@ -24,32 +24,47 @@ abstract class AbstractContext implements Context
2424
use ForbidCloning;
2525
use ForbidSerialization;
2626

27+
/** @var Future<ExitResult> */
28+
private readonly Future $result;
29+
2730
protected function __construct(
28-
private readonly Channel $channel,
31+
private readonly Channel $ipcChannel,
32+
private readonly Channel $resultChannel,
2933
) {
34+
$this->result = async(static function () use ($resultChannel, $ipcChannel): ExitResult {
35+
try {
36+
$data = $resultChannel->receive();
37+
} catch (\Throwable $exception) {
38+
throw new ContextException("Failed to receive result from context", previous: $exception);
39+
} finally {
40+
$resultChannel->close();
41+
$ipcChannel->close();
42+
}
43+
44+
if (!$data instanceof ExitResult) {
45+
throw new ContextException(\sprintf(
46+
"The context sent data instead of exiting: %s",
47+
flattenArgument($data),
48+
));
49+
}
50+
51+
return $data;
52+
});
53+
54+
$this->result->ignore();
3055
}
3156

3257
public function receive(?Cancellation $cancellation = null): mixed
3358
{
3459
try {
35-
$data = $this->channel->receive($cancellation);
60+
$data = $this->ipcChannel->receive($cancellation);
3661
} catch (ChannelException $exception) {
37-
try {
38-
$data = $this->join(new TimeoutCancellation(0.1));
39-
} catch (ChannelException|CancelledException) {
40-
if (!$this->isClosed()) {
41-
$this->close();
42-
}
43-
throw new ContextException(
44-
"The context stopped responding, potentially due to a fatal error or calling exit",
45-
previous: $exception,
46-
);
47-
}
62+
$this->ipcChannel->close();
4863

49-
throw new ContextException(\sprintf(
50-
'Context unexpectedly exited when waiting to receive data with result: %s',
51-
flattenArgument($data),
52-
), previous: $exception);
64+
throw new ContextException(
65+
"The context stopped responding, potentially due to a fatal error or calling exit",
66+
previous: $exception,
67+
);
5368
}
5469

5570
if (!$data instanceof ContextMessage) {
@@ -74,77 +89,35 @@ public function receive(?Cancellation $cancellation = null): mixed
7489
public function send(mixed $data): void
7590
{
7691
try {
77-
$this->channel->send($data);
92+
$this->ipcChannel->send($data);
7893
} catch (ChannelException $exception) {
79-
try {
80-
$data = $this->join(new TimeoutCancellation(0.1));
81-
} catch (ChannelException|CancelledException) {
82-
if (!$this->isClosed()) {
83-
$this->close();
84-
}
85-
86-
throw new ContextException(
87-
"The context stopped responding, potentially due to a fatal error or calling exit",
88-
previous: $exception,
89-
);
90-
}
94+
$this->ipcChannel->close();
9195

92-
throw new ContextException(\sprintf(
93-
'Context unexpectedly exited when sending data with result: %s',
94-
flattenArgument($data),
95-
), 0, $exception);
96+
throw new ContextException(
97+
"The context stopped responding, potentially due to a fatal error or calling exit",
98+
previous: $exception,
99+
);
96100
}
97101
}
98102

99103
public function close(): void
100104
{
101-
$this->channel->close();
105+
$this->ipcChannel->close();
106+
$this->resultChannel->close();
102107
}
103108

104109
public function isClosed(): bool
105110
{
106-
return $this->channel->isClosed();
111+
return $this->resultChannel->isClosed();
107112
}
108113

109114
public function onClose(\Closure $onClose): void
110115
{
111-
$this->channel->onClose($onClose);
116+
$this->resultChannel->onClose($onClose);
112117
}
113118

114119
protected function receiveExitResult(?Cancellation $cancellation = null): ExitResult
115120
{
116-
if ($this->channel->isClosed()) {
117-
throw new ContextException("The context has already closed without providing a result");
118-
}
119-
120-
try {
121-
$data = $this->channel->receive($cancellation);
122-
} catch (CancelledException $exception) {
123-
throw $exception;
124-
} catch (\Throwable $exception) {
125-
if (!$this->isClosed()) {
126-
$this->close();
127-
}
128-
throw new ContextException("Failed to receive result from context", previous: $exception);
129-
}
130-
131-
if (!$data instanceof ExitResult) {
132-
if (!$this->isClosed()) {
133-
$this->close();
134-
}
135-
136-
if ($data instanceof ContextMessage) {
137-
$data = $data->getMessage();
138-
}
139-
140-
throw new ContextException(\sprintf(
141-
"The context sent data instead of exiting: %s",
142-
flattenArgument($data),
143-
));
144-
}
145-
146-
$this->channel->close();
147-
148-
return $data;
121+
return $this->result->await($cancellation);
149122
}
150123
}

src/Context/Internal/functions.php

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@ function runContext(string $uri, string $key, Cancellation $connectCancellation,
1818

1919
try {
2020
$socket = Ipc\connect($uri, $key, $connectCancellation);
21-
$channel = new StreamChannel($socket, $socket);
21+
$ipcChannel = new StreamChannel($socket, $socket);
22+
23+
$socket = Ipc\connect($uri, $key, $connectCancellation);
24+
$resultChannel = new StreamChannel($socket, $socket);
2225
} catch (\Throwable $exception) {
2326
\trigger_error($exception->getMessage(), E_USER_ERROR);
2427
}
@@ -56,18 +59,18 @@ function runContext(string $uri, string $key, Cancellation $connectCancellation,
5659
), 0, $exception);
5760
}
5861

59-
$returnValue = $callable(new ContextChannel($channel));
62+
$returnValue = $callable(new ContextChannel($ipcChannel));
6063
$result = new ExitSuccess($returnValue instanceof Future ? $returnValue->await() : $returnValue);
6164
} catch (\Throwable $exception) {
6265
$result = new ExitFailure($exception);
6366
}
6467

6568
try {
6669
try {
67-
$channel->send($result);
70+
$resultChannel->send($result);
6871
} catch (SerializationException $exception) {
6972
// Serializing the result failed. Send the reason why.
70-
$channel->send(new ExitFailure($exception));
73+
$resultChannel->send(new ExitFailure($exception));
7174
}
7275
} catch (\Throwable $exception) {
7376
\trigger_error(\sprintf(

src/Context/ProcessContext.php

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,10 @@ public static function start(
159159
$process->getStdin()->write($key);
160160

161161
$socket = $ipcHub->accept($key, $cancellation);
162-
$channel = new StreamChannel($socket, $socket);
162+
$ipcChannel = new StreamChannel($socket, $socket);
163+
164+
$socket = $ipcHub->accept($key, $cancellation);
165+
$resultChannel = new StreamChannel($socket, $socket);
163166
} catch (\Throwable $exception) {
164167
if ($process->isRunning()) {
165168
$process->kill();
@@ -170,7 +173,7 @@ public static function start(
170173
throw new ContextException("Starting the process failed", 0, $exception);
171174
}
172175

173-
return new self($process, $channel);
176+
return new self($process, $ipcChannel, $resultChannel);
174177
}
175178

176179
/**
@@ -245,13 +248,14 @@ public static function getIgnoredSignals(): array
245248
}
246249

247250
/**
248-
* @param StreamChannel<TReceive, TSend> $channel
251+
* @param StreamChannel<TReceive, TSend> $ipcChannel
249252
*/
250253
private function __construct(
251254
private readonly Process $process,
252-
StreamChannel $channel,
255+
StreamChannel $ipcChannel,
256+
StreamChannel $resultChannel,
253257
) {
254-
parent::__construct($channel);
258+
parent::__construct($ipcChannel, $resultChannel);
255259
}
256260

257261
/**

src/Context/ThreadContext.php

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,10 @@ public static function start(
117117

118118
try {
119119
$socket = $ipcHub->accept($key, $cancellation);
120-
$channel = new StreamChannel($socket, $socket);
120+
$ipcChannel = new StreamChannel($socket, $socket);
121+
122+
$socket = $ipcHub->accept($key, $cancellation);
123+
$resultChannel = new StreamChannel($socket, $socket);
121124
} catch (\Throwable $exception) {
122125
$runtime->kill();
123126

@@ -126,7 +129,7 @@ public static function start(
126129
throw new ContextException("Starting the runtime failed", 0, $exception);
127130
}
128131

129-
return new self($id, $runtime, $future, $hub, $channel);
132+
return new self($id, $runtime, $future, $hub, $ipcChannel, $resultChannel);
130133
}
131134

132135
private readonly int $oid;
@@ -138,9 +141,10 @@ private function __construct(
138141
private readonly Runtime $runtime,
139142
ParallelFuture $future,
140143
private readonly ParallelHub $hub,
141-
StreamChannel $channel,
144+
StreamChannel $ipcChannel,
145+
StreamChannel $resultChannel,
142146
) {
143-
parent::__construct($channel);
147+
parent::__construct($ipcChannel, $resultChannel);
144148

145149
$exited = &$this->exited;
146150
$this->hub->add($this->id, $future)->finally(static function () use (&$exited): void {

test/Context/AbstractContextTest.php

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public function testFailingProcess(): void
3535
public function testThrowingProcessOnReceive(): void
3636
{
3737
$this->expectException(ContextException::class);
38-
$this->expectExceptionMessage('Test message');
38+
$this->expectExceptionMessage('The context stopped responding');
3939

4040
$context = $this->createContext(__DIR__ . "/Fixtures/throwing-process.php");
4141

@@ -102,7 +102,7 @@ public function testCloseWhenJoining(): void
102102
$this->setTimeout(3);
103103

104104
$this->expectException(ContextException::class);
105-
$this->expectExceptionMessage('The context has already closed');
105+
$this->expectExceptionMessage('Failed to receive result from context');
106106

107107
$context = $this->createContext([
108108
__DIR__ . "/Fixtures/delayed-process.php",
@@ -172,4 +172,23 @@ public function testCancelJoin(): void
172172

173173
self::assertSame(1, $context->join(new TimeoutCancellation(1)));
174174
}
175+
176+
public function testImmediateJoin(): void
177+
{
178+
$context = $this->createContext([
179+
__DIR__ . "/Fixtures/channel-process.php",
180+
1,
181+
]);
182+
183+
$cancellation = new TimeoutCancellation(1);
184+
185+
$exitValue = async($context->join(...));
186+
$receivedValue = async($context->receive(...));
187+
188+
$value = 'value';
189+
$context->send($value);
190+
191+
self::assertSame($value, $receivedValue->await($cancellation));
192+
self::assertSame($value, $exitValue->await($cancellation));
193+
}
175194
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
<?php declare(strict_types=1);
2+
3+
use Amp\Sync\Channel;
4+
5+
return function (Channel $channel) use ($argv): string {
6+
$value = $channel->receive();
7+
$channel->send($value);
8+
return $value;
9+
};

0 commit comments

Comments
 (0)