Skip to content

Commit 572f173

Browse files
committed
Refactor Resque_Worker to use reservers.
Resque_Worker now takes an instance of ReserverInterface as a dependency. The body of the Resque_Worker::reserve() method has been replaced with a call to ReserverInterface::reserve(). Blocking and non-blocking reservation behaviour is now implemented in BlockingListPopReserver and QueueOrderReserver respectively. Logic that decides whether the worker should sleep after an attempt to reserve a job has been replaced with a call to ReserverInterface::waitAfterReservationAttempt().
1 parent c095853 commit 572f173

File tree

6 files changed

+131
-110
lines changed

6 files changed

+131
-110
lines changed

lib/Resque/Worker.php

Lines changed: 47 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
<?php
22
declare(ticks = 1);
33

4+
use Resque\Reserver\ReserverInterface;
5+
use Resque\Reserver\ReserverFactory;
6+
47
/**
58
* Resque worker that handles checking queues for jobs, fetching them
69
* off the queues, running them and handling the result.
@@ -12,10 +15,20 @@
1215
class Resque_Worker
1316
{
1417
/**
15-
* @var LoggerInterface Logging object that impliments the PSR-3 LoggerInterface
16-
*/
18+
* @var ReserverFactory
19+
*/
20+
private static $reserverFactory;
21+
22+
/**
23+
* @var LoggerInterface Logging object that impliments the PSR-3 LoggerInterface
24+
*/
1725
public $logger;
1826

27+
/**
28+
* @var ReserverInterface The reserver used to reserve jobs from the queues.
29+
*/
30+
private $reserver;
31+
1932
/**
2033
* @var array Array of all associated queues for this worker.
2134
*/
@@ -60,11 +73,13 @@ class Resque_Worker
6073
* order. You can easily add new queues dynamically and have them worked on using
6174
* this method.
6275
*
76+
* @param ReserverInterface $reserver The reserver to use to reserve jobs from the queues.
6377
* @param string|array $queues String with a single queue name, array with multiple.
6478
*/
65-
public function __construct($queues)
79+
public function __construct(ReserverInterface $reserver, $queues)
6680
{
67-
$this->logger = new Resque_Log();
81+
$this->reserver = $reserver;
82+
$this->logger = new Resque_Log();
6883

6984
if(!is_array($queues)) {
7085
$queues = array($queues);
@@ -76,6 +91,16 @@ public function __construct($queues)
7691
$this->id = $this->hostname . ':'.getmypid() . ':' . implode(',', $this->queues);
7792
}
7893

94+
/**
95+
* Sets the reserver factory instance. Used by the find() method to create worker instances.
96+
*
97+
* @param ReserverFactory $reserverFactory
98+
*/
99+
public static function setReserverFactory(ReserverFactory $reserverFactory)
100+
{
101+
self::$reserverFactory = $reserverFactory;
102+
}
103+
79104
/**
80105
* Return all workers known to Resque as instantiated instances.
81106
* @return array
@@ -119,7 +144,9 @@ public static function find($workerId)
119144

120145
list($hostname, $pid, $queues) = explode(':', $workerId, 3);
121146
$queues = explode(',', $queues);
122-
$worker = new self($queues);
147+
148+
$reserver = self::$reserverFactory->createDefaultReserver($queues);
149+
$worker = new self($reserver, $queues);
123150
$worker->setId($workerId);
124151
return $worker;
125152
}
@@ -142,7 +169,7 @@ public function setId($workerId)
142169
*
143170
* @param int $interval How often to check for new jobs across the queues.
144171
*/
145-
public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false)
172+
public function work($interval = Resque::DEFAULT_INTERVAL)
146173
{
147174
$this->updateProcLine('Starting');
148175
$this->startup();
@@ -154,36 +181,25 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false)
154181

155182
// Attempt to find and reserve a job
156183
$job = false;
157-
if(!$this->paused) {
158-
if($blocking === true) {
159-
$this->logger->log(Psr\Log\LogLevel::INFO, 'Starting blocking with timeout of {interval}', array('interval' => $interval));
160-
$this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with blocking timeout ' . $interval);
161-
} else {
162-
$this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval);
163-
}
184+
if (!$this->paused) {
185+
$this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval);
164186

165-
$job = $this->reserve($blocking, $interval);
187+
$job = $this->reserve();
188+
} else {
189+
$this->updateProcLine('Paused');
166190
}
167191

168-
if(!$job) {
192+
if (!$job) {
169193
// For an interval of 0, break now - helps with unit testing etc
170-
if($interval == 0) {
194+
if ($interval == 0) {
171195
break;
172196
}
173197

174-
if($blocking === false)
175-
{
176-
// If no job was found, we sleep for $interval before continuing and checking again
198+
// If no job was found, we sleep for $interval before continuing and checking again
199+
if ($this->reserver->waitAfterReservationAttempt()) {
177200
$this->logger->log(Psr\Log\LogLevel::INFO, 'Sleeping for {interval}', array('interval' => $interval));
178-
if($this->paused) {
179-
$this->updateProcLine('Paused');
180-
}
181-
else {
182-
$this->updateProcLine('Waiting for ' . implode(',', $this->queues));
183-
}
184-
185-
usleep($interval * 1000000);
186-
}
201+
usleep($interval * 1000000);
202+
}
187203

188204
continue;
189205
}
@@ -252,33 +268,11 @@ public function perform(Resque_Job $job)
252268
/**
253269
* @param bool $blocking
254270
* @param int $timeout
255-
* @return object|boolean Instance of Resque_Job if a job is found, false if not.
271+
* @return object|boolean Instance of Resque_Job if a job is found, false if not.
256272
*/
257-
public function reserve($blocking = false, $timeout = null)
273+
public function reserve()
258274
{
259-
$queues = $this->queues();
260-
if(!is_array($queues)) {
261-
return;
262-
}
263-
264-
if($blocking === true) {
265-
$job = Resque_Job::reserveBlocking($queues, $timeout);
266-
if($job) {
267-
$this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue));
268-
return $job;
269-
}
270-
} else {
271-
foreach($queues as $queue) {
272-
$this->logger->log(Psr\Log\LogLevel::INFO, 'Checking {queue} for jobs', array('queue' => $queue));
273-
$job = Resque_Job::reserve($queue);
274-
if($job) {
275-
$this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue));
276-
return $job;
277-
}
278-
}
279-
}
280-
281-
return false;
275+
return $this->reserver->reserve() ?: false;
282276
}
283277

284278
/**

test/Resque/Tests/EventTest.php

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
<?php
2+
3+
use Resque\Reserver\ReserverFactory;
4+
25
/**
36
* Resque_Event tests.
47
*
@@ -14,9 +17,13 @@ public function setUp()
1417
{
1518
Test_Job::$called = false;
1619

20+
$logger = new Resque_Log();
21+
$reserverFactory = new ReserverFactory($logger);
22+
$reserver = $reserverFactory->createDefaultReserver(array('jobs'));
23+
1724
// Register a worker to test with
18-
$this->worker = new Resque_Worker('jobs');
19-
$this->worker->setLogger(new Resque_Log());
25+
$this->worker = new Resque_Worker($reserver, 'jobs');
26+
$this->worker->setLogger($logger);
2027
$this->worker->registerWorker();
2128
}
2229

test/Resque/Tests/JobStatusTest.php

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
<?php
2+
3+
use Resque\Reserver\ReserverFactory;
4+
25
/**
36
* Resque_Job_Status tests.
47
*
@@ -17,9 +20,13 @@ public function setUp()
1720
{
1821
parent::setUp();
1922

23+
$logger = new Resque_Log();
24+
$reserverFactory = new ReserverFactory($logger);
25+
$reserver = $reserverFactory->createDefaultReserver(array('jobs'));
26+
2027
// Register a worker to test with
21-
$this->worker = new Resque_Worker('jobs');
22-
$this->worker->setLogger(new Resque_Log());
28+
$this->worker = new Resque_Worker($reserver, 'jobs');
29+
$this->worker->setLogger($logger);
2330
}
2431

2532
public function testJobStatusCanBeTracked()
@@ -103,4 +110,4 @@ public function testRecreatedJobWithTrackingStillTracksStatus()
103110
$newJob = Resque_Job::reserve('jobs');
104111
$this->assertEquals(Resque_Job_Status::STATUS_WAITING, $newJob->getStatus());
105112
}
106-
}
113+
}

test/Resque/Tests/JobTest.php

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
<?php
22

3+
use Resque\Reserver\ReserverFactory;
4+
35
/**
46
* Resque_Job tests.
57
*
@@ -15,9 +17,13 @@ public function setUp()
1517
{
1618
parent::setUp();
1719

20+
$logger = new Resque_Log();
21+
$reserverFactory = new ReserverFactory($logger);
22+
$reserver = $reserverFactory->createDefaultReserver(array('jobs'));
23+
1824
// Register a worker to test with
19-
$this->worker = new Resque_Worker('jobs');
20-
$this->worker->setLogger(new Resque_Log());
25+
$this->worker = new Resque_Worker($reserver, 'jobs');
26+
$this->worker->setLogger($logger);
2127
$this->worker->registerWorker();
2228
}
2329

@@ -153,7 +159,7 @@ public function testInvalidJobThrowsException()
153159
$job->worker = $this->worker;
154160
$job->perform();
155161
}
156-
162+
157163
public function testJobWithSetUpCallbackFiresSetUp()
158164
{
159165
$payload = array(
@@ -165,10 +171,10 @@ public function testJobWithSetUpCallbackFiresSetUp()
165171
);
166172
$job = new Resque_Job('jobs', $payload);
167173
$job->perform();
168-
174+
169175
$this->assertTrue(Test_Job_With_SetUp::$called);
170176
}
171-
177+
172178
public function testJobWithTearDownCallbackFiresTearDown()
173179
{
174180
$payload = array(
@@ -180,7 +186,7 @@ public function testJobWithTearDownCallbackFiresTearDown()
180186
);
181187
$job = new Resque_Job('jobs', $payload);
182188
$job->perform();
183-
189+
184190
$this->assertTrue(Test_Job_With_TearDown::$called);
185191
}
186192

@@ -329,7 +335,7 @@ public function testDequeueItemWithArg()
329335
$this->assertEquals(Resque::dequeue($queue, $test), 1);
330336
#$this->assertEquals(Resque::size($queue), 1);
331337
}
332-
338+
333339
public function testDequeueSeveralItemsWithArgs()
334340
{
335341
// GIVEN
@@ -340,11 +346,11 @@ public function testDequeueSeveralItemsWithArgs()
340346
Resque::enqueue($queue, 'Test_Job_Dequeue9', $removeArgs);
341347
Resque::enqueue($queue, 'Test_Job_Dequeue9', $removeArgs);
342348
$this->assertEquals(Resque::size($queue), 3);
343-
349+
344350
// WHEN
345351
$test = array('Test_Job_Dequeue9' => $removeArgs);
346352
$removedItems = Resque::dequeue($queue, $test);
347-
353+
348354
// THEN
349355
$this->assertEquals($removedItems, 2);
350356
$this->assertEquals(Resque::size($queue), 1);

0 commit comments

Comments
 (0)