Skip to content

Commit 37850ff

Browse files
committed
Fix assigning workers to waiting tasks
Fixes #177.
1 parent f91c44b commit 37850ff

File tree

3 files changed

+70
-33
lines changed

3 files changed

+70
-33
lines changed

src/Worker/ContextWorkerPool.php

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ final class ContextWorkerPool implements WorkerPool
3535
private readonly \SplQueue $idleWorkers;
3636

3737
/** @var \SplQueue<DeferredFuture<Worker|null>> Task submissions awaiting an available worker. */
38-
private \SplQueue $waiting;
38+
private readonly \SplQueue $waiting;
3939

4040
/** @var \Closure(Worker):void */
4141
private readonly \Closure $push;
@@ -58,33 +58,26 @@ public function __construct(
5858
private readonly int $limit = self::DEFAULT_WORKER_LIMIT,
5959
private readonly ?WorkerFactory $factory = null,
6060
) {
61-
if ($limit < 0) {
62-
throw new \Error("Maximum size must be a non-negative integer");
61+
if ($limit <= 0) {
62+
throw new \ValueError("Maximum size must be a positive integer");
6363
}
6464

65-
$this->workers = new \SplObjectStorage();
66-
$this->idleWorkers = new \SplQueue();
67-
$this->waiting = new \SplQueue();
65+
$this->workers = $workers = new \SplObjectStorage();
66+
$this->idleWorkers = $idleWorkers = new \SplQueue();
67+
$this->waiting = $waiting = new \SplQueue();
6868

6969
$this->deferredCancellation = new DeferredCancellation();
7070

71-
$workers = $this->workers;
72-
$idleWorkers = $this->idleWorkers;
73-
$waiting = $this->waiting;
74-
71+
/** @var \SplObjectStorage<Worker, int> $workers Needed for Psalm. */
7572
$this->push = static function (Worker $worker) use ($waiting, $workers, $idleWorkers): void {
7673
if (!$workers->contains($worker)) {
74+
// Pool was shutdown, do not re-insert worker into collection.
7775
return;
7876
}
7977

80-
if ($worker->isRunning()) {
78+
if ($waiting->isEmpty()) {
8179
$idleWorkers->push($worker);
8280
} else {
83-
$workers->detach($worker);
84-
$worker = null;
85-
}
86-
87-
if (!$waiting->isEmpty()) {
8881
$waiting->dequeue()->complete($worker);
8982
}
9083
};

test/Worker/AbstractPoolTest.php

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44

55
use Amp\Future;
66
use Amp\Parallel\Context\StatusError;
7+
use Amp\Parallel\Test\Worker\Fixtures\TestTask;
78
use Amp\Parallel\Worker\ContextWorkerFactory;
89
use Amp\Parallel\Worker\ContextWorkerPool;
10+
use Amp\Parallel\Worker\Execution;
911
use Amp\Parallel\Worker\Task;
1012
use Amp\Parallel\Worker\Worker;
1113
use Amp\Parallel\Worker\WorkerPool;
@@ -81,25 +83,25 @@ public function testBusyPool(): void
8183
return new Fixtures\TestTask($value);
8284
}, $values);
8385

84-
$promises = \array_map(function (Task $task) use ($pool): Future {
86+
$futures = \array_map(function (Task $task) use ($pool): Future {
8587
return $pool->submit($task)->getFuture();
8688
}, $tasks);
8789

88-
self::assertEquals($values, Future\await($promises));
90+
self::assertEquals($values, Future\await($futures));
8991

90-
$promises = \array_map(function (Task $task) use ($pool): Future {
92+
$futures = \array_map(function (Task $task) use ($pool): Future {
9193
return $pool->submit($task)->getFuture();
9294
}, $tasks);
9395

94-
self::assertEquals($values, Future\await($promises));
96+
self::assertEquals($values, Future\await($futures));
9597

9698
$pool->shutdown();
9799
}
98100

99101
public function testCreatePoolShouldThrowError(): void
100102
{
101103
$this->expectException(\Error::class);
102-
$this->expectExceptionMessage('Maximum size must be a non-negative integer');
104+
$this->expectExceptionMessage('Maximum size must be a positive integer');
103105

104106
$this->createPool(-1);
105107
}
@@ -112,28 +114,60 @@ public function testCleanGarbageCollection(): void
112114

113115
$values = \range(1, 50);
114116

115-
$promises = \array_map(static function (int $value) use ($pool): Future {
117+
$futures = \array_map(static function (int $value) use ($pool): Future {
116118
return $pool->submit(new Fixtures\TestTask($value))->getFuture();
117119
}, $values);
118120

119-
self::assertEquals($values, Future\await($promises));
121+
self::assertEquals($values, Future\await($futures));
120122
}
121123
}
122124

125+
/**
126+
* @see https://github.com/amphp/parallel/issues/66
127+
*/
123128
public function testPooledKill(): void
124129
{
125130
$this->setTimeout(10);
131+
\set_error_handler(static function (int $errno, string $errstr) use (&$error): void {
132+
$error = $errstr;
133+
});
134+
135+
try {
136+
$pool = $this->createPool(1);
137+
$worker1 = $pool->getWorker();
138+
$worker1->kill();
139+
self::assertFalse($worker1->isRunning());
140+
141+
unset($worker1); // Destroying the worker will trigger the pool to recognize it has been killed.
142+
143+
$worker2 = $pool->getWorker();
144+
self::assertTrue($worker2->isRunning());
145+
146+
self::assertStringContainsString('Worker in pool crashed', $error);
147+
} finally {
148+
\restore_error_handler();
149+
}
150+
}
151+
152+
/**
153+
* @see https://github.com/amphp/parallel/issues/177
154+
*/
155+
public function testWaitingForAvailableWorker(): void
156+
{
157+
$count = 4;
158+
$delay = 0.1;
159+
160+
$this->setMinimumRuntime($delay * $count);
161+
$this->setTimeout($delay * $count + $delay);
126162

127-
// See https://github.com/amphp/parallel/issues/66
128163
$pool = $this->createPool(1);
129-
$worker1 = $pool->getWorker();
130-
$worker1->kill();
131-
self::assertFalse($worker1->isRunning());
132164

133-
unset($worker1); // Destroying the worker will trigger the pool to recognize it has been killed.
165+
$executions = [];
166+
for ($i = 0; $i < $count; $i++) {
167+
$executions[] = $pool->submit(new TestTask($i, $delay));
168+
}
134169

135-
$worker2 = $pool->getWorker();
136-
self::assertTrue($worker2->isRunning());
170+
Future\await(\array_map(fn (Execution $e) => $e->getFuture(), $executions));
137171
}
138172

139173
protected function createWorker(?string $autoloadPath = null): Worker

test/Worker/DefaultPoolTest.php

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,20 @@ public function testCrashedWorker(): void
5353
return $worker;
5454
});
5555

56-
$pool = new ContextWorkerPool(32, $factory);
56+
\set_error_handler(static function (int $errno, string $errstr) use (&$error): void {
57+
$error = $errstr;
58+
});
59+
60+
try {
61+
$pool = new ContextWorkerPool(32, $factory);
62+
63+
$pool->submit($this->createMock(Task::class))->await();
5764

58-
$pool->submit($this->createMock(Task::class))->await();
65+
$pool->submit($this->createMock(Task::class))->await();
5966

60-
$pool->submit($this->createMock(Task::class))->await();
67+
self::assertStringContainsString('Worker in pool crashed', $error);
68+
} finally {
69+
\restore_error_handler();
70+
}
6171
}
6272
}

0 commit comments

Comments
 (0)