Skip to content

Commit f14e844

Browse files
committed
Merge pull request chrisboulton#211 from wedy/dequeue
ability to dequeue jobs of specific queue
2 parents e393d56 + 29e3778 commit f14e844

File tree

4 files changed

+326
-36
lines changed

4 files changed

+326
-36
lines changed

README.md

+25
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,31 @@ class My_Job
134134
}
135135
```
136136

137+
### Dequeueing Jobs ###
138+
139+
This method can be used to conveniently remove a job from a queue.
140+
141+
```php
142+
// Removes job class 'My_Job' of queue 'default'
143+
Resque::dequeue('default', ['My_Job']);
144+
145+
// Removes job class 'My_Job' with Job ID '087df5819a790ac666c9608e2234b21e' of queue 'default'
146+
Resuque::dequeue('default', ['My_Job' => '087df5819a790ac666c9608e2234b21e']);
147+
148+
// Removes job class 'My_Job' with arguments of queue 'default'
149+
Resque::dequeue('default', ['My_Job' => array('foo' => 1, 'bar' => 2)]);
150+
151+
// Removes multiple jobs
152+
Resque::dequeue('default', ['My_Job', 'My_Job2']);
153+
```
154+
155+
If no jobs are given, this method will dequeue all jobs matching the provided queue.
156+
157+
```php
158+
// Removes all jobs of queue 'default'
159+
Resque::dequeue('default');
160+
```
161+
137162
### Tracking Job Statuses ###
138163

139164
php-resque has the ability to perform basic status tracking of a queued

lib/Resque.php

+153-33
Original file line numberDiff line numberDiff line change
@@ -120,39 +120,55 @@ public static function pop($queue)
120120
return json_decode($item, true);
121121
}
122122

123-
/**
124-
* Pop an item off the end of the specified queues, using blocking list pop,
125-
* decode it and return it.
126-
*
127-
* @param array $queues
128-
* @param int $timeout
129-
* @return null|array Decoded item from the queue.
130-
*/
131-
public static function blpop(array $queues, $timeout)
132-
{
133-
$list = array();
134-
foreach($queues AS $queue) {
135-
$list[] = 'queue:' . $queue;
136-
}
137-
138-
$item = self::redis()->blpop($list, (int)$timeout);
139-
140-
if(!$item) {
141-
return;
142-
}
143-
144-
/**
145-
* Normally the Resque_Redis class returns queue names without the prefix
146-
* But the blpop is a bit different. It returns the name as prefix:queue:name
147-
* So we need to strip off the prefix:queue: part
148-
*/
149-
$queue = substr($item[0], strlen(self::redis()->getPrefix() . 'queue:'));
150-
151-
return array(
152-
'queue' => $queue,
153-
'payload' => json_decode($item[1], true)
154-
);
155-
}
123+
/**
124+
* Remove items of the specified queue
125+
*
126+
* @param string $queue The name of the queue to fetch an item from.
127+
* @param array $items
128+
* @return integer number of deleted items
129+
*/
130+
public static function dequeue($queue, $items = Array())
131+
{
132+
if(count($items) > 0) {
133+
return self::removeItems($queue, $items);
134+
} else {
135+
return self::removeList($queue);
136+
}
137+
}
138+
139+
/**
140+
* Pop an item off the end of the specified queues, using blocking list pop,
141+
* decode it and return it.
142+
*
143+
* @param array $queues
144+
* @param int $timeout
145+
* @return null|array Decoded item from the queue.
146+
*/
147+
public static function blpop(array $queues, $timeout)
148+
{
149+
$list = array();
150+
foreach($queues AS $queue) {
151+
$list[] = 'queue:' . $queue;
152+
}
153+
154+
$item = self::redis()->blpop($list, (int)$timeout);
155+
156+
if(!$item) {
157+
return;
158+
}
159+
160+
/**
161+
* Normally the Resque_Redis class returns queue names without the prefix
162+
* But the blpop is a bit different. It returns the name as prefix:queue:name
163+
* So we need to strip off the prefix:queue: part
164+
*/
165+
$queue = substr($item[0], strlen(self::redis()->getPrefix() . 'queue:'));
166+
167+
return array(
168+
'queue' => $queue,
169+
'payload' => json_decode($item[1], true)
170+
);
171+
}
156172

157173
/**
158174
* Return the size (number of pending jobs) of the specified queue.
@@ -215,4 +231,108 @@ public static function queues()
215231
}
216232
return $queues;
217233
}
234+
235+
/**
236+
* Remove Items from the queue
237+
* Safely moving each item to a temporary queue before processing it
238+
* If the Job matches, counts otherwise puts it in a requeue_queue
239+
* which at the end eventually be copied back into the original queue
240+
*
241+
* @private
242+
*
243+
* @param string $queue The name of the queue
244+
* @param array $items
245+
* @return integer number of deleted items
246+
*/
247+
private static function removeItems($queue, $items = Array())
248+
{
249+
$counter = 0;
250+
$originalQueue = 'queue:'. $queue;
251+
$tempQueue = $originalQueue. ':temp:'. time();
252+
$requeueQueue = $tempQueue. ':requeue';
253+
254+
// move each item from original queue to temp queue and process it
255+
$finished = false;
256+
while(!$finished) {
257+
$string = self::redis()->rpoplpush($originalQueue, self::redis()->getPrefix() . $tempQueue);
258+
259+
if(!empty($string)) {
260+
if(self::matchItem($string, $items)) {
261+
$counter++;
262+
} else {
263+
self::redis()->rpoplpush($tempQueue, self::redis()->getPrefix() . $requeueQueue);
264+
}
265+
} else {
266+
$finished = true;
267+
}
268+
}
269+
270+
// move back from temp queue to original queue
271+
$finished = false;
272+
while(!$finished) {
273+
$string = self::redis()->rpoplpush($requeueQueue, self::redis()->getPrefix() .$originalQueue);
274+
if (empty($string)) {
275+
$finished = true;
276+
}
277+
}
278+
279+
// remove temp queue and requeue queue
280+
self::redis()->del($requeueQueue);
281+
self::redis()->del($tempQueue);
282+
283+
return $counter;
284+
}
285+
286+
/**
287+
* matching item
288+
* item can be ['class'] or ['class' => 'id'] or ['class' => {:foo => 1, :bar => 2}]
289+
* @private
290+
*
291+
* @params string $string redis result in json
292+
* @params $items
293+
*
294+
* @return (bool)
295+
*/
296+
private static function matchItem($string, $items)
297+
{
298+
$decoded = json_decode($string, true);
299+
300+
foreach($items as $key => $val) {
301+
# class name only ex: item[0] = ['class']
302+
if (is_numeric($key)) {
303+
if($decoded['class'] == $val) {
304+
return true;
305+
}
306+
# class name with args , example: item[0] = ['class' => {'foo' => 1, 'bar' => 2}]
307+
} elseif (is_array($val)) {
308+
$decodedArgs = (array)$decoded['args'][0];
309+
if ($decoded['class'] == $key &&
310+
count($decodedArgs) > 0 && count(array_diff($decodedArgs, $val)) == 0) {
311+
return true;
312+
}
313+
# class name with ID, example: item[0] = ['class' => 'id']
314+
} else {
315+
if ($decoded['class'] == $key && $decoded['id'] == $val) {
316+
return true;
317+
}
318+
}
319+
}
320+
return false;
321+
}
322+
323+
/**
324+
* Remove List
325+
*
326+
* @private
327+
*
328+
* @params string $queue the name of the queue
329+
* @return integer number of deleted items belongs to this list
330+
*/
331+
private static function removeList($queue)
332+
{
333+
$counter = self::size($queue);
334+
$result = self::redis()->del('queue:' . $queue);
335+
return ($result == 1) ? $counter : 0;
336+
}
218337
}
338+

lib/Resque/Redis.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ class Resque_Redis
7777
'zscore',
7878
'zremrangebyscore',
7979
'sort',
80-
'rename'
80+
'rename',
81+
'rpoplpush'
8182
);
8283
// sinterstore
8384
// sunion
@@ -86,7 +87,6 @@ class Resque_Redis
8687
// sdiffstore
8788
// sinter
8889
// smove
89-
// rpoplpush
9090
// mget
9191
// msetnx
9292
// mset

test/Resque/Tests/JobTest.php

+146-1
Original file line numberDiff line numberDiff line change
@@ -180,4 +180,149 @@ public function testJobWithNamespace()
180180
Resque_Redis::prefix('resque');
181181
$this->assertEquals(Resque::size($queue), 0);
182182
}
183-
}
183+
184+
public function testDequeueAll()
185+
{
186+
$queue = 'jobs';
187+
Resque::enqueue($queue, 'Test_Job_Dequeue');
188+
Resque::enqueue($queue, 'Test_Job_Dequeue');
189+
$this->assertEquals(Resque::size($queue), 2);
190+
$this->assertEquals(Resque::dequeue($queue), 2);
191+
$this->assertEquals(Resque::size($queue), 0);
192+
}
193+
194+
public function testDequeueMakeSureNotDeleteOthers()
195+
{
196+
$queue = 'jobs';
197+
Resque::enqueue($queue, 'Test_Job_Dequeue');
198+
Resque::enqueue($queue, 'Test_Job_Dequeue');
199+
$other_queue = 'other_jobs';
200+
Resque::enqueue($other_queue, 'Test_Job_Dequeue');
201+
Resque::enqueue($other_queue, 'Test_Job_Dequeue');
202+
$this->assertEquals(Resque::size($queue), 2);
203+
$this->assertEquals(Resque::size($other_queue), 2);
204+
$this->assertEquals(Resque::dequeue($queue), 2);
205+
$this->assertEquals(Resque::size($queue), 0);
206+
$this->assertEquals(Resque::size($other_queue), 2);
207+
}
208+
209+
public function testDequeueSpecificItem()
210+
{
211+
$queue = 'jobs';
212+
Resque::enqueue($queue, 'Test_Job_Dequeue1');
213+
Resque::enqueue($queue, 'Test_Job_Dequeue2');
214+
$this->assertEquals(Resque::size($queue), 2);
215+
$test = array('Test_Job_Dequeue2');
216+
$this->assertEquals(Resque::dequeue($queue, $test), 1);
217+
$this->assertEquals(Resque::size($queue), 1);
218+
}
219+
220+
public function testDequeueSpecificMultipleItems()
221+
{
222+
$queue = 'jobs';
223+
Resque::enqueue($queue, 'Test_Job_Dequeue1');
224+
Resque::enqueue($queue, 'Test_Job_Dequeue2');
225+
Resque::enqueue($queue, 'Test_Job_Dequeue3');
226+
$this->assertEquals(Resque::size($queue), 3);
227+
$test = array('Test_Job_Dequeue2', 'Test_Job_Dequeue3');
228+
$this->assertEquals(Resque::dequeue($queue, $test), 2);
229+
$this->assertEquals(Resque::size($queue), 1);
230+
}
231+
232+
public function testDequeueNonExistingItem()
233+
{
234+
$queue = 'jobs';
235+
Resque::enqueue($queue, 'Test_Job_Dequeue1');
236+
Resque::enqueue($queue, 'Test_Job_Dequeue2');
237+
Resque::enqueue($queue, 'Test_Job_Dequeue3');
238+
$this->assertEquals(Resque::size($queue), 3);
239+
$test = array('Test_Job_Dequeue4');
240+
$this->assertEquals(Resque::dequeue($queue, $test), 0);
241+
$this->assertEquals(Resque::size($queue), 3);
242+
}
243+
244+
public function testDequeueNonExistingItem2()
245+
{
246+
$queue = 'jobs';
247+
Resque::enqueue($queue, 'Test_Job_Dequeue1');
248+
Resque::enqueue($queue, 'Test_Job_Dequeue2');
249+
Resque::enqueue($queue, 'Test_Job_Dequeue3');
250+
$this->assertEquals(Resque::size($queue), 3);
251+
$test = array('Test_Job_Dequeue4', 'Test_Job_Dequeue1');
252+
$this->assertEquals(Resque::dequeue($queue, $test), 1);
253+
$this->assertEquals(Resque::size($queue), 2);
254+
}
255+
256+
public function testDequeueItemID()
257+
{
258+
$queue = 'jobs';
259+
Resque::enqueue($queue, 'Test_Job_Dequeue');
260+
$qid = Resque::enqueue($queue, 'Test_Job_Dequeue');
261+
$this->assertEquals(Resque::size($queue), 2);
262+
$test = array('Test_Job_Dequeue' => $qid);
263+
$this->assertEquals(Resque::dequeue($queue, $test), 1);
264+
$this->assertEquals(Resque::size($queue), 1);
265+
}
266+
267+
public function testDequeueWrongItemID()
268+
{
269+
$queue = 'jobs';
270+
Resque::enqueue($queue, 'Test_Job_Dequeue');
271+
$qid = Resque::enqueue($queue, 'Test_Job_Dequeue');
272+
$this->assertEquals(Resque::size($queue), 2);
273+
#qid right but class name is wrong
274+
$test = array('Test_Job_Dequeue1' => $qid);
275+
$this->assertEquals(Resque::dequeue($queue, $test), 0);
276+
$this->assertEquals(Resque::size($queue), 2);
277+
}
278+
279+
public function testDequeueWrongItemID2()
280+
{
281+
$queue = 'jobs';
282+
Resque::enqueue($queue, 'Test_Job_Dequeue');
283+
$qid = Resque::enqueue($queue, 'Test_Job_Dequeue');
284+
$this->assertEquals(Resque::size($queue), 2);
285+
$test = array('Test_Job_Dequeue' => 'r4nD0mH4sh3dId');
286+
$this->assertEquals(Resque::dequeue($queue, $test), 0);
287+
$this->assertEquals(Resque::size($queue), 2);
288+
}
289+
290+
public function testDequeueItemWithArg()
291+
{
292+
$queue = 'jobs';
293+
$arg = array('foo' => 1, 'bar' => 2);
294+
Resque::enqueue($queue, 'Test_Job_Dequeue9');
295+
Resque::enqueue($queue, 'Test_Job_Dequeue9', $arg);
296+
$this->assertEquals(Resque::size($queue), 2);
297+
$test = array('Test_Job_Dequeue9' => $arg);
298+
$this->assertEquals(Resque::dequeue($queue, $test), 1);
299+
#$this->assertEquals(Resque::size($queue), 1);
300+
}
301+
302+
public function testDequeueItemWithUnorderedArg()
303+
{
304+
$queue = 'jobs';
305+
$arg = array('foo' => 1, 'bar' => 2);
306+
$arg2 = array('bar' => 2, 'foo' => 1);
307+
Resque::enqueue($queue, 'Test_Job_Dequeue');
308+
Resque::enqueue($queue, 'Test_Job_Dequeue', $arg);
309+
$this->assertEquals(Resque::size($queue), 2);
310+
$test = array('Test_Job_Dequeue' => $arg2);
311+
$this->assertEquals(Resque::dequeue($queue, $test), 1);
312+
$this->assertEquals(Resque::size($queue), 1);
313+
}
314+
315+
public function testDequeueItemWithiWrongArg()
316+
{
317+
$queue = 'jobs';
318+
$arg = array('foo' => 1, 'bar' => 2);
319+
$arg2 = array('foo' => 2, 'bar' => 3);
320+
Resque::enqueue($queue, 'Test_Job_Dequeue');
321+
Resque::enqueue($queue, 'Test_Job_Dequeue', $arg);
322+
$this->assertEquals(Resque::size($queue), 2);
323+
$test = array('Test_Job_Dequeue' => $arg2);
324+
$this->assertEquals(Resque::dequeue($queue, $test), 0);
325+
$this->assertEquals(Resque::size($queue), 2);
326+
}
327+
328+
}

0 commit comments

Comments
 (0)