Skip to content

Commit e60a398

Browse files
committed
Immediately notify processor when a queue is unlocked. This fixes #23
1 parent 1cf5d77 commit e60a398

File tree

2 files changed

+39
-5
lines changed

2 files changed

+39
-5
lines changed

src/main/java/org/swisspush/redisques/RedisQues.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,13 @@ public void handle(final Message<JsonObject> event) {
231231
redisClient.hget(redisques_locks, event.body().getJsonObject(PAYLOAD).getString(QUEUENAME),new GetLockHandler(event));
232232
break;
233233
case deleteLock:
234-
redisClient.hdel(redisques_locks, event.body().getJsonObject(PAYLOAD).getString(QUEUENAME), new DeleteLockHandler(event));
234+
String queueName = event.body().getJsonObject(PAYLOAD).getString(QUEUENAME);
235+
redisClient.exists(queuesPrefix + queueName, event1 -> {
236+
if(event1.succeeded() && event1.result() == 1){
237+
notifyConsumer(queueName);
238+
}
239+
redisClient.hdel(redisques_locks, queueName, new DeleteLockHandler(event));
240+
});
235241
break;
236242
case getQueueItemsCount:
237243
redisClient.llen(queuesPrefix + event.body().getJsonObject(PAYLOAD).getString(QUEUENAME), new GetQueueItemsCountHandler(event));

src/test/java/org/swisspush/redisques/RedisQuesProcessorTest.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -347,17 +347,45 @@ public void queueProcessorShouldNotBeNotNotifiedWithLockedQueue(TestContext cont
347347

348348
lockQueue(queue);
349349

350-
351350
queueProcessor.handler(event -> processorCalled.set(true));
352351

353352
eventBusSend(buildEnqueueOperation(queue, "hello"), reply -> {
354353
context.assertEquals(OK, reply.result().body().getString(STATUS));
354+
sleep(5000);
355+
context.assertFalse(processorCalled.get(), "QueueProcessor should not have been called after enqueue into a locked queue");
356+
async.complete();
355357
});
358+
}
356359

357-
// after at most 5 seconds, the processor-address consumer should not have been called
358-
Awaitility.await().atMost(Duration.FIVE_SECONDS).until(processorCalled::get, equalTo(false));
360+
@Test
361+
public void queueProcessorShouldHaveBeenNotifiedImmediatelyAfterQueueUnlock(TestContext context) throws Exception {
362+
Async async = context.async();
363+
flushAll();
359364

360-
async.complete();
365+
String queue = "queue1";
366+
final AtomicBoolean processorCalled = new AtomicBoolean(false);
367+
368+
lockQueue(queue);
369+
370+
queueProcessor.handler(event -> {
371+
processorCalled.set(true);
372+
});
373+
374+
eventBusSend(buildEnqueueOperation(queue, "hello"), reply -> {
375+
context.assertEquals(OK, reply.result().body().getString(STATUS));
376+
377+
sleep(5000);
378+
379+
// after at most 5 seconds, the processor-address consumer should not have been called
380+
context.assertFalse(processorCalled.get(), "QueueProcessor should not have been called after enqueue into a locked queue");
381+
382+
eventBusSend(buildDeleteLockOperation(queue), event -> {
383+
context.assertEquals(OK, event.result().body().getString(STATUS));
384+
sleep(100);
385+
context.assertTrue(processorCalled.get(), "QueueProcessor should have been called immediately after queue unlock");
386+
async.complete();
387+
});
388+
});
361389
}
362390

363391
private void sleep(int millis) {

0 commit comments

Comments
 (0)