Skip to content

Commit 3c73095

Browse files
authored
Merge pull request #221 from swisspost/develop
PR-Release
2 parents 4fe94a3 + 3034ebd commit 3c73095

File tree

2 files changed

+44
-10
lines changed

2 files changed

+44
-10
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
<modelVersion>4.0.0</modelVersion>
33
<groupId>org.swisspush</groupId>
44
<artifactId>redisques</artifactId>
5-
<version>4.1.4-SNAPSHOT</version>
5+
<version>4.1.5-SNAPSHOT</version>
66
<name>redisques</name>
77
<description>
88
A highly scalable redis-persistent queuing system for vertx

src/main/java/org/swisspush/redisques/RedisQues.java

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,36 @@ private void initialize() {
420420
registerActiveQueueRegistrationRefresh();
421421
registerQueueCheck();
422422
registerMetricsGathering(configuration);
423+
registerNotExpiredQueueCheck();
424+
}
425+
426+
private void registerNotExpiredQueueCheck() {
427+
vertx.setPeriodic(20 * 1000, event -> {
428+
if (!log.isDebugEnabled()) {
429+
return;
430+
}
431+
String keysPattern = consumersPrefix + "*";
432+
log.debug("RedisQues list not expired consumers keys:");
433+
redisProvider.redis().onSuccess(redisAPI -> redisAPI.scan(Arrays.asList("0", "MATCH", keysPattern, "COUNT", "1000"), keysResult -> {
434+
if (keysResult.failed() || keysResult.result() == null || keysResult.result().size() != 2) {
435+
log.error("Unable to get redis keys of consumers", keysResult.cause());
436+
return;
437+
}
438+
Response keys = keysResult.result().get(1);
439+
if (keys == null || keys.size() == 0) {
440+
log.debug("0 not expired consumers keys found");
441+
return;
442+
}
443+
444+
if (log.isTraceEnabled()) {
445+
for (Response response : keys) {
446+
log.trace(response.toString());
447+
}
448+
}
449+
log.debug("{} not expired consumers keys found, {} keys in myQueues list", keys.size(), myQueues.size());
450+
}))
451+
.onFailure(throwable -> log.error("Redis: Unable to get redis keys of consumers", throwable));
452+
});
423453
}
424454

425455
private void registerMetricsGathering(RedisquesConfiguration configuration){
@@ -485,7 +515,7 @@ public void run() {
485515
startEpochMs = currentTimeMillis();
486516
if (size > 5_000) log.warn("Going to report {} dequeue statistics towards collector", size);
487517
else if (size > 500) log.info("Going to report {} dequeue statistics towards collector", size);
488-
else log.debug("Going to report {} dequeue statistics towards collector", size);
518+
else log.trace("Going to report {} dequeue statistics towards collector", size);
489519
} catch (Throwable ex) {
490520
isRunning.set(false);
491521
throw ex;
@@ -519,7 +549,9 @@ void resume() {
519549
if (event.failed()) {
520550
log.error("publishing dequeue statistics not complete, just continue", event.cause());
521551
}
522-
log.debug("Done publishing {} dequeue statistics. Took {}ms", i, currentTimeMillis() - startEpochMs);
552+
if (log.isTraceEnabled()){
553+
log.trace("Done publishing {} dequeue statistics. Took {}ms", i, currentTimeMillis() - startEpochMs);
554+
}
523555
isRunning.set(false);
524556
});
525557
} catch (Throwable ex) {
@@ -828,7 +860,7 @@ private Future<Void> consume(final String queueName) {
828860
// consumer was restarted
829861
log.warn("Received request to consume from a queue I did not know about: {}", queueName);
830862
}
831-
log.debug("RedisQues Starting to consume queue {}", queueName);
863+
log.trace("RedisQues Starting to consume queue {}", queueName);
832864
readQueue(queueName).onComplete(readQueueEvent -> {
833865
if (readQueueEvent.failed()) {
834866
log.warn("TODO error handling", exceptionFactory.newException(
@@ -837,12 +869,12 @@ private Future<Void> consume(final String queueName) {
837869
promise.complete();
838870
});
839871
} else {
840-
log.debug("RedisQues Queue {} is already being consumed", queueName);
872+
log.trace("RedisQues Queue {} is already being consumed", queueName);
841873
promise.complete();
842874
}
843875
} else {
844876
// Somehow registration changed. Let's renotify.
845-
log.debug("Registration for queue {} has changed to {}", queueName, consumer);
877+
log.trace("Registration for queue {} has changed to {}", queueName, consumer);
846878
myQueues.remove(queueName);
847879
notifyConsumer(queueName).onComplete(notifyConsumerEvent -> {
848880
if (notifyConsumerEvent.failed()) {
@@ -961,7 +993,7 @@ private Future<Void> readQueue(final String queueName) {
961993
// Failed. Message will be kept in queue and retried later
962994
log.debug("RedisQues Processing failed for queue {}", queueName);
963995
// reschedule
964-
log.debug("RedisQues will re-send the message to queue '{}' in {} seconds", queueName, retryInterval);
996+
log.trace("RedisQues will re-send the message to queue '{}' in {} seconds", queueName, retryInterval);
965997
rescheduleSendMessageAfterFailure(queueName, retryInterval);
966998
promise.complete();
967999
}
@@ -1159,7 +1191,9 @@ private Future<Void> checkQueues() {
11591191
redisAPI.zrangebyscore(Arrays.asList(queuesKey, "-inf", String.valueOf(ctx.limit)), p);
11601192
return p.future();
11611193
}).compose((Response queues) -> {
1162-
log.debug("zrangebyscore time used is {} ms", System.currentTimeMillis() - startTs);
1194+
if (log.isDebugEnabled()) {
1195+
log.debug("zrangebyscore time used is {} ms", System.currentTimeMillis() - startTs);
1196+
}
11631197
assert ctx.counter == null;
11641198
assert ctx.iter == null;
11651199
ctx.counter = new AtomicInteger(queues.size());
@@ -1201,7 +1235,7 @@ private Future<Void> checkQueues() {
12011235
return;
12021236
}
12031237
if (event.result().toLong() == 1) {
1204-
log.debug("Updating queue timestamp for queue '{}'", queueName);
1238+
log.trace("Updating queue timestamp for queue '{}'", queueName);
12051239
// If not empty, update the queue timestamp to keep it in the sorted set.
12061240
updateTimestamp(queueName, upTsResult -> {
12071241
if (upTsResult.failed()) {
@@ -1248,7 +1282,6 @@ private Future<Void> checkQueues() {
12481282
}
12491283
});
12501284
} else {
1251-
log.debug("all queue items time used is {} ms", System.currentTimeMillis() - startTs);
12521285
onDone.accept(null, null);
12531286
}
12541287
return ctx.iter.hasNext();
@@ -1263,6 +1296,7 @@ private Future<Void> checkQueues() {
12631296
ctx.counter = null;
12641297
ctx.iter = null;
12651298
// Mark this composition step as completed.
1299+
log.debug("all queue items time used is {} ms", System.currentTimeMillis() - startTs);
12661300
p.complete();
12671301
}
12681302
});

0 commit comments

Comments
 (0)