diff --git a/composer.json b/composer.json
index 8475f27..5093f39 100755
--- a/composer.json
+++ b/composer.json
@@ -35,13 +35,14 @@
"amphp/serialization": "^1",
"amphp/socket": "^2",
"amphp/sync": "^2",
- "revolt/event-loop": "^1"
+ "revolt/event-loop": "^1",
+ "symfony/polyfill-php83": "^1.31"
},
"require-dev": {
"phpunit/phpunit": "^9",
"amphp/phpunit-util": "^3",
"amphp/php-cs-fixer-config": "^2",
- "psalm/phar": "^5.18"
+ "psalm/phar": "^6"
},
"autoload": {
"psr-4": {
diff --git a/examples/worker/FetchTask.php b/examples/worker/FetchTask.php
index 2e97945..2a31095 100644
--- a/examples/worker/FetchTask.php
+++ b/examples/worker/FetchTask.php
@@ -18,6 +18,7 @@ public function __construct(string $url)
$this->url = $url;
}
+ #[\Override]
public function run(Channel $channel, Cancellation $cancellation): mixed
{
return \file_get_contents($this->url);
diff --git a/examples/worker/parent.php b/examples/worker/parent.php
index 54c925b..1c50496 100644
--- a/examples/worker/parent.php
+++ b/examples/worker/parent.php
@@ -18,8 +18,8 @@
$execution2->getFuture(),
]);
-print strlen($bodies[0]) . PHP_EOL;
-print strlen($bodies[1]) . PHP_EOL;
+print ((string) strlen($bodies[0])) . PHP_EOL;
+print ((string) strlen($bodies[1])) . PHP_EOL;
print PHP_EOL;
print 'Took ' . (microtime(true) - $start) . ' seconds' . PHP_EOL;
diff --git a/psalm-baseline.xml b/psalm-baseline.xml
new file mode 100644
index 0000000..53c64f1
--- /dev/null
+++ b/psalm-baseline.xml
@@ -0,0 +1,92 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ url)]]>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/psalm.xml b/psalm.xml
index 8b919b4..30acb14 100644
--- a/psalm.xml
+++ b/psalm.xml
@@ -6,6 +6,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="https://getpsalm.org/schema/config"
xsi:schemaLocation="https://getpsalm.org/schema/config vendor/vimeo/psalm/config.xsd"
+ errorBaseline="psalm-baseline.xml"
>
@@ -23,12 +24,6 @@
-
-
-
-
-
-
diff --git a/src/Context/ContextException.php b/src/Context/ContextException.php
index bb5cab9..cf9de01 100644
--- a/src/Context/ContextException.php
+++ b/src/Context/ContextException.php
@@ -4,6 +4,6 @@
use Amp\Sync\ChannelException;
-class ContextException extends ChannelException
+final class ContextException extends ChannelException
{
}
diff --git a/src/Context/DefaultContextFactory.php b/src/Context/DefaultContextFactory.php
index f545fd1..8f79c82 100644
--- a/src/Context/DefaultContextFactory.php
+++ b/src/Context/DefaultContextFactory.php
@@ -34,6 +34,7 @@ public function __construct(IpcHub $ipcHub = new LocalIpcHub())
*
* @throws ContextException
*/
+ #[\Override]
public function start(string|array $script, ?Cancellation $cancellation = null): Context
{
$context = $this->contextFactory->start($script, $cancellation);
diff --git a/src/Context/Internal/AbstractContext.php b/src/Context/Internal/AbstractContext.php
index 9044154..ccfcccf 100644
--- a/src/Context/Internal/AbstractContext.php
+++ b/src/Context/Internal/AbstractContext.php
@@ -34,6 +34,7 @@ protected function __construct(
) {
}
+ #[\Override]
public function receive(?Cancellation $cancellation = null): mixed
{
try {
@@ -66,6 +67,7 @@ public function receive(?Cancellation $cancellation = null): mixed
return $data->getMessage();
}
+ #[\Override]
public function send(mixed $data): void
{
try {
@@ -80,16 +82,19 @@ public function send(mixed $data): void
}
}
+ #[\Override]
public function close(): void
{
$this->ipcChannel->close();
}
+ #[\Override]
public function isClosed(): bool
{
return $this->ipcChannel->isClosed();
}
+ #[\Override]
public function onClose(\Closure $onClose): void
{
$this->ipcChannel->onClose($onClose);
diff --git a/src/Context/Internal/ContextChannel.php b/src/Context/Internal/ContextChannel.php
index 7b981ac..9559a43 100644
--- a/src/Context/Internal/ContextChannel.php
+++ b/src/Context/Internal/ContextChannel.php
@@ -24,26 +24,31 @@ public function __construct(
) {
}
+ #[\Override]
public function send(mixed $data): void
{
$this->channel->send(new ContextMessage($data));
}
+ #[\Override]
public function receive(?Cancellation $cancellation = null): mixed
{
return $this->channel->receive($cancellation);
}
+ #[\Override]
public function close(): void
{
$this->channel->close();
}
+ #[\Override]
public function isClosed(): bool
{
return $this->channel->isClosed();
}
+ #[\Override]
public function onClose(\Closure $onClose): void
{
$this->channel->onClose($onClose);
diff --git a/src/Context/Internal/ExitFailure.php b/src/Context/Internal/ExitFailure.php
index 3ab404e..5f622ad 100644
--- a/src/Context/Internal/ExitFailure.php
+++ b/src/Context/Internal/ExitFailure.php
@@ -45,6 +45,7 @@ public function __construct(\Throwable $exception)
/**
* @throws ContextException
*/
+ #[\Override]
public function getResult(): never
{
$exception = $this->createException();
diff --git a/src/Context/Internal/ExitSuccess.php b/src/Context/Internal/ExitSuccess.php
index f1e114c..acbc335 100644
--- a/src/Context/Internal/ExitSuccess.php
+++ b/src/Context/Internal/ExitSuccess.php
@@ -20,6 +20,7 @@ public function __construct(
/**
* @return TValue
*/
+ #[\Override]
public function getResult(): mixed
{
return $this->result;
diff --git a/src/Context/Internal/functions.php b/src/Context/Internal/functions.php
index e0f130d..4deb9a6 100644
--- a/src/Context/Internal/functions.php
+++ b/src/Context/Internal/functions.php
@@ -5,22 +5,27 @@
use Amp\ByteStream\StreamChannel;
use Amp\Cancellation;
use Amp\Future;
-use Amp\Parallel\Ipc;
+use Amp\Parallel\Ipc\IpcHub;
use Amp\Serialization\SerializationException;
use Revolt\EventLoop;
-/** @internal */
-function runContext(string $uri, string $key, Cancellation $connectCancellation, array $argv): void
+/**
+ * @param class-string $hubClass
+ * @param non-empty-string $uri
+ * @param non-empty-string $key
+ * @internal
+ */
+function runContext(string $hubClass, string $uri, string $key, Cancellation $connectCancellation, array $argv): void
{
- EventLoop::queue(function () use ($argv, $uri, $key, $connectCancellation): void {
+ EventLoop::queue(function () use ($hubClass, $argv, $uri, $key, $connectCancellation): void {
/** @noinspection PhpUnusedLocalVariableInspection */
$argc = \count($argv);
try {
- $socket = Ipc\connect($uri, $key, $connectCancellation);
+ $socket = $hubClass::connect($uri, $key, $connectCancellation);
$ipcChannel = new StreamChannel($socket, $socket);
- $socket = Ipc\connect($uri, $key, $connectCancellation);
+ $socket = $hubClass::connect($uri, $key, $connectCancellation);
$resultChannel = new StreamChannel($socket, $socket);
} catch (\Throwable $exception) {
\trigger_error($exception->getMessage(), E_USER_ERROR);
diff --git a/src/Context/Internal/process-runner.php b/src/Context/Internal/process-runner.php
index d10a60f..147548a 100644
--- a/src/Context/Internal/process-runner.php
+++ b/src/Context/Internal/process-runner.php
@@ -5,6 +5,7 @@
use Amp\ByteStream;
use Amp\Parallel\Context\ProcessContext;
use Amp\Parallel\Ipc;
+use Amp\Parallel\Ipc\IpcHub;
use Amp\TimeoutCancellation;
use Revolt\EventLoop;
@@ -61,22 +62,30 @@
\trigger_error("No socket path provided", E_USER_ERROR);
}
- if (!isset($argv[2]) || !\is_numeric($argv[2])) {
- \trigger_error("No key length provided", E_USER_ERROR);
+ if (!isset($argv[2])) {
+ \trigger_error("No hub class provided", E_USER_ERROR);
}
if (!isset($argv[3]) || !\is_numeric($argv[3])) {
+ \trigger_error("No key length provided", E_USER_ERROR);
+ }
+
+ if (!isset($argv[4]) || !\is_numeric($argv[4])) {
\trigger_error("No timeout provided", E_USER_ERROR);
}
- [, $uri, $length, $timeout] = $argv;
+ [, $uri, $hubClass, $length, $timeout] = $argv;
$length = (int) $length;
$timeout = (int) $timeout;
\assert($length > 0 && $timeout > 0);
+ if (!isset(\class_implements($hubClass)[IpcHub::class])) {
+ throw new \Error("Passed hub class $hubClass does not implement IpcHub!");
+ }
+
// Remove script path, socket path, key length, and timeout from process arguments.
- $argv = \array_slice($argv, 4);
+ $argv = \array_slice($argv, 5);
try {
$cancellation = new TimeoutCancellation($timeout);
@@ -85,5 +94,5 @@
\trigger_error($exception->getMessage(), E_USER_ERROR);
}
- runContext($uri, $key, $cancellation, $argv);
+ runContext($hubClass, $uri, $key, $cancellation, $argv);
})();
diff --git a/src/Context/ProcessContext.php b/src/Context/ProcessContext.php
index c0c97ed..b0ef919 100644
--- a/src/Context/ProcessContext.php
+++ b/src/Context/ProcessContext.php
@@ -11,7 +11,11 @@
use Amp\Process\Process;
use Amp\Process\ProcessException;
+use function Amp\ByteStream\getStderr;
+use function Amp\ByteStream\getStdout;
+
/**
+ * @api
* @template-covariant TResult
* @template-covariant TReceive
* @template TSend
@@ -136,6 +140,7 @@ public static function start(
...(self::$options ??= self::buildOptions()),
$scriptPath,
$ipcHub->getUri(),
+ $ipcHub::class,
(string) \strlen($key),
(string) $childConnectTimeout,
...$script,
@@ -276,6 +281,7 @@ public function __destruct()
* @return TResult
* @throws ContextException
*/
+ #[\Override]
public function join(?Cancellation $cancellation = null): mixed
{
$data = $this->receiveExitResult($cancellation);
@@ -348,6 +354,7 @@ public function getStderr(): ReadableResourceStream
return $this->process->getStderr();
}
+ #[\Override]
public function close(): void
{
$this->process->kill();
diff --git a/src/Context/ProcessContextFactory.php b/src/Context/ProcessContextFactory.php
index 7f94e42..0389e51 100644
--- a/src/Context/ProcessContextFactory.php
+++ b/src/Context/ProcessContextFactory.php
@@ -37,6 +37,7 @@ public function __construct(
*
* @throws ContextException
*/
+ #[\Override]
public function start(string|array $script, ?Cancellation $cancellation = null): ProcessContext
{
return ProcessContext::start(
diff --git a/src/Context/StatusError.php b/src/Context/StatusError.php
index f5a28e0..40ac3c6 100644
--- a/src/Context/StatusError.php
+++ b/src/Context/StatusError.php
@@ -2,6 +2,7 @@
namespace Amp\Parallel\Context;
+/** @api */
class StatusError extends \Error
{
}
diff --git a/src/Context/ThreadContext.php b/src/Context/ThreadContext.php
index ef3544d..58bc850 100644
--- a/src/Context/ThreadContext.php
+++ b/src/Context/ThreadContext.php
@@ -89,6 +89,7 @@ public static function start(
$runtime = new Runtime(self::$autoloadPath);
$future = $runtime->run(function (
int $id,
+ string $hubClass,
string $uri,
string $key,
float $connectTimeout,
@@ -103,12 +104,14 @@ public static function start(
// Timer to give the chance for the PHP VM to be interrupted by Runtime::kill(), since system calls
// such as select() will not be interrupted.
}));
+ /** @var non-empty-string $uri */
+ /** @var non-empty-string $key */
- Internal\runContext($uri, $key, new TimeoutCancellation($connectTimeout), $argv);
+ Internal\runContext($hubClass, $uri, $key, new TimeoutCancellation($connectTimeout), $argv);
return 0;
// @codeCoverageIgnoreEnd
- }, [$id, $ipcHub->getUri(), $key, $childConnectTimeout, $script]);
+ }, [$id, $ipcHub::class, $ipcHub->getUri(), $key, $childConnectTimeout, $script]);
if (!$future) {
$runtime->kill();
@@ -154,6 +157,7 @@ private function __construct(
$this->oid = \getmypid();
}
+ #[\Override]
public function receive(?Cancellation $cancellation = null): mixed
{
if ($this->exited) {
@@ -163,6 +167,7 @@ public function receive(?Cancellation $cancellation = null): mixed
return parent::receive($cancellation);
}
+ #[\Override]
public function send(mixed $data): void
{
if ($this->exited) {
@@ -182,6 +187,7 @@ public function __destruct()
}
}
+ #[\Override]
public function close(): void
{
if (!$this->exited) {
@@ -197,6 +203,7 @@ public function close(): void
parent::close();
}
+ #[\Override]
public function join(?Cancellation $cancellation = null): mixed
{
$data = $this->receiveExitResult($cancellation);
diff --git a/src/Context/ThreadContextFactory.php b/src/Context/ThreadContextFactory.php
index f42f3b5..f6f52c7 100644
--- a/src/Context/ThreadContextFactory.php
+++ b/src/Context/ThreadContextFactory.php
@@ -24,6 +24,7 @@ public function __construct(
) {
}
+ #[\Override]
public function start(array|string $script, ?Cancellation $cancellation = null): ThreadContext
{
return ThreadContext::start($this->ipcHub, $script, $cancellation, $this->childConnectTimeout);
diff --git a/src/Ipc/IpcHub.php b/src/Ipc/IpcHub.php
index acc68be..b209ba8 100644
--- a/src/Ipc/IpcHub.php
+++ b/src/Ipc/IpcHub.php
@@ -2,16 +2,17 @@
namespace Amp\Parallel\Ipc;
+use Amp\ByteStream\ReadableStream;
+use Amp\ByteStream\WritableStream;
use Amp\Cancellation;
use Amp\Closable;
-use Amp\Socket\Socket;
interface IpcHub extends Closable
{
/**
* @param non-empty-string $key A key generated by {@see generateKey()}.
*/
- public function accept(string $key, ?Cancellation $cancellation = null): Socket;
+ public function accept(string $key, ?Cancellation $cancellation = null): ReadableStream&WritableStream;
/**
* @return non-empty-string URI to use with {@see connect()}.
@@ -22,4 +23,14 @@ public function getUri(): string;
* @return non-empty-string Pass the key returned from this method to the connecting context and {@see accept()}.
*/
public function generateKey(): string;
+
+ /**
+ * @param non-empty-string $uri
+ * @param non-empty-string $key A key generated by {@see generateKey()}.
+ */
+ public static function connect(
+ string $uri,
+ string $key,
+ ?Cancellation $cancellation = null,
+ ): ReadableStream&WritableStream;
}
diff --git a/src/Ipc/LocalIpcHub.php b/src/Ipc/LocalIpcHub.php
index 820cc71..d908f2c 100644
--- a/src/Ipc/LocalIpcHub.php
+++ b/src/Ipc/LocalIpcHub.php
@@ -7,7 +7,9 @@
use Amp\ForbidSerialization;
use Amp\Socket;
use Amp\Socket\ResourceSocket;
+use Amp\Socket\Socket as SocketSocket;
use Revolt\EventLoop;
+
use const Amp\Process\IS_WINDOWS;
final class LocalIpcHub implements IpcHub
@@ -47,22 +49,38 @@ public function __destruct()
$this->unlink();
}
+ /**
+ * Note that this is designed to be used in the child process/thread to connect to an IPC socket.
+ */
+ #[\Override]
+ public static function connect(
+ string $uri,
+ string $key,
+ ?Cancellation $cancellation = null,
+ ): SocketSocket {
+ return SocketIpcHub::connect($uri, $key, $cancellation);
+ }
+
+ #[\Override]
public function accept(string $key, ?Cancellation $cancellation = null): ResourceSocket
{
return $this->delegate->accept($key, $cancellation);
}
+ #[\Override]
public function isClosed(): bool
{
return $this->delegate->isClosed();
}
+ #[\Override]
public function close(): void
{
$this->delegate->close();
$this->unlink();
}
+ #[\Override]
public function onClose(\Closure $onClose): void
{
$this->delegate->onClose($onClose);
@@ -84,11 +102,13 @@ private function unlink(): void
}
}
+ #[\Override]
public function getUri(): string
{
return $this->delegate->getUri();
}
+ #[\Override]
public function generateKey(): string
{
return $this->delegate->generateKey();
diff --git a/src/Ipc/SocketIpcHub.php b/src/Ipc/SocketIpcHub.php
index 7d654ce..1448d39 100644
--- a/src/Ipc/SocketIpcHub.php
+++ b/src/Ipc/SocketIpcHub.php
@@ -10,10 +10,14 @@
use Amp\NullCancellation;
use Amp\Socket;
use Amp\Socket\ResourceSocket;
+use Amp\Socket\Socket as SocketSocket;
use Amp\Socket\SocketAddressType;
+use Amp\Socket\SocketConnector;
use Amp\TimeoutCancellation;
use Revolt\EventLoop;
+use function Amp\Socket\socketConnector;
+
final class SocketIpcHub implements IpcHub
{
use ForbidCloning;
@@ -94,16 +98,36 @@ public function __construct(
};
}
+ /**
+ * Note that this is designed to be used in the child process/thread to connect to an IPC socket.
+ */
+ #[\Override]
+ public static function connect(
+ string $uri,
+ string $key,
+ ?Cancellation $cancellation = null,
+ ?SocketConnector $connector = null,
+ ): SocketSocket {
+ $connector ??= socketConnector();
+
+ $client = $connector->connect($uri, cancellation: $cancellation);
+ $client->write($key);
+
+ return $client;
+ }
+
public function __destruct()
{
$this->close();
}
+ #[\Override]
public function isClosed(): bool
{
return $this->server->isClosed();
}
+ #[\Override]
public function close(): void
{
$this->server->close();
@@ -118,16 +142,19 @@ public function close(): void
}
}
+ #[\Override]
public function onClose(\Closure $onClose): void
{
$this->server->onClose($onClose);
}
+ #[\Override]
public function getUri(): string
{
return $this->uri;
}
+ #[\Override]
public function generateKey(): string
{
return \random_bytes($this->keyLength);
@@ -136,6 +163,7 @@ public function generateKey(): string
/**
* @param string $key A key generated by {@see generateKey()}.
*/
+ #[\Override]
public function accept(string $key, ?Cancellation $cancellation = null): ResourceSocket
{
if ($this->server->isClosed()) {
diff --git a/src/Ipc/functions.php b/src/Ipc/functions.php
index c14183d..c0a334a 100644
--- a/src/Ipc/functions.php
+++ b/src/Ipc/functions.php
@@ -2,17 +2,15 @@
namespace Amp\Parallel\Ipc;
-use Amp\ByteStream\ReadableResourceStream;
+use Amp\ByteStream\ReadableStream;
use Amp\Cancellation;
use Amp\Socket\Socket;
-use Amp\Socket\SocketConnector;
-use function Amp\Socket\socketConnector;
/**
* @param positive-int $keyLength
*/
function readKey(
- ReadableResourceStream|Socket $stream,
+ ReadableStream|Socket $stream,
?Cancellation $cancellation = null,
int $keyLength = SocketIpcHub::DEFAULT_KEY_LENGTH,
): string {
@@ -29,20 +27,3 @@ function readKey(
return $key;
}
-
-/**
- * Note that this is designed to be used in the child process/thread to connect to an IPC socket.
- */
-function connect(
- string $uri,
- string $key,
- ?Cancellation $cancellation = null,
- ?SocketConnector $connector = null,
-): Socket {
- $connector ??= socketConnector();
-
- $client = $connector->connect($uri, cancellation: $cancellation);
- $client->write($key);
-
- return $client;
-}
diff --git a/src/Worker/ContextWorkerFactory.php b/src/Worker/ContextWorkerFactory.php
index 927c497..9852055 100644
--- a/src/Worker/ContextWorkerFactory.php
+++ b/src/Worker/ContextWorkerFactory.php
@@ -32,6 +32,7 @@ public function __construct(
* The type of worker created depends on the extensions available. If multi-threading is enabled, a WorkerThread
* will be created. If threads are not available a WorkerProcess will be created.
*/
+ #[\Override]
public function create(?Cancellation $cancellation = null): Worker
{
$script = [self::SCRIPT_PATH];
diff --git a/src/Worker/ContextWorkerPool.php b/src/Worker/ContextWorkerPool.php
index e7f916b..6ba4e76 100644
--- a/src/Worker/ContextWorkerPool.php
+++ b/src/Worker/ContextWorkerPool.php
@@ -89,6 +89,7 @@ public function __destruct()
*
* @return bool True if the pool is running, otherwise false.
*/
+ #[\Override]
public function isRunning(): bool
{
return !$this->deferredCancellation->isCancelled();
@@ -99,11 +100,13 @@ public function isRunning(): bool
*
* @return bool True if the pool has at least one idle worker, otherwise false.
*/
+ #[\Override]
public function isIdle(): bool
{
return $this->idleWorkers->count() > 0 || $this->workers->count() < $this->limit;
}
+ #[\Override]
public function getWorkerLimit(): int
{
return $this->limit;
@@ -121,11 +124,13 @@ public function getLimit(): int
return $this->getWorkerLimit();
}
+ #[\Override]
public function getWorkerCount(): int
{
return $this->workers->count() + $this->pendingWorkerCount;
}
+ #[\Override]
public function getIdleWorkerCount(): int
{
return $this->idleWorkers->count();
@@ -134,6 +139,7 @@ public function getIdleWorkerCount(): int
/**
* Submits a {@see Task} to be executed by the worker pool.
*/
+ #[\Override]
public function submit(Task $task, ?Cancellation $cancellation = null): Execution
{
$worker = $this->pull();
@@ -156,6 +162,7 @@ public function submit(Task $task, ?Cancellation $cancellation = null): Executio
*
* @throws StatusError If the pool has not been started.
*/
+ #[\Override]
public function shutdown(): void
{
if ($this->exitStatus) {
@@ -182,6 +189,7 @@ public function shutdown(): void
/**
* Kills all workers in the pool and halts the worker pool.
*/
+ #[\Override]
public function kill(): void
{
$this->deferredCancellation->cancel();
@@ -211,6 +219,7 @@ private static function killWorkers(
}
}
+ #[\Override]
public function getWorker(): Worker
{
$worker = $this->pull();
diff --git a/src/Worker/DelegatingWorkerPool.php b/src/Worker/DelegatingWorkerPool.php
index f3822c3..8976bf8 100644
--- a/src/Worker/DelegatingWorkerPool.php
+++ b/src/Worker/DelegatingWorkerPool.php
@@ -8,6 +8,7 @@
use Amp\ForbidSerialization;
use Amp\Parallel\Worker\Internal\PooledWorker;
+/** @api */
final class DelegatingWorkerPool implements LimitedWorkerPool
{
use ForbidCloning;
@@ -29,16 +30,19 @@ public function __construct(private readonly WorkerPool $pool, private readonly
$this->waiting = new \SplQueue();
}
+ #[\Override]
public function isRunning(): bool
{
return $this->pool->isRunning();
}
+ #[\Override]
public function isIdle(): bool
{
return $this->pool->isIdle();
}
+ #[\Override]
public function submit(Task $task, ?Cancellation $cancellation = null): Execution
{
$worker = $this->selectWorker();
@@ -88,6 +92,7 @@ private function push(Worker $worker): void
}
}
+ #[\Override]
public function shutdown(): void
{
if (!$this->waiting->isEmpty()) {
@@ -98,6 +103,7 @@ public function shutdown(): void
$this->pool->shutdown();
}
+ #[\Override]
public function kill(): void
{
if (!$this->waiting->isEmpty()) {
@@ -116,22 +122,26 @@ private function clearWaiting(\Throwable $exception): void
}
}
+ #[\Override]
public function getWorker(): Worker
{
$worker = $this->selectWorker();
return new PooledWorker($worker, $this->push(...));
}
+ #[\Override]
public function getWorkerLimit(): int
{
return $this->limit;
}
+ #[\Override]
public function getWorkerCount(): int
{
return \min($this->limit, $this->pool->getWorkerCount());
}
+ #[\Override]
public function getIdleWorkerCount(): int
{
return \min($this->limit, $this->pool->getIdleWorkerCount());
diff --git a/src/Worker/Internal/ContextWorker.php b/src/Worker/Internal/ContextWorker.php
index a399cac..fca6e27 100644
--- a/src/Worker/Internal/ContextWorker.php
+++ b/src/Worker/Internal/ContextWorker.php
@@ -54,7 +54,7 @@ public function __construct(private readonly Context $context)
{
$jobQueue = &$this->jobQueue;
$queues = &$this->queues;
- /** @psalm-suppress UndefinedVariable $onReceive is defined here. */
+ /** @psalm-suppress UndefinedVariable, UnusedVariable $onReceive is defined here. */
$this->onReceive = $onReceive = static function (
?\Throwable $exception,
?Internal\JobPacket $data
@@ -125,17 +125,20 @@ private static function receive(Context $context, callable $onReceive): void
});
}
+ #[\Override]
public function isRunning(): bool
{
// Report as running unless shutdown or killed.
return $this->exitStatus === null;
}
+ #[\Override]
public function isIdle(): bool
{
return empty($this->jobQueue);
}
+ #[\Override]
public function submit(Task $task, ?Cancellation $cancellation = null): Execution
{
if ($this->exitStatus) {
@@ -196,6 +199,7 @@ public function submit(Task $task, ?Cancellation $cancellation = null): Executio
return new Execution($task, $channel, $future);
}
+ #[\Override]
public function shutdown(): void
{
if ($this->exitStatus) {
@@ -221,6 +225,7 @@ public function shutdown(): void
}))->await();
}
+ #[\Override]
public function kill(): void
{
if (!$this->context->isClosed()) {
diff --git a/src/Worker/Internal/JobChannel.php b/src/Worker/Internal/JobChannel.php
index a3b1bac..2e11675 100644
--- a/src/Worker/Internal/JobChannel.php
+++ b/src/Worker/Internal/JobChannel.php
@@ -37,6 +37,7 @@ public function __destruct()
$this->close();
}
+ #[\Override]
public function send(mixed $data): void
{
if ($this->onClose->isComplete()) {
@@ -46,6 +47,7 @@ public function send(mixed $data): void
$this->channel->send(new JobMessage($this->id, $data));
}
+ #[\Override]
public function receive(?Cancellation $cancellation = null): mixed
{
if (!$this->iterator->continue($cancellation)) {
@@ -56,6 +58,7 @@ public function receive(?Cancellation $cancellation = null): mixed
return $this->iterator->getValue();
}
+ #[\Override]
public function close(): void
{
$this->iterator->dispose();
@@ -65,11 +68,13 @@ public function close(): void
}
}
+ #[\Override]
public function isClosed(): bool
{
return $this->channel->isClosed() || $this->onClose->isComplete();
}
+ #[\Override]
public function onClose(\Closure $onClose): void
{
$this->onClose->getFuture()->finally($onClose);
diff --git a/src/Worker/Internal/PooledWorker.php b/src/Worker/Internal/PooledWorker.php
index 060b7d0..50c4e53 100644
--- a/src/Worker/Internal/PooledWorker.php
+++ b/src/Worker/Internal/PooledWorker.php
@@ -32,16 +32,19 @@ public function __destruct()
($this->push)($this->worker);
}
+ #[\Override]
public function isRunning(): bool
{
return $this->worker->isRunning();
}
+ #[\Override]
public function isIdle(): bool
{
return $this->worker->isIdle();
}
+ #[\Override]
public function submit(Task $task, ?Cancellation $cancellation = null): Execution
{
$job = $this->worker->submit($task, $cancellation);
@@ -52,11 +55,13 @@ public function submit(Task $task, ?Cancellation $cancellation = null): Executio
return $job;
}
+ #[\Override]
public function shutdown(): void
{
$this->worker->shutdown();
}
+ #[\Override]
public function kill(): void
{
$this->worker->kill();
diff --git a/src/Worker/Internal/TaskCancelled.php b/src/Worker/Internal/TaskCancelled.php
index 19772cc..b89ddcc 100644
--- a/src/Worker/Internal/TaskCancelled.php
+++ b/src/Worker/Internal/TaskCancelled.php
@@ -16,6 +16,7 @@ public function __construct(string $id, CancelledException $exception)
/**
* @throws TaskCancelledException
*/
+ #[\Override]
public function getResult(): never
{
throw new TaskCancelledException($this->createException());
diff --git a/src/Worker/Internal/TaskFailure.php b/src/Worker/Internal/TaskFailure.php
index dcff985..b4c66aa 100644
--- a/src/Worker/Internal/TaskFailure.php
+++ b/src/Worker/Internal/TaskFailure.php
@@ -48,6 +48,7 @@ public function __construct(string $id, \Throwable $exception)
/**
* @throws TaskFailureThrowable
*/
+ #[\Override]
public function getResult(): never
{
throw $this->createException();
diff --git a/src/Worker/Internal/TaskSuccess.php b/src/Worker/Internal/TaskSuccess.php
index cc0d382..e4b55a7 100644
--- a/src/Worker/Internal/TaskSuccess.php
+++ b/src/Worker/Internal/TaskSuccess.php
@@ -25,6 +25,7 @@ public function __construct(
/**
* @return T
*/
+ #[\Override]
public function getResult(): mixed
{
if ($this->result instanceof \__PHP_Incomplete_Class) {
diff --git a/src/Worker/TaskCancelledException.php b/src/Worker/TaskCancelledException.php
index e0e8e22..965f83c 100644
--- a/src/Worker/TaskCancelledException.php
+++ b/src/Worker/TaskCancelledException.php
@@ -14,36 +14,43 @@ public function __construct(TaskFailureThrowable $exception)
$this->failure = $exception;
}
+ #[\Override]
public function getOriginalClassName(): string
{
return $this->failure->getOriginalClassName();
}
+ #[\Override]
public function getOriginalMessage(): string
{
return $this->failure->getOriginalMessage();
}
+ #[\Override]
public function getOriginalCode(): string|int
{
return $this->failure->getOriginalCode();
}
+ #[\Override]
public function getOriginalFile(): string
{
return $this->failure->getOriginalFile();
}
+ #[\Override]
public function getOriginalLine(): int
{
return $this->failure->getOriginalLine();
}
+ #[\Override]
public function getOriginalTrace(): array
{
return $this->failure->getOriginalTrace();
}
+ #[\Override]
public function getOriginalTraceAsString(): string
{
return $this->failure->getOriginalTraceAsString();
diff --git a/src/Worker/WorkerException.php b/src/Worker/WorkerException.php
index bcb4bb8..35ec1c9 100644
--- a/src/Worker/WorkerException.php
+++ b/src/Worker/WorkerException.php
@@ -2,6 +2,7 @@
namespace Amp\Parallel\Worker;
+/** @api */
class WorkerException extends \Exception
{
}