Skip to content

Commit c92236b

Browse files
committed
Added getQueues feature to get the name of all active queues
1 parent 88b6682 commit c92236b

File tree

5 files changed

+98
-3
lines changed

5 files changed

+98
-3
lines changed

README.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,25 @@ Response Data
103103
}
104104
```
105105

106+
#### getQueues
107+
108+
Request Data
109+
110+
```
111+
{
112+
"operation": "getQueues"
113+
}
114+
```
115+
116+
Response Data
117+
118+
```
119+
{
120+
"status": "ok" / "error",
121+
"value": <objArr RESULT>
122+
}
123+
```
124+
106125
#### getQueuesCount
107126

108127
Request Data

src/main/java/org/swisspush/redisques/RedisQues.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,10 @@ public void handle(final Message<JsonObject> event) {
230230
redisClient.llen(queuesPrefix + event.body().getJsonObject(PAYLOAD).getString(QUEUENAME), new GetQueueItemsCountHandler(event));
231231
break;
232232
case getQueuesCount:
233-
long timestamp = System.currentTimeMillis() - MAX_AGE_MILLISECONDS;
234-
redisClient.zcount(redisPrefix + "queues", timestamp, Double.MAX_VALUE, new GetQueuesCountHandler(event));
233+
redisClient.zcount(redisPrefix + "queues", getMaxAgeTimestamp(), Double.MAX_VALUE, new GetQueuesCountHandler(event));
234+
break;
235+
case getQueues:
236+
redisClient.zrangebyscore(redisPrefix + "queues", String.valueOf(getMaxAgeTimestamp()), "+inf", RangeLimitOptions.NONE, new GetQueuesHandler(event));
235237
break;
236238
default:
237239
unsupportedOperation(operation, event);
@@ -276,6 +278,10 @@ public void handle(Message<String> event) {
276278
});
277279
}
278280

281+
private long getMaxAgeTimestamp(){
282+
return System.currentTimeMillis() - MAX_AGE_MILLISECONDS;
283+
}
284+
279285
private void unsupportedOperation(String operation, Message<JsonObject> event){
280286
JsonObject reply = new JsonObject();
281287
String message = "QUEUE_ERROR: Unsupported operation received: " + operation;
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package org.swisspush.redisques.handler;
2+
3+
import io.vertx.core.AsyncResult;
4+
import io.vertx.core.Handler;
5+
import io.vertx.core.eventbus.Message;
6+
import io.vertx.core.json.JsonArray;
7+
import io.vertx.core.json.JsonObject;
8+
9+
import java.util.List;
10+
11+
import static org.swisspush.redisques.util.RedisquesAPI.*;
12+
13+
/**
14+
* Class GetQueuesCountHandler.
15+
*
16+
* @author https://github.com/mcweba [Marc-Andre Weber]
17+
*/
18+
public class GetQueuesHandler implements Handler<AsyncResult<JsonArray>> {
19+
private Message<JsonObject> event;
20+
21+
public GetQueuesHandler(Message<JsonObject> event) {
22+
this.event = event;
23+
}
24+
25+
@Override
26+
public void handle(AsyncResult<JsonArray> reply) {
27+
if(reply.succeeded()){
28+
List<Object> list = reply.result().getList();
29+
JsonObject result = new JsonObject();
30+
JsonArray items = new JsonArray();
31+
for (Object item : list.toArray()) {
32+
items.add((String) item);
33+
}
34+
result.put("queues", items);
35+
event.reply(new JsonObject().put(STATUS, OK).put(VALUE, result));
36+
} else {
37+
event.reply(new JsonObject().put(STATUS, ERROR));
38+
}
39+
}
40+
}

src/main/java/org/swisspush/redisques/util/RedisquesAPI.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public class RedisquesAPI {
2626
public enum QueueOperation {
2727
enqueue, check, reset, stop, getQueueItems, addQueueItem, deleteQueueItem,
2828
getQueueItem, replaceQueueItem, deleteAllQueueItems, getAllLocks, putLock,
29-
getLock, deleteLock, getQueuesCount, getQueueItemsCount;
29+
getLock, deleteLock, getQueues, getQueuesCount, getQueueItemsCount;
3030

3131
public static QueueOperation fromString(String op){
3232
for (QueueOperation queueOperation : values()) {
@@ -84,6 +84,10 @@ public static JsonObject buildDeleteAllQueueItemsOperation(String queueName){
8484
return buildOperation(QueueOperation.deleteAllQueueItems, new JsonObject().put(QUEUENAME, queueName));
8585
}
8686

87+
public static JsonObject buildGetQueuesOperation(){
88+
return buildOperation(QueueOperation.getQueues);
89+
}
90+
8791
public static JsonObject buildGetQueuesCountOperation(){
8892
return buildOperation(QueueOperation.getQueuesCount);
8993
}

src/test/java/org/swisspush/redisques/RedisQuesTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,32 @@ public void getQueueItems(TestContext context) {
9191
});
9292
}
9393

94+
@Test
95+
public void getQueues(TestContext context) {
96+
Async asyncEnqueue = context.async(100);
97+
flushAll();
98+
assertKeyCount(context, QUEUES_PREFIX, 0);
99+
for (int i = 0; i < 100; i++) {
100+
eventBusSend(buildEnqueueOperation("queue" + i, "testItem"), message -> {
101+
context.assertEquals(OK, message.result().body().getString(STATUS));
102+
asyncEnqueue.countDown();
103+
});
104+
}
105+
asyncEnqueue.awaitSuccess();
106+
107+
assertKeyCount(context, QUEUES_PREFIX, 100);
108+
Async async = context.async();
109+
eventBusSend(buildGetQueuesOperation(), message -> {
110+
context.assertEquals(OK, message.result().body().getString(STATUS));
111+
JsonArray queuesArray = message.result().body().getJsonObject(VALUE).getJsonArray("queues");
112+
context.assertEquals(100, queuesArray.size());
113+
for (int i = 0; i < 100; i++) {
114+
context.assertTrue(queuesArray.contains("queue"+i), "item queue" + i + " expected to be in result");
115+
}
116+
async.complete();
117+
});
118+
}
119+
94120
@Test
95121
public void getQueuesCount(TestContext context) {
96122
Async asyncEnqueue = context.async(100);

0 commit comments

Comments
 (0)