Skip to content

Introduce Resque_Job_PID so a process PID can be obtained. #334

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 22 additions & 19 deletions HOWITWORKS.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,47 +101,50 @@ How do the workers process the queues?
8. `Resque_Job->fail()` returns control to the worker (still in
`Resque_Worker::work()`) without a value
* Job
1. The job calls `Resque_Worker->perform()` with the `Resque_Job` as its
1. `Resque_Job_PID` is created, registering the PID of the actual process
doing the job.
2. The job calls `Resque_Worker->perform()` with the `Resque_Job` as its
only argument.
2. `Resque_Worker->perform()` sets up a `try...catch` block so it can
3. `Resque_Worker->perform()` sets up a `try...catch` block so it can
properly handle exceptions by marking jobs as failed (by calling
`Resque_Job->fail()`, as above)
3. Inside the `try...catch`, `Resque_Worker->perform()` triggers an
4. Inside the `try...catch`, `Resque_Worker->perform()` triggers an
`afterFork` event
4. Still inside the `try...catch`, `Resque_Worker->perform()` calls
5. Still inside the `try...catch`, `Resque_Worker->perform()` calls
`Resque_Job->perform()` with no arguments
5. `Resque_Job->perform()` calls `Resque_Job->getInstance()` with no
6. `Resque_Job->perform()` calls `Resque_Job->getInstance()` with no
arguments
6. If `Resque_Job->getInstance()` has already been called, it returns the
7. If `Resque_Job->getInstance()` has already been called, it returns the
existing instance; otherwise:
7. `Resque_Job->getInstance()` checks that the job's class (type) exists
8. `Resque_Job->getInstance()` checks that the job's class (type) exists
and has a `perform()` method; if not, in either case, it throws an
exception which will be caught by `Resque_Worker->perform()`
8. `Resque_Job->getInstance()` creates an instance of the job's class, and
9. `Resque_Job->getInstance()` creates an instance of the job's class, and
initializes it with a reference to the `Resque_Job` itself, the job's
arguments (which it gets by calling `Resque_Job->getArguments()`, which
in turn simply returns the value of `args[0]`, or an empty array if no
arguments were passed), and the queue name
9. `Resque_Job->getInstance()` returns control, along with the job class
10. `Resque_Job->getInstance()` returns control, along with the job class
instance, to `Resque_Job->perform()`
10. `Resque_Job->perform()` sets up its own `try...catch` block to handle
11. `Resque_Job->perform()` sets up its own `try...catch` block to handle
`Resque_Job_DontPerform` exceptions; any other exceptions are passed
up to `Resque_Worker->perform()`
11. `Resque_Job->perform()` triggers a `beforePerform` event
12. `Resque_Job->perform()` calls `setUp()` on the instance, if it exists
13. `Resque_Job->perform()` calls `perform()` on the instance
14. `Resque_Job->perform()` calls `tearDown()` on the instance, if it
12. `Resque_Job->perform()` triggers a `beforePerform` event
13. `Resque_Job->perform()` calls `setUp()` on the instance, if it exists
14. `Resque_Job->perform()` calls `perform()` on the instance
15. `Resque_Job->perform()` calls `tearDown()` on the instance, if it
exists
15. `Resque_Job->perform()` triggers an `afterPerform` event
16. The `try...catch` block ends, suppressing `Resque_Job_DontPerform`
16. `Resque_Job->perform()` triggers an `afterPerform` event
17. The `try...catch` block ends, suppressing `Resque_Job_DontPerform`
exceptions by returning control, and the value `FALSE`, to
`Resque_Worker->perform()`; any other situation returns the value
`TRUE` along with control, instead
17. The `try...catch` block in `Resque_Worker->perform()` ends
18. `Resque_Worker->perform()` updates the job status from `RUNNING` to
18. The `try...catch` block in `Resque_Worker->perform()` ends
19. `Resque_Worker->perform()` updates the job status from `RUNNING` to
`COMPLETE`, then returns control, with no value, to the worker (again
still in `Resque_Worker::work()`)
19. `Resque_Worker::work()` calls `exit(0)` to terminate the job process
20. `Resque_Job_PID()` is removed, the forked process will terminate soon
21. `Resque_Worker::work()` calls `exit(0)` to terminate the job process
cleanly
* SPECIAL CASE: Non-forking OS (Windows)
1. Same as the job above, except it doesn't call `exit(0)` when done
Expand Down
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,19 @@ or failed, and are then automatically expired. A status can also
forcefully be expired by calling the `stop()` method on a status
class.

### Obtaining job PID ###

You can obtain the PID of the actual process doing the work through `Resque_Job_PID`. On a forking OS this will be the
PID of the forked process.

CAUTION: on a non-forking OS, the PID returned will be of the worker itself.

```php
echo Resque_Job_PID::get($token);
```

Function returns `0` if the `perform` hasn't started yet, or if it has already ended.

## Workers ##

Workers work in the exact same way as the Ruby workers. For complete
Expand Down
43 changes: 43 additions & 0 deletions lib/Resque/Job/PID.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php
/**
* PID tracker for the forked worker job.
*
* @package Resque/Job
* @author Chris Boulton <[email protected]>
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Job_PID
{
/**
* Create a new PID tracker item for the supplied job ID.
*
* @param string $id The ID of the job to track the PID of.
*/
public static function create($id)
{
Resque::redis()->set('job:' . $id . ':pid', (string)getmypid());
}

/**
* Fetch the PID for the process actually executing the job.
*
* @param string $id The ID of the job to get the PID of.
*
* @return int PID of the process doing the job (on non-forking OS, PID of the worker, otherwise forked PID).
*/
public static function get($id)
{
return (int)Resque::redis()->get('job:' . $id . ':pid');
}

/**
* Remove the PID tracker for the job.
*
* @param string $id The ID of the job to remove the tracker from.
*/
public static function del($id)
{
Resque::redis()->del('job:' . $id . ':pid');
}
}

17 changes: 17 additions & 0 deletions lib/Resque/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,17 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false)
$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
$this->updateProcLine($status);
$this->logger->log(Psr\Log\LogLevel::INFO, $status);

if(!empty($job->payload['id'])) {
Resque_Job_PID::create($job->payload['id']);
}

$this->perform($job);

if(!empty($job->payload['id'])) {
Resque_Job_PID::del($job->payload['id']);
}

if ($this->child === 0) {
exit(0);
}
Expand Down Expand Up @@ -394,6 +404,13 @@ public function shutdownNow()
$this->killChild();
}

/**
* @return int Child process PID.
*/
public function getChildPID() {
return $this->child;
}

/**
* Kill a forked child job immediately. The job it is processing will not
* be completed.
Expand Down
47 changes: 47 additions & 0 deletions test/Resque/Tests/JobPIDTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?php
/**
* Resque_Job_PID tests.
*
* @package Resque/Tests
* @author Chris Boulton <[email protected]>
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Tests_JobPIDTest extends Resque_Tests_TestCase
{
/**
* @var \Resque_Worker
*/
protected $worker;

public function setUp()
{
parent::setUp();

// Register a worker to test with
$this->worker = new Resque_Worker('jobs');
$this->worker->setLogger(new Resque_Log());
}

public function testQueuedJobDoesNotReturnPID()
{
$token = Resque::enqueue('jobs', 'Test_Job', null, true);
$this->assertEquals(0, Resque_Job_PID::get($token));
}

public function testRunningJobReturnsPID()
{
// Cannot use InProgress_Job on non-forking OS.
if(!function_exists('pcntl_fork')) return;

$token = Resque::enqueue('jobs', 'InProgress_Job', null, true);
$this->worker->work(0);
$this->assertNotEquals(0, Resque_Job_PID::get($token));
}

public function testFinishedJobDoesNotReturnPID()
{
$token = Resque::enqueue('jobs', 'Test_Job', null, true);
$this->worker->work(0);
$this->assertEquals(0, Resque_Job_PID::get($token));
}
}
8 changes: 4 additions & 4 deletions test/Resque/Tests/JobStatusTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
*/
class Resque_Tests_JobStatusTest extends Resque_Tests_TestCase
{
/**
* @var \Resque_Worker
*/
protected $worker;
/**
* @var \Resque_Worker
*/
protected $worker;

public function setUp()
{
Expand Down
18 changes: 18 additions & 0 deletions test/bootstrap.php
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,24 @@ public function perform()
}
}

/**
* This job exits the forked worker process, which simulates the job being (forever) in progress,
* so that we can verify the state of the system for "running jobs". Does not work on a non-forking OS.
*
* CAUTION Use this test job only with Worker::work, i.e. only when you actually trigger the fork in tests.
*/
class InProgress_Job
{
public function perform()
{
if(!function_exists('pcntl_fork')) {
// We can't lose the worker on a non-forking OS.
throw new Failing_Job_Exception('Do not use InProgress_Job for tests on non-forking OS!');
}
exit(0);
}
}

class Test_Job_Without_Perform_Method
{

Expand Down