2121import java .util .UUID ;
2222import java .util .concurrent .atomic .AtomicInteger ;
2323
24+ import static org .swisspush .redisques .util .RedisquesAPI .*;
25+
2426public class RedisQues extends AbstractVerticle {
2527
2628 // State of each queue. Consuming means there is a message being processed.
@@ -65,20 +67,7 @@ private enum QueueState {
6567 // Address of message processors
6668 private String processorAddress = "redisques-processor" ;
6769
68- public static final String STATUS = "status" ;
69- public static final String MESSAGE = "message" ;
70- public static final String VALUE = "value" ;
71- public static final String INFO = "info" ;
72- public static final String OK = "ok" ;
73- public static final String ERROR = "error" ;
74- public static final String PAYLOAD = "payload" ;
75- public static final String QUEUE_NAME = "queuename" ;
76- public static final String REQUESTED_BY = "requestedBy" ;
7770 public static final String TIMESTAMP = "timestamp" ;
78- public static final String LIMIT = "limit" ;
79- public static final String INDEX = "index" ;
80- public static final String BUFFER = "buffer" ;
81- public static final String OPERATION = "operation" ;
8271
8372 // Consumers periodically refresh their subscription while they are
8473 // consuming.
@@ -88,6 +77,7 @@ private enum QueueState {
8877 private int processorTimeout = 240000 ;
8978
9079 private static final int DEFAULT_MAX_QUEUEITEM_COUNT = 49 ;
80+ private static final int MAX_AGE_MILLISECONDS = 120000 ; // 120 seconds
9181
9282 // Handler receiving registration requests when no consumer is registered
9383 // for a queue.
@@ -139,96 +129,114 @@ public void handle(final Message<JsonObject> event) {
139129 if (log .isTraceEnabled ()) {
140130 log .trace ("RedisQues got operation:" + operation );
141131 }
142- switch (operation ) {
143- case "enqueue" :
144- updateTimestamp (event .body ().getJsonObject (PAYLOAD ).getString (QUEUE_NAME ), null );
145- String keyEnqueue = queuesPrefix + event .body ().getJsonObject (PAYLOAD ).getString (QUEUE_NAME );
132+
133+ QueueOperation queueOperation = QueueOperation .fromString (operation );
134+ if (queueOperation == null ){
135+ unsupportedOperation (operation , event );
136+ return ;
137+ }
138+
139+ switch (queueOperation ) {
140+ case enqueue :
141+ updateTimestamp (event .body ().getJsonObject (PAYLOAD ).getString (QUEUENAME ), null );
142+ String keyEnqueue = queuesPrefix + event .body ().getJsonObject (PAYLOAD ).getString (QUEUENAME );
146143 String valueEnqueue = event .body ().getString (MESSAGE );
147144 redisClient .rpush (keyEnqueue , valueEnqueue , event2 -> {
148145 JsonObject reply = new JsonObject ();
149146 if (event2 .succeeded ()){
150- log .debug ("RedisQues Enqueued message into queue " + event .body ().getJsonObject (PAYLOAD ).getString (QUEUE_NAME ));
151- notifyConsumer (event .body ().getJsonObject (PAYLOAD ).getString (QUEUE_NAME ));
147+ log .debug ("RedisQues Enqueued message into queue " + event .body ().getJsonObject (PAYLOAD ).getString (QUEUENAME ));
148+ notifyConsumer (event .body ().getJsonObject (PAYLOAD ).getString (QUEUENAME ));
152149 reply .put (STATUS , OK );
153150 reply .put (MESSAGE , "enqueued" );
154151 event .reply (reply );
155152 } else {
156- String message = "RedisQues QUEUE_ERROR: Error while enqueing message into queue " + event .body ().getJsonObject (PAYLOAD ).getString (QUEUE_NAME );
153+ String message = "RedisQues QUEUE_ERROR: Error while enqueueing message into queue " + event .body ().getJsonObject (PAYLOAD ).getString (QUEUENAME );
157154 log .error (message );
158155 reply .put (STATUS , ERROR );
159156 reply .put (MESSAGE , message );
160157 event .reply (reply );
161158 }
162159 });
163160 break ;
164- case " check" :
161+ case check :
165162 checkQueues ();
166163 break ;
167- case " reset" :
164+ case reset :
168165 resetConsumers ();
169166 break ;
170- case " stop" :
167+ case stop :
171168 gracefulStop (event1 -> {
172169 JsonObject reply = new JsonObject ();
173170 reply .put (STATUS , OK );
174171 });
175172 break ;
176- case "getListRange" :
177- String keyListRange = queuesPrefix + event .body ().getJsonObject (PAYLOAD ).getString (QUEUE_NAME );
173+ case getQueueItems :
174+ String keyListRange = queuesPrefix + event .body ().getJsonObject (PAYLOAD ).getString (QUEUENAME );
178175 int maxQueueItemCountIndex = getMaxQueueItemCountIndex (event .body ().getJsonObject (PAYLOAD ).getString (LIMIT ));
179176 redisClient .llen (keyListRange , countReply -> {
180- redisClient .lrange (keyListRange , 0 , maxQueueItemCountIndex , new GetListRangeHandler (event , countReply .result ()));
177+ redisClient .lrange (keyListRange , 0 , maxQueueItemCountIndex , new GetQueueItemsHandler (event , countReply .result ()));
181178 });
182179 break ;
183- case "addItem" :
184- String key1 = queuesPrefix + event .body ().getJsonObject (PAYLOAD ).getString (QUEUE_NAME );
180+ case addQueueItem :
181+ String key1 = queuesPrefix + event .body ().getJsonObject (PAYLOAD ).getString (QUEUENAME );
185182 String valueAddItem = event .body ().getJsonObject (PAYLOAD ).getString (BUFFER );
186- redisClient .rpush (key1 , valueAddItem , new AddItemHandler (event ));
183+ redisClient .rpush (key1 , valueAddItem , new AddQueueItemHandler (event ));
187184 break ;
188- case "deleteItem" :
189- String keyLset = queuesPrefix + event .body ().getJsonObject (PAYLOAD ).getString (QUEUE_NAME );
185+ case deleteQueueItem :
186+ String keyLset = queuesPrefix + event .body ().getJsonObject (PAYLOAD ).getString (QUEUENAME );
190187 int indexLset = event .body ().getJsonObject (PAYLOAD ).getInteger (INDEX );
191188 redisClient .lset (keyLset , indexLset , "TO_DELETE" , event1 -> {
192189 if (event1 .succeeded ()){
193- String keyLrem = queuesPrefix + event .body ().getJsonObject (PAYLOAD ).getString (QUEUE_NAME );
190+ String keyLrem = queuesPrefix + event .body ().getJsonObject (PAYLOAD ).getString (QUEUENAME );
194191 redisClient .lrem (keyLrem , 0 , "TO_DELETE" , replyLrem -> event .reply (new JsonObject ().put (STATUS , OK )));
195192 } else {
196193 event .reply (new JsonObject ().put (STATUS , ERROR ));
197194 }
198195 });
199196 break ;
200- case "getItem" :
201- String key = queuesPrefix + event .body ().getJsonObject (PAYLOAD ).getString (QUEUE_NAME );
197+ case getQueueItem :
198+ String key = queuesPrefix + event .body ().getJsonObject (PAYLOAD ).getString (QUEUENAME );
202199 int index = event .body ().getJsonObject (PAYLOAD ).getInteger (INDEX );
203- redisClient .lindex (key , index , new GetItemHandler (event ));
200+ redisClient .lindex (key , index , new GetQueueItemHandler (event ));
204201 break ;
205- case "replaceItem" :
206- String keyReplaceItem = queuesPrefix + event .body ().getJsonObject (PAYLOAD ).getString (QUEUE_NAME );
202+ case replaceQueueItem :
203+ String keyReplaceItem = queuesPrefix + event .body ().getJsonObject (PAYLOAD ).getString (QUEUENAME );
207204 int indexReplaceItem = event .body ().getJsonObject (PAYLOAD ).getInteger (INDEX );
208205 String bufferReplaceItem = event .body ().getJsonObject (PAYLOAD ).getString (BUFFER );
209- redisClient .lset (keyReplaceItem , indexReplaceItem , bufferReplaceItem , new ReplaceItemHandler (event ));
206+ redisClient .lset (keyReplaceItem , indexReplaceItem , bufferReplaceItem , new ReplaceQueueItemHandler (event ));
210207 break ;
211- case " deleteAllQueueItems" :
212- redisClient .del (queuesPrefix + event .body ().getJsonObject (PAYLOAD ).getString (QUEUE_NAME ), new DeleteAllQueueItems (event ));
208+ case deleteAllQueueItems :
209+ redisClient .del (queuesPrefix + event .body ().getJsonObject (PAYLOAD ).getString (QUEUENAME ), new DeleteAllQueueItems (event ));
213210 break ;
214- case " getAllLocks" :
211+ case getAllLocks :
215212 redisClient .hkeys (redisques_locks , new GetAllLocksHandler (event ));
216213 break ;
217- case " putLock" :
214+ case putLock :
218215 JsonObject lockInfo = extractLockInfo (event .body ().getJsonObject (PAYLOAD ).getString (REQUESTED_BY ));
219216 if (lockInfo != null ) {
220- redisClient .hmset (redisques_locks , new JsonObject ().put (event .body ().getJsonObject (PAYLOAD ).getString (QUEUE_NAME ), lockInfo .encode ()),
217+ redisClient .hmset (redisques_locks , new JsonObject ().put (event .body ().getJsonObject (PAYLOAD ).getString (QUEUENAME ), lockInfo .encode ()),
221218 new PutLockHandler (event ));
222219 } else {
223220 event .reply (new JsonObject ().put (STATUS , ERROR ).put (MESSAGE , "Property '" + REQUESTED_BY + "' missing" ));
224221 }
225222 break ;
226- case "getLock" :
227- redisClient .hget (redisques_locks , event .body ().getJsonObject (PAYLOAD ).getString (QUEUE_NAME ),new GetLockHandler (event ));
223+ case getLock :
224+ redisClient .hget (redisques_locks , event .body ().getJsonObject (PAYLOAD ).getString (QUEUENAME ),new GetLockHandler (event ));
225+ break ;
226+ case deleteLock :
227+ redisClient .hdel (redisques_locks , event .body ().getJsonObject (PAYLOAD ).getString (QUEUENAME ), new DeleteLockHandler (event ));
228+ break ;
229+ case getQueueItemsCount :
230+ redisClient .llen (queuesPrefix + event .body ().getJsonObject (PAYLOAD ).getString (QUEUENAME ), new GetQueueItemsCountHandler (event ));
228231 break ;
229- case "deleteLock" :
230- redisClient .hdel ( redisques_locks , event . body (). getJsonObject ( PAYLOAD ). getString ( QUEUE_NAME ) , new DeleteLockHandler (event ));
232+ case getQueuesCount :
233+ redisClient .zcount ( redisPrefix + "queues" , getMaxAgeTimestamp (), Double . MAX_VALUE , new GetQueuesCountHandler (event ));
231234 break ;
235+ case getQueues :
236+ redisClient .zrangebyscore (redisPrefix + "queues" , String .valueOf (getMaxAgeTimestamp ()), "+inf" , RangeLimitOptions .NONE , new GetQueuesHandler (event ));
237+ break ;
238+ default :
239+ unsupportedOperation (operation , event );
232240 }
233241 }
234242 });
@@ -270,6 +278,19 @@ public void handle(Message<String> event) {
270278 });
271279 }
272280
281+ private long getMaxAgeTimestamp (){
282+ return System .currentTimeMillis () - MAX_AGE_MILLISECONDS ;
283+ }
284+
285+ private void unsupportedOperation (String operation , Message <JsonObject > event ){
286+ JsonObject reply = new JsonObject ();
287+ String message = "QUEUE_ERROR: Unsupported operation received: " + operation ;
288+ log .error (message );
289+ reply .put (STATUS , ERROR );
290+ reply .put (MESSAGE , message );
291+ event .reply (reply );
292+ }
293+
273294 private JsonObject extractLockInfo (String requestedBy ) {
274295 if (requestedBy == null ) {
275296 return null ;
0 commit comments