Skip to content

Commit 4fe94a3

Browse files
authored
Merge pull request #217 from swisspost/develop
PR for release
2 parents f065b38 + 05e4a2e commit 4fe94a3

File tree

4 files changed

+69
-9
lines changed

4 files changed

+69
-9
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
<modelVersion>4.0.0</modelVersion>
33
<groupId>org.swisspush</groupId>
44
<artifactId>redisques</artifactId>
5-
<version>4.1.3-SNAPSHOT</version>
5+
<version>4.1.4-SNAPSHOT</version>
66
<name>redisques</name>
77
<description>
88
A highly scalable redis-persistent queuing system for vertx

src/main/java/org/swisspush/redisques/RedisQues.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ public void start(Promise<Void> promise) {
332332
consumersPrefix = modConfig.getRedisPrefix() + "consumers:";
333333
locksKey = modConfig.getRedisPrefix() + "locks";
334334
queueCheckLastexecKey = modConfig.getRedisPrefix() + "check:lastexec";
335-
consumerLockTime = 2 * modConfig.getRefreshPeriod(); // lock is kept twice as long as its refresh interval -> never expires as long as the consumer ('we') are alive
335+
consumerLockTime = modConfig.getConsumerLockMultiplier() * modConfig.getRefreshPeriod(); // lock is kept twice as long as its refresh interval -> never expires as long as the consumer ('we') are alive
336336
timer = new RedisQuesTimer(vertx);
337337

338338
if (redisProvider == null) {
@@ -670,7 +670,7 @@ int updateQueueFailureCountAndGetRetryInterval(final String queueName, boolean s
670670

671671

672672
private void registerQueueCheck() {
673-
vertx.setPeriodic(configurationProvider.configuration().getCheckIntervalTimerMs(), periodicEvent -> {
673+
periodicSkipScheduler.setPeriodic(configurationProvider.configuration().getCheckIntervalTimerMs(), "checkQueues", periodicEvent -> {
674674
redisProvider.redis().compose((RedisAPI redisAPI) -> {
675675
int checkInterval = configurationProvider.configuration().getCheckInterval();
676676
return redisAPI.send(Command.SET, queueCheckLastexecKey, String.valueOf(currentTimeMillis()), "NX", "EX", String.valueOf(checkInterval));
@@ -1141,6 +1141,7 @@ private void updateTimestamp(final String queueName, Handler<AsyncResult<Respons
11411141
* This uses a sorted set of queue names scored by last update timestamp.
11421142
*/
11431143
private Future<Void> checkQueues() {
1144+
final long startTs = System.currentTimeMillis();
11441145
final var ctx = new Object() {
11451146
long limit;
11461147
RedisAPI redisAPI;
@@ -1158,6 +1159,7 @@ private Future<Void> checkQueues() {
11581159
redisAPI.zrangebyscore(Arrays.asList(queuesKey, "-inf", String.valueOf(ctx.limit)), p);
11591160
return p.future();
11601161
}).compose((Response queues) -> {
1162+
log.debug("zrangebyscore time used is {} ms", System.currentTimeMillis() - startTs);
11611163
assert ctx.counter == null;
11621164
assert ctx.iter == null;
11631165
ctx.counter = new AtomicInteger(queues.size());
@@ -1167,6 +1169,7 @@ private Future<Void> checkQueues() {
11671169
upperBoundParallel.request(checkQueueRequestsQuota, null, new UpperBoundParallel.Mentor<Void>() {
11681170
@Override public boolean runOneMore(BiConsumer<Throwable, Void> onDone, Void ctx_) {
11691171
if (ctx.iter.hasNext()) {
1172+
final long perQueueStartTs = System.currentTimeMillis();
11701173
var queueObject = ctx.iter.next();
11711174
// Check if the inactive queue is not empty (i.e. the key exists)
11721175
final String queueName = queueObject.toString();
@@ -1183,13 +1186,16 @@ private Future<Void> checkQueues() {
11831186
if (notifyConsumerEvent.failed()) log.warn("TODO error handling",
11841187
exceptionFactory.newException("notifyConsumer(" + queueName + ") failed",
11851188
notifyConsumerEvent.cause()));
1189+
if (log.isTraceEnabled()) {
1190+
log.trace("refreshRegistration for queue {} time used is {} ms", queueName, System.currentTimeMillis() - perQueueStartTs);
1191+
}
11861192
onDone.accept(null, null);
11871193
});
11881194
});
11891195
};
11901196
ctx.redisAPI.exists(Collections.singletonList(key), event -> {
11911197
if (event.failed() || event.result() == null) {
1192-
log.error("RedisQues is unable to check existence of queue " + queueName,
1198+
log.error("RedisQues is unable to check existence of queue {}", queueName,
11931199
exceptionFactory.newException("redisAPI.exists(" + key + ") failed", event.cause()));
11941200
onDone.accept(null, null);
11951201
return;
@@ -1242,6 +1248,7 @@ private Future<Void> checkQueues() {
12421248
}
12431249
});
12441250
} else {
1251+
log.debug("all queue items time used is {} ms", System.currentTimeMillis() - startTs);
12451252
onDone.accept(null, null);
12461253
}
12471254
return ctx.iter.hasNext();

src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public class RedisquesConfiguration {
2626
private final String metricStorageName;
2727
private final int metricRefreshPeriod;
2828
private final int refreshPeriod;
29+
private final int consumerLockMultiplier;
2930
private final List<String> redisHosts;
3031
private final List<Integer> redisPorts;
3132
private boolean redisEnableTls;
@@ -66,6 +67,7 @@ public class RedisquesConfiguration {
6667
private static final int DEFAULT_REDIS_RECONNECT_ATTEMPTS = 0;
6768
private static final int DEFAULT_REDIS_RECONNECT_DELAY_SEC = 30;
6869
private static final int DEFAULT_REDIS_POOL_RECYCLE_TIMEOUT_MS = 180_000;
70+
private static final int DEFAULT_CONSUMER_LOCK_MULTIPLIER = 2;
6971

7072
// We want to have more than the default of 24 max waiting requests and therefore
7173
// set the default here to infinity value. See as well:
@@ -88,6 +90,7 @@ public class RedisquesConfiguration {
8890
public static final String PROP_METRIC_STORAGE_NAME = "metric-storage-name";
8991
public static final String PROP_METRIC_REFRESH_PERIOD = "metric-refresh-period";
9092
public static final String PROP_REFRESH_PERIOD = "refresh-period";
93+
public static final String PROP_CONSUMER_LOCK_MULTIPLIER = "consumer-lock-multiplier";
9194
public static final String PROP_REDIS_HOST = "redisHost";
9295
public static final String PROP_REDIS_HOST_LIST = "redisHosts";
9396
public static final String PROP_REDIS_PORT = "redisPort";
@@ -148,7 +151,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress
148151
Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader,
149152
List<QueueConfiguration> queueConfigurations, boolean enableQueueNameDecoding) {
150153
this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName,
151-
metricRefreshPeriod, refreshPeriod, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
154+
metricRefreshPeriod, refreshPeriod, DEFAULT_CONSUMER_LOCK_MULTIPLIER, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
152155
RedisClientType.STANDALONE, redisAuth, null, null, false, checkInterval,
153156
processorTimeout, processorDelayMax, httpRequestHandlerEnabled,
154157
httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername,
@@ -173,7 +176,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress
173176
Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader,
174177
List<QueueConfiguration> queueConfigurations, boolean enableQueueNameDecoding) {
175178
this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName,
176-
metricRefreshPeriod, refreshPeriod, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
179+
metricRefreshPeriod, refreshPeriod, DEFAULT_CONSUMER_LOCK_MULTIPLIER, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
177180
RedisClientType.STANDALONE, null, redisPassword, redisUser, redisEnableTls, checkInterval,
178181
processorTimeout, processorDelayMax, httpRequestHandlerEnabled,
179182
httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername,
@@ -198,7 +201,32 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress
198201
Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader,
199202
List<QueueConfiguration> queueConfigurations, boolean enableQueueNameDecoding) {
200203
this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName,
201-
metricRefreshPeriod, refreshPeriod, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
204+
metricRefreshPeriod, refreshPeriod, DEFAULT_CONSUMER_LOCK_MULTIPLIER, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
205+
redisClientType, null, redisPassword, redisUser, redisEnableTls, checkInterval, processorTimeout,
206+
processorDelayMax, httpRequestHandlerEnabled,
207+
httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername,
208+
httpRequestHandlerPassword, httpRequestHandlerPort, httpRequestHandlerUserHeader, queueConfigurations,
209+
enableQueueNameDecoding, DEFAULT_REDIS_MAX_POOL_SIZE, DEFAULT_REDIS_MAX_POOL_WAIT_SIZE,
210+
DEFAULT_REDIS_MAX_PIPELINE_WAIT_SIZE, DEFAULT_QUEUE_SPEED_INTERVAL_SEC, DEFAULT_MEMORY_USAGE_LIMIT_PCT,
211+
DEFAULT_MEMORY_USAGE_CHECK_INTERVAL_SEC, DEFAULT_REDIS_RECONNECT_ATTEMPTS, DEFAULT_REDIS_RECONNECT_DELAY_SEC,
212+
DEFAULT_REDIS_POOL_RECYCLE_TIMEOUT_MS, DEFAULT_DEQUEUE_STATISTIC_REPORT_INTERVAL_SEC,
213+
DEFAULT_REDIS_READY_CHECK_INTERVAL_MS);
214+
}
215+
216+
/**
217+
* Constructor with username and password (Redis ACL)
218+
*/
219+
public RedisquesConfiguration(String address, String configurationUpdatedAddress, String redisPrefix, String processorAddress,
220+
String publishMetricsAddress, String metricStorageName, int metricRefreshPeriod, int refreshPeriod,
221+
int consumerLockMultiplier, String redisHost, int redisPort, RedisClientType redisClientType, String redisPassword,
222+
String redisUser, boolean redisEnableTls, int checkInterval,
223+
int processorTimeout, long processorDelayMax, boolean httpRequestHandlerEnabled,
224+
boolean httpRequestHandlerAuthenticationEnabled, String httpRequestHandlerPrefix,
225+
String httpRequestHandlerUsername, String httpRequestHandlerPassword,
226+
Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader,
227+
List<QueueConfiguration> queueConfigurations, boolean enableQueueNameDecoding) {
228+
this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName,
229+
metricRefreshPeriod, refreshPeriod, consumerLockMultiplier, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
202230
redisClientType, null, redisPassword, redisUser, redisEnableTls, checkInterval, processorTimeout,
203231
processorDelayMax, httpRequestHandlerEnabled,
204232
httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername,
@@ -212,7 +240,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress
212240

213241
private RedisquesConfiguration(String address, String configurationUpdatedAddress, String redisPrefix, String processorAddress,
214242
String publishMetricsAddress, String metricStorageName, int metricRefreshPeriod, int refreshPeriod,
215-
List<String> redisHosts, List<Integer> redisPorts, RedisClientType redisClientType,
243+
int consumerLockMultiplier, List<String> redisHosts, List<Integer> redisPorts, RedisClientType redisClientType,
216244
String redisAuth, String redisPassword, String redisUser, boolean redisEnableTls, int checkInterval,
217245
int processorTimeout, long processorDelayMax, boolean httpRequestHandlerEnabled,
218246
boolean httpRequestHandlerAuthenticationEnabled, String httpRequestHandlerPrefix,
@@ -229,6 +257,7 @@ private RedisquesConfiguration(String address, String configurationUpdatedAddres
229257
this.processorAddress = processorAddress;
230258
this.publishMetricsAddress = publishMetricsAddress;
231259
this.refreshPeriod = refreshPeriod;
260+
this.consumerLockMultiplier = consumerLockMultiplier;
232261
this.redisHosts = redisHosts;
233262
this.redisPorts = redisPorts;
234263
this.redisClientType = redisClientType;
@@ -326,7 +355,7 @@ public static RedisquesConfigurationBuilder with() {
326355
private RedisquesConfiguration(RedisquesConfigurationBuilder builder) {
327356
this(builder.address, builder.configurationUpdatedAddress, builder.redisPrefix,
328357
builder.processorAddress, builder.publishMetricsAddress, builder.metricStorageName, builder.metricRefreshPeriod,
329-
builder.refreshPeriod, builder.redisHosts, builder.redisPorts, builder.redisClientType, builder.redisAuth,
358+
builder.refreshPeriod, builder.consumerLockMultiplier, builder.redisHosts, builder.redisPorts, builder.redisClientType, builder.redisAuth,
330359
builder.redisPassword, builder.redisUser, builder.redisEnableTls, builder.checkInterval,
331360
builder.processorTimeout, builder.processorDelayMax, builder.httpRequestHandlerEnabled,
332361
builder.httpRequestHandlerAuthenticationEnabled, builder.httpRequestHandlerPrefix,
@@ -354,6 +383,7 @@ public JsonObject asJsonObject() {
354383
obj.put(PROP_METRIC_STORAGE_NAME, getMetricStorageName());
355384
obj.put(PROP_METRIC_REFRESH_PERIOD, getMetricRefreshPeriod());
356385
obj.put(PROP_REFRESH_PERIOD, getRefreshPeriod());
386+
obj.put(PROP_CONSUMER_LOCK_MULTIPLIER, getConsumerLockMultiplier());
357387
obj.put(PROP_REDIS_HOST, getRedisHost());
358388
obj.put(PROP_REDIS_HOST_LIST, getRedisHosts());
359389
obj.put(PROP_REDIS_PORT, getRedisPort());
@@ -415,6 +445,9 @@ public static RedisquesConfiguration fromJsonObject(JsonObject json) {
415445
if (json.containsKey(PROP_REFRESH_PERIOD)) {
416446
builder.refreshPeriod(json.getInteger(PROP_REFRESH_PERIOD));
417447
}
448+
if (json.containsKey(PROP_CONSUMER_LOCK_MULTIPLIER)) {
449+
builder.consumerLockMultiplier(json.getInteger(PROP_CONSUMER_LOCK_MULTIPLIER));
450+
}
418451
if (json.containsKey(PROP_REDIS_HOST)) {
419452
builder.redisHost(json.getString(PROP_REDIS_HOST));
420453
}
@@ -549,6 +582,10 @@ public int getRefreshPeriod() {
549582
return refreshPeriod;
550583
}
551584

585+
public int getConsumerLockMultiplier() {
586+
return consumerLockMultiplier;
587+
}
588+
552589
public String getRedisHost() {
553590
return redisHosts.get(0);
554591
}
@@ -728,6 +765,7 @@ public static class RedisquesConfigurationBuilder {
728765
private String metricStorageName;
729766
private int metricRefreshPeriod;
730767
private int refreshPeriod;
768+
private int consumerLockMultiplier;
731769
private List<String> redisHosts;
732770
private List<Integer> redisPorts;
733771
private boolean redisEnableTls;
@@ -767,6 +805,7 @@ public RedisquesConfigurationBuilder() {
767805
this.processorAddress = "redisques-processor";
768806
this.metricRefreshPeriod = 10;
769807
this.refreshPeriod = 10;
808+
this.consumerLockMultiplier = DEFAULT_CONSUMER_LOCK_MULTIPLIER;
770809
this.redisHosts = Collections.singletonList("localhost");
771810
this.redisPorts = Collections.singletonList(6379);
772811
this.redisEnableTls = false;
@@ -836,6 +875,11 @@ public RedisquesConfigurationBuilder refreshPeriod(int refreshPeriod) {
836875
return this;
837876
}
838877

878+
public RedisquesConfigurationBuilder consumerLockMultiplier(int consumerLockMultiplier) {
879+
this.consumerLockMultiplier = consumerLockMultiplier;
880+
return this;
881+
}
882+
839883
public RedisquesConfigurationBuilder redisHost(String redisHost) {
840884
this.redisHosts = Collections.singletonList(redisHost);
841885
return this;

0 commit comments

Comments
 (0)