Skip to content

Commit 814d976

Browse files
committed
Introduce Resque_Job_PID so a process PID can be obtained.
1 parent cf187fa commit 814d976

File tree

7 files changed

+162
-21
lines changed

7 files changed

+162
-21
lines changed

HOWITWORKS.md

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -101,47 +101,50 @@ How do the workers process the queues?
101101
8. `Resque_Job->fail()` returns control to the worker (still in
102102
`Resque_Worker::work()`) without a value
103103
* Job
104-
1. The job calls `Resque_Worker->perform()` with the `Resque_Job` as its
104+
1. `Resque_Job_PID` is created, registering the PID of the actual process
105+
doing the job.
106+
2. The job calls `Resque_Worker->perform()` with the `Resque_Job` as its
105107
only argument.
106-
2. `Resque_Worker->perform()` sets up a `try...catch` block so it can
108+
3. `Resque_Worker->perform()` sets up a `try...catch` block so it can
107109
properly handle exceptions by marking jobs as failed (by calling
108110
`Resque_Job->fail()`, as above)
109-
3. Inside the `try...catch`, `Resque_Worker->perform()` triggers an
111+
4. Inside the `try...catch`, `Resque_Worker->perform()` triggers an
110112
`afterFork` event
111-
4. Still inside the `try...catch`, `Resque_Worker->perform()` calls
113+
5. Still inside the `try...catch`, `Resque_Worker->perform()` calls
112114
`Resque_Job->perform()` with no arguments
113-
5. `Resque_Job->perform()` calls `Resque_Job->getInstance()` with no
115+
6. `Resque_Job->perform()` calls `Resque_Job->getInstance()` with no
114116
arguments
115-
6. If `Resque_Job->getInstance()` has already been called, it returns the
117+
7. If `Resque_Job->getInstance()` has already been called, it returns the
116118
existing instance; otherwise:
117-
7. `Resque_Job->getInstance()` checks that the job's class (type) exists
119+
8. `Resque_Job->getInstance()` checks that the job's class (type) exists
118120
and has a `perform()` method; if not, in either case, it throws an
119121
exception which will be caught by `Resque_Worker->perform()`
120-
8. `Resque_Job->getInstance()` creates an instance of the job's class, and
122+
9. `Resque_Job->getInstance()` creates an instance of the job's class, and
121123
initializes it with a reference to the `Resque_Job` itself, the job's
122124
arguments (which it gets by calling `Resque_Job->getArguments()`, which
123125
in turn simply returns the value of `args[0]`, or an empty array if no
124126
arguments were passed), and the queue name
125-
9. `Resque_Job->getInstance()` returns control, along with the job class
127+
10. `Resque_Job->getInstance()` returns control, along with the job class
126128
instance, to `Resque_Job->perform()`
127-
10. `Resque_Job->perform()` sets up its own `try...catch` block to handle
129+
11. `Resque_Job->perform()` sets up its own `try...catch` block to handle
128130
`Resque_Job_DontPerform` exceptions; any other exceptions are passed
129131
up to `Resque_Worker->perform()`
130-
11. `Resque_Job->perform()` triggers a `beforePerform` event
131-
12. `Resque_Job->perform()` calls `setUp()` on the instance, if it exists
132-
13. `Resque_Job->perform()` calls `perform()` on the instance
133-
14. `Resque_Job->perform()` calls `tearDown()` on the instance, if it
132+
12. `Resque_Job->perform()` triggers a `beforePerform` event
133+
13. `Resque_Job->perform()` calls `setUp()` on the instance, if it exists
134+
14. `Resque_Job->perform()` calls `perform()` on the instance
135+
15. `Resque_Job->perform()` calls `tearDown()` on the instance, if it
134136
exists
135-
15. `Resque_Job->perform()` triggers an `afterPerform` event
136-
16. The `try...catch` block ends, suppressing `Resque_Job_DontPerform`
137+
16. `Resque_Job->perform()` triggers an `afterPerform` event
138+
17. The `try...catch` block ends, suppressing `Resque_Job_DontPerform`
137139
exceptions by returning control, and the value `FALSE`, to
138140
`Resque_Worker->perform()`; any other situation returns the value
139141
`TRUE` along with control, instead
140-
17. The `try...catch` block in `Resque_Worker->perform()` ends
141-
18. `Resque_Worker->perform()` updates the job status from `RUNNING` to
142+
18. The `try...catch` block in `Resque_Worker->perform()` ends
143+
19. `Resque_Worker->perform()` updates the job status from `RUNNING` to
142144
`COMPLETE`, then returns control, with no value, to the worker (again
143145
still in `Resque_Worker::work()`)
144-
19. `Resque_Worker::work()` calls `exit(0)` to terminate the job process
146+
20. `Resque_Job_PID()` is removed, the forked process will terminate soon
147+
21. `Resque_Worker::work()` calls `exit(0)` to terminate the job process
145148
cleanly
146149
* SPECIAL CASE: Non-forking OS (Windows)
147150
1. Same as the job above, except it doesn't call `exit(0)` when done
@@ -154,4 +157,4 @@ How do the workers process the queues?
154157
`Resque_Worker::work()`
155158
4. `Resque_Worker::work()` returns control to the beginning of the main loop,
156159
where it will wait for the next job to become available, and start this
157-
process all over again
160+
process all over again

README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,19 @@ or failed, and are then automatically expired. A status can also
193193
forcefully be expired by calling the `stop()` method on a status
194194
class.
195195

196+
### Obtaining job PID ###
197+
198+
You can obtain the PID of the actual process doing the work through `Resque_Job_PID`. On a forking OS this will be the
199+
PID of the forked process.
200+
201+
CAUTION: on a non-forking OS, the PID returned will be of the worker itself.
202+
203+
```php
204+
echo Resque_Job_PID::get($token);
205+
```
206+
207+
Function returns `0` if the `perform` hasn't started yet, or if it has already ended.
208+
196209
## Workers ##
197210

198211
Workers work in the exact same way as the Ruby workers. For complete

lib/Resque.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ public static function size($queue)
219219
public static function enqueue($queue, $class, $args = null, $trackStatus = false)
220220
{
221221
$id = Resque::generateJobId();
222+
var_dump('enq:' . $id);
222223
$hookParams = array(
223224
'class' => $class,
224225
'args' => $args,
@@ -235,6 +236,7 @@ public static function enqueue($queue, $class, $args = null, $trackStatus = fals
235236
Resque_Job::create($queue, $class, $args, $trackStatus, $id);
236237
Resque_Event::trigger('afterEnqueue', $hookParams);
237238

239+
var_dump('enq ret:' . $id);
238240
return $id;
239241
}
240242

lib/Resque/Job/PID.php

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?php
2+
/**
3+
* PID tracker for the forked worker job.
4+
*
5+
* @package Resque/Job
6+
* @author Chris Boulton <[email protected]>
7+
* @license http://www.opensource.org/licenses/mit-license.php
8+
*/
9+
class Resque_Job_PID
10+
{
11+
/**
12+
* Create a new PID tracker item for the supplied job ID.
13+
*
14+
* @param string $id The ID of the job to track the PID of.
15+
*/
16+
public static function create($id)
17+
{
18+
Resque::redis()->set('job:' . $id . ':pid', (string)getmypid());
19+
}
20+
21+
/**
22+
* Fetch the PID for the process actually executing the job.
23+
*
24+
* @param string $id The ID of the job to get the PID of.
25+
*
26+
* @return int PID of the process doing the job (on non-forking OS, PID of the worker, otherwise forked PID).
27+
*/
28+
public static function get($id)
29+
{
30+
return (int)Resque::redis()->get('job:' . $id . ':pid');
31+
}
32+
33+
/**
34+
* Remove the PID tracker for the job.
35+
*/
36+
public static function del($id)
37+
{
38+
Resque::redis()->del('job:' . $id . ':pid');
39+
}
40+
}
41+

lib/Resque/Worker.php

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,17 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false)
199199
$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
200200
$this->updateProcLine($status);
201201
$this->logger->log(Psr\Log\LogLevel::INFO, $status);
202+
203+
if(!empty($job->payload['id'])) {
204+
Resque_Job_PID::create($job->payload['id']);
205+
}
206+
202207
$this->perform($job);
208+
209+
if(!empty($job->payload['id'])) {
210+
Resque_Job_PID::del($job->payload['id']);
211+
}
212+
203213
if ($this->child === 0) {
204214
exit(0);
205215
}
@@ -394,6 +404,13 @@ public function shutdownNow()
394404
$this->killChild();
395405
}
396406

407+
/**
408+
* @return int Child process PID.
409+
*/
410+
public function getChildPID() {
411+
return $this->child;
412+
}
413+
397414
/**
398415
* Kill a forked child job immediately. The job it is processing will not
399416
* be completed.

test/Resque/Tests/JobPIDTest.php

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
<?php
2+
/**
3+
* Resque_Job_PID tests.
4+
*
5+
* @package Resque/Tests
6+
* @author Chris Boulton <[email protected]>
7+
* @license http://www.opensource.org/licenses/mit-license.php
8+
*/
9+
class Resque_Tests_JobPIDTest extends Resque_Tests_TestCase
10+
{
11+
/**
12+
* @var \Resque_Worker
13+
*/
14+
protected $worker;
15+
16+
public function setUp()
17+
{
18+
parent::setUp();
19+
20+
// Register a worker to test with
21+
$this->worker = new Resque_Worker('jobs');
22+
$this->worker->setLogger(new Resque_Log());
23+
}
24+
25+
public function testQueuedJobDoesNotReturnPID()
26+
{
27+
$token = Resque::enqueue('jobs', 'Test_Job', null, true);
28+
$this->assertEquals(0, Resque_Job_PID::get($token));
29+
}
30+
31+
public function testRunningJobReturnsPID()
32+
{
33+
// Cannot use InProgress_Job on non-forking OS.
34+
if(!function_exists('pcntl_fork')) return;
35+
36+
$token = Resque::enqueue('jobs', 'InProgress_Job', null, true);
37+
$this->worker->work(0);
38+
$this->assertNotEquals(0, Resque_Job_PID::get($token));
39+
}
40+
41+
public function testFinishedJobDoesNotReturnPID()
42+
{
43+
$token = Resque::enqueue('jobs', 'Test_Job', null, true);
44+
$this->worker->work(0);
45+
$this->assertEquals(0, Resque_Job_PID::get($token));
46+
}
47+
}

test/bootstrap.php

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,24 @@ public function perform()
109109
}
110110
}
111111

112+
/**
113+
* This job exits the forked worker process, which simulates the job being (forever) in progress,
114+
* so that we can verify the state of the system for "running jobs". Does not work on a non-forking OS.
115+
*
116+
* CAUTION Use this test job only with Worker::work, i.e. only when you actually trigger the fork in tests.
117+
*/
118+
class InProgress_Job
119+
{
120+
public function perform()
121+
{
122+
if(!function_exists('pcntl_fork')) {
123+
// We can't lose the worker on a non-forking OS.
124+
throw new Failing_Job_Exception('Do not use InProgress_Job for tests on non-forking OS!');
125+
}
126+
exit(0);
127+
}
128+
}
129+
112130
class Test_Job_Without_Perform_Method
113131
{
114132

@@ -145,4 +163,4 @@ public function tearDown()
145163
{
146164
self::$called = true;
147165
}
148-
}
166+
}

0 commit comments

Comments
 (0)