Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@
"amphp/serialization": "^1",
"amphp/socket": "^2",
"amphp/sync": "^2",
"revolt/event-loop": "^1"
"revolt/event-loop": "^1",
"symfony/polyfill-php83": "^1.31"
},
"require-dev": {
"phpunit/phpunit": "^9",
"amphp/phpunit-util": "^3",
"amphp/php-cs-fixer-config": "^2",
"psalm/phar": "^5.18"
"psalm/phar": "^6"
},
"autoload": {
"psr-4": {
Expand Down
1 change: 1 addition & 0 deletions examples/worker/FetchTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public function __construct(string $url)
$this->url = $url;
}

#[\Override]
public function run(Channel $channel, Cancellation $cancellation): mixed
{
return \file_get_contents($this->url);
Expand Down
4 changes: 2 additions & 2 deletions examples/worker/parent.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
$execution2->getFuture(),
]);

print strlen($bodies[0]) . PHP_EOL;
print strlen($bodies[1]) . PHP_EOL;
print ((string) strlen($bodies[0])) . PHP_EOL;
print ((string) strlen($bodies[1])) . PHP_EOL;

print PHP_EOL;
print 'Took ' . (microtime(true) - $start) . ' seconds' . PHP_EOL;
92 changes: 92 additions & 0 deletions psalm-baseline.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
<?xml version="1.0" encoding="UTF-8"?>
<files psalm-version="6.8.9@e856423a5dc38d65e56b5792750c557277afe393">
<file src="examples/process.php">
<UnusedVariable>
<code><![CDATA[$future]]></code>
</UnusedVariable>
</file>
<file src="examples/shared-memory-parcel.php">
<PossiblyFalseArgument>
<code><![CDATA[getmypid()]]></code>
</PossiblyFalseArgument>
</file>
<file src="examples/worker/FetchTask.php">
<FalsableReturnStatement>
<code><![CDATA[\file_get_contents($this->url)]]></code>
</FalsableReturnStatement>
<InvalidFalsableReturnType>
<code><![CDATA[mixed]]></code>
</InvalidFalsableReturnType>
</file>
<file src="examples/worker/parent.php">
<InvalidOperand>
<code><![CDATA[microtime(true) - $start]]></code>
</InvalidOperand>
</file>
<file src="src/Context/Internal/ContextException.php">
<PossiblyUnusedMethod>
<code><![CDATA[getOriginalClassName]]></code>
<code><![CDATA[getOriginalCode]]></code>
<code><![CDATA[getOriginalFile]]></code>
<code><![CDATA[getOriginalLine]]></code>
<code><![CDATA[getOriginalMessage]]></code>
<code><![CDATA[getOriginalTrace]]></code>
</PossiblyUnusedMethod>
</file>
<file src="src/Context/Internal/functions.php">
<UnusedVariable>
<code><![CDATA[$argc]]></code>
</UnusedVariable>
</file>
<file src="src/Context/Internal/process-runner.php">
<ArgumentTypeCoercion>
<code><![CDATA[$hubClass]]></code>
<code><![CDATA[$key]]></code>
<code><![CDATA[$uri]]></code>
</ArgumentTypeCoercion>
<UnusedVariable>
<code><![CDATA[$argc]]></code>
</UnusedVariable>
</file>
<file src="src/Context/ProcessContext.php">
<PossiblyFalseArgument>
<code><![CDATA[$contents]]></code>
</PossiblyFalseArgument>
</file>
<file src="src/Context/ThreadContext.php">
<ArgumentTypeCoercion>
<code><![CDATA[$hubClass]]></code>
<code><![CDATA[$uri]]></code>
</ArgumentTypeCoercion>
<PossiblyFalsePropertyAssignmentValue>
<code><![CDATA[\getmypid()]]></code>
</PossiblyFalsePropertyAssignmentValue>
</file>
<file src="src/Ipc/functions.php">
<TooManyArguments>
<code><![CDATA[read]]></code>
</TooManyArguments>
</file>
<file src="src/Worker/ContextWorkerPool.php">
<PossiblyUnusedMethod>
<code><![CDATA[getLimit]]></code>
</PossiblyUnusedMethod>
</file>
<file src="src/Worker/Execution.php">
<PossiblyUnusedMethod>
<code><![CDATA[await]]></code>
<code><![CDATA[getChannel]]></code>
<code><![CDATA[getTask]]></code>
</PossiblyUnusedMethod>
</file>
<file src="src/Worker/Internal/ContextWorker.php">
<UnusedVariable>
<code><![CDATA[$onReceive]]></code>
</UnusedVariable>
</file>
<file src="src/Worker/Internal/task-runner.php">
<UnusedVariable>
<code><![CDATA[$argc]]></code>
</UnusedVariable>
</file>
</files>
7 changes: 1 addition & 6 deletions psalm.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="https://getpsalm.org/schema/config"
xsi:schemaLocation="https://getpsalm.org/schema/config vendor/vimeo/psalm/config.xsd"
errorBaseline="psalm-baseline.xml"
>
<projectFiles>
<directory name="examples"/>
Expand All @@ -23,12 +24,6 @@
</errorLevel>
</StringIncrement>

<MissingClosureReturnType>
<errorLevel type="suppress">
<directory name="src"/>
</errorLevel>
</MissingClosureReturnType>

<DocblockTypeContradiction>
<errorLevel type="suppress">
<directory name="src"/>
Expand Down
2 changes: 1 addition & 1 deletion src/Context/ContextException.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

use Amp\Sync\ChannelException;

class ContextException extends ChannelException
final class ContextException extends ChannelException
{
}
1 change: 1 addition & 0 deletions src/Context/DefaultContextFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public function __construct(IpcHub $ipcHub = new LocalIpcHub())
*
* @throws ContextException
*/
#[\Override]
public function start(string|array $script, ?Cancellation $cancellation = null): Context
{
$context = $this->contextFactory->start($script, $cancellation);
Expand Down
5 changes: 5 additions & 0 deletions src/Context/Internal/AbstractContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ protected function __construct(
) {
}

#[\Override]
public function receive(?Cancellation $cancellation = null): mixed
{
try {
Expand Down Expand Up @@ -66,6 +67,7 @@ public function receive(?Cancellation $cancellation = null): mixed
return $data->getMessage();
}

#[\Override]
public function send(mixed $data): void
{
try {
Expand All @@ -80,16 +82,19 @@ public function send(mixed $data): void
}
}

#[\Override]
public function close(): void
{
$this->ipcChannel->close();
}

#[\Override]
public function isClosed(): bool
{
return $this->ipcChannel->isClosed();
}

#[\Override]
public function onClose(\Closure $onClose): void
{
$this->ipcChannel->onClose($onClose);
Expand Down
5 changes: 5 additions & 0 deletions src/Context/Internal/ContextChannel.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,31 @@ public function __construct(
) {
}

#[\Override]
public function send(mixed $data): void
{
$this->channel->send(new ContextMessage($data));
}

#[\Override]
public function receive(?Cancellation $cancellation = null): mixed
{
return $this->channel->receive($cancellation);
}

#[\Override]
public function close(): void
{
$this->channel->close();
}

#[\Override]
public function isClosed(): bool
{
return $this->channel->isClosed();
}

#[\Override]
public function onClose(\Closure $onClose): void
{
$this->channel->onClose($onClose);
Expand Down
1 change: 1 addition & 0 deletions src/Context/Internal/ExitFailure.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public function __construct(\Throwable $exception)
/**
* @throws ContextException
*/
#[\Override]
public function getResult(): never
{
$exception = $this->createException();
Expand Down
1 change: 1 addition & 0 deletions src/Context/Internal/ExitSuccess.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public function __construct(
/**
* @return TValue
*/
#[\Override]
public function getResult(): mixed
{
return $this->result;
Expand Down
17 changes: 11 additions & 6 deletions src/Context/Internal/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,27 @@
use Amp\ByteStream\StreamChannel;
use Amp\Cancellation;
use Amp\Future;
use Amp\Parallel\Ipc;
use Amp\Parallel\Ipc\IpcHub;
use Amp\Serialization\SerializationException;
use Revolt\EventLoop;

/** @internal */
function runContext(string $uri, string $key, Cancellation $connectCancellation, array $argv): void
/**
* @param class-string<IpcHub> $hubClass
* @param non-empty-string $uri
* @param non-empty-string $key
* @internal
*/
function runContext(string $hubClass, string $uri, string $key, Cancellation $connectCancellation, array $argv): void
{
EventLoop::queue(function () use ($argv, $uri, $key, $connectCancellation): void {
EventLoop::queue(function () use ($hubClass, $argv, $uri, $key, $connectCancellation): void {
/** @noinspection PhpUnusedLocalVariableInspection */
$argc = \count($argv);

try {
$socket = Ipc\connect($uri, $key, $connectCancellation);
$socket = $hubClass::connect($uri, $key, $connectCancellation);
$ipcChannel = new StreamChannel($socket, $socket);

$socket = Ipc\connect($uri, $key, $connectCancellation);
$socket = $hubClass::connect($uri, $key, $connectCancellation);
$resultChannel = new StreamChannel($socket, $socket);
} catch (\Throwable $exception) {
\trigger_error($exception->getMessage(), E_USER_ERROR);
Expand Down
19 changes: 14 additions & 5 deletions src/Context/Internal/process-runner.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Amp\ByteStream;
use Amp\Parallel\Context\ProcessContext;
use Amp\Parallel\Ipc;
use Amp\Parallel\Ipc\IpcHub;
use Amp\TimeoutCancellation;
use Revolt\EventLoop;

Expand Down Expand Up @@ -61,22 +62,30 @@
\trigger_error("No socket path provided", E_USER_ERROR);
}

if (!isset($argv[2]) || !\is_numeric($argv[2])) {
\trigger_error("No key length provided", E_USER_ERROR);
if (!isset($argv[2])) {
\trigger_error("No hub class provided", E_USER_ERROR);
}

if (!isset($argv[3]) || !\is_numeric($argv[3])) {
\trigger_error("No key length provided", E_USER_ERROR);
}

if (!isset($argv[4]) || !\is_numeric($argv[4])) {
\trigger_error("No timeout provided", E_USER_ERROR);
}

[, $uri, $length, $timeout] = $argv;
[, $uri, $hubClass, $length, $timeout] = $argv;
$length = (int) $length;
$timeout = (int) $timeout;

\assert($length > 0 && $timeout > 0);

if (!isset(\class_implements($hubClass)[IpcHub::class])) {
throw new \Error("Passed hub class $hubClass does not implement IpcHub!");
}

// Remove script path, socket path, key length, and timeout from process arguments.
$argv = \array_slice($argv, 4);
$argv = \array_slice($argv, 5);

try {
$cancellation = new TimeoutCancellation($timeout);
Expand All @@ -85,5 +94,5 @@
\trigger_error($exception->getMessage(), E_USER_ERROR);
}

runContext($uri, $key, $cancellation, $argv);
runContext($hubClass, $uri, $key, $cancellation, $argv);
})();
7 changes: 7 additions & 0 deletions src/Context/ProcessContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
use Amp\Process\Process;
use Amp\Process\ProcessException;

use function Amp\ByteStream\getStderr;
use function Amp\ByteStream\getStdout;

/**
* @api
* @template-covariant TResult
* @template-covariant TReceive
* @template TSend
Expand Down Expand Up @@ -136,6 +140,7 @@ public static function start(
...(self::$options ??= self::buildOptions()),
$scriptPath,
$ipcHub->getUri(),
$ipcHub::class,
(string) \strlen($key),
(string) $childConnectTimeout,
...$script,
Expand Down Expand Up @@ -276,6 +281,7 @@ public function __destruct()
* @return TResult
* @throws ContextException
*/
#[\Override]
public function join(?Cancellation $cancellation = null): mixed
{
$data = $this->receiveExitResult($cancellation);
Expand Down Expand Up @@ -348,6 +354,7 @@ public function getStderr(): ReadableResourceStream
return $this->process->getStderr();
}

#[\Override]
public function close(): void
{
$this->process->kill();
Expand Down
1 change: 1 addition & 0 deletions src/Context/ProcessContextFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public function __construct(
*
* @throws ContextException
*/
#[\Override]
public function start(string|array $script, ?Cancellation $cancellation = null): ProcessContext
{
return ProcessContext::start(
Expand Down
1 change: 1 addition & 0 deletions src/Context/StatusError.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Amp\Parallel\Context;

/** @api */
class StatusError extends \Error
{
}
Loading
Loading