Skip to content

Commit ca65c77

Browse files
authored
Merge pull request #315 from swisspost/develop
PR for release
2 parents f14fb23 + 9a5d940 commit ca65c77

File tree

8 files changed

+149
-63
lines changed

8 files changed

+149
-63
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
<modelVersion>4.0.0</modelVersion>
33
<groupId>org.swisspush</groupId>
44
<artifactId>redisques</artifactId>
5-
<version>4.1.21-SNAPSHOT</version>
5+
<version>4.1.22-SNAPSHOT</version>
66
<name>redisques</name>
77
<description>
88
A highly scalable redis-persistent queuing system for vertx

src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,13 @@ public static void init(
8585
if (modConfig.getHttpRequestHandlerPort() != null && modConfig.getHttpRequestHandlerUserHeader() != null) {
8686
var handler = new RedisquesHttpRequestHandler(
8787
vertx, modConfig, queueStatsService, exceptionFactory);
88+
8889
// in Vert.x 2x 100-continues was activated per default, in vert.x 3x it is off per default.
89-
HttpServerOptions options = new HttpServerOptions().setHandle100ContinueAutomatically(true);
90+
HttpServerOptions options = new HttpServerOptions()
91+
.setHandle100ContinueAutomatically(true)
92+
.setMaxHeaderSize(modConfig.getHttpRequestHandlerMaxHeaderSize())
93+
.setMaxInitialLineLength(modConfig.getHttpRequestHandlerMaxInitialLineLength());
94+
9095
vertx.createHttpServer(options).requestHandler(handler).listen(modConfig.getHttpRequestHandlerPort(), result -> {
9196
if (result.succeeded()) {
9297
log.info("Successfully started http request handler on port {}", modConfig.getHttpRequestHandlerPort());

src/main/java/org/swisspush/redisques/queue/QueueConsumerRunner.java

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package org.swisspush.redisques.queue;
22

3-
import com.google.common.math.PairedStats;
43
import io.vertx.core.AsyncResult;
54
import io.vertx.core.Future;
65
import io.vertx.core.Handler;
@@ -32,6 +31,8 @@
3231
import java.util.function.Function;
3332
import java.util.stream.Collectors;
3433

34+
import static io.vertx.core.Future.failedFuture;
35+
import static io.vertx.core.Future.succeededFuture;
3536
import static java.lang.System.currentTimeMillis;
3637
import static org.swisspush.redisques.util.RedisquesAPI.OK;
3738
import static org.swisspush.redisques.util.RedisquesAPI.PAYLOAD;
@@ -359,16 +360,13 @@ private void processMessageWithTimeout(final String queue, final String payload,
359360
}
360361

361362
private Future<Void> refreshRegistration(String queueName) {
362-
Promise<Void> promise = Promise.promise();
363-
vertx.eventBus().request(keyspaceHelper.getVerticleRefreshRegistrationKey(), queueName).onComplete(event -> {
364-
if (event.succeeded()) {
365-
promise.complete();
366-
return;
363+
String address = keyspaceHelper.getVerticleRefreshRegistrationKey();
364+
return vertx.eventBus().request(address, queueName).transform(ev -> {
365+
if (ev.failed()) {
366+
log.error("RedisQues refresh registration failed", ev.cause());
367367
}
368-
promise.fail(event.cause());
369-
log.error("RedisQues refresh registration failed", event.cause());
368+
return ev.succeeded() ? succeededFuture() : failedFuture(ev.cause());
370369
});
371-
return promise.future();
372370
}
373371

374372
private Future<Void> notifyConsumer(String queueName) {
@@ -422,15 +420,13 @@ void setMyQueuesState(String queueName, QueueState state) {
422420
if (null == queueProcessingState) {
423421
// not in our list yet
424422
return new QueueProcessingState(state, 0);
423+
} else if (queueProcessingState.getState() == QueueState.CONSUMING && state == QueueState.READY) {
424+
// update the state and the timestamp when we change from CONSUMING to READY
425+
return new QueueProcessingState(QueueState.READY, currentTimeMillis());
425426
} else {
426-
if (queueProcessingState.getState() == QueueState.CONSUMING && state == QueueState.READY) {
427-
// update the state and the timestamp when we change from CONSUMING to READY
428-
return new QueueProcessingState(QueueState.READY, currentTimeMillis());
429-
} else {
430-
// update the state but leave the timestamp unchanged
431-
queueProcessingState.setState(state);
432-
return queueProcessingState;
433-
}
427+
// update the state but leave the timestamp unchanged
428+
queueProcessingState.setState(state);
429+
return queueProcessingState;
434430
}
435431
});
436432
}

src/main/java/org/swisspush/redisques/queue/QueueRegistryService.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -228,12 +228,13 @@ public void refreshRegistration(String queueName, Handler<AsyncResult<Response>>
228228
if (handler == null) {
229229
throw new RuntimeException("Handler must be set");
230230
} else {
231-
vertx.executeBlocking(promise ->
232-
redisService.expire(consumerKey, String.valueOf(consumerLockTime)).
233-
onComplete(responseAsyncResult -> {
234-
handler.handle(responseAsyncResult);
235-
promise.complete();
236-
}));
231+
vertx.executeBlocking(() -> {
232+
return redisService.expire(consumerKey, String.valueOf(consumerLockTime));
233+
}).compose((Future<Response> tooManyNestedFutures) -> {
234+
return tooManyNestedFutures;
235+
}).onComplete((AsyncResult<Response> ev) -> {
236+
handler.handle(ev);
237+
});
237238
}
238239
}
239240

src/main/java/org/swisspush/redisques/queue/RedisService.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -81,19 +81,16 @@ public Future<Response> get(String key) {
8181
}
8282

8383
public Future<Boolean> setNxPx(String key, String value, boolean setNx, long durationMillis) {
84-
Promise<Boolean> p = Promise.promise();
85-
redis().compose((RedisAPI redisAPI) -> {
86-
Future<?> future;
87-
if (setNx) {
88-
future = redisAPI.send(Command.SET, key, value, "NX", "PX", String.valueOf(durationMillis));
89-
} else {
90-
future = redisAPI.send(Command.SET, key, value, "PX", String.valueOf(durationMillis));
91-
}
92-
return future.onSuccess(resp -> p.complete(resp != null && "OK".equals(resp.toString())))
93-
.onFailure(p::fail);
94-
}
95-
);
96-
return p.future();
84+
return redis().compose((RedisAPI redisAPI) -> {
85+
String durationMillisStr = String.valueOf(durationMillis);
86+
if (setNx) {
87+
return redisAPI.send(Command.SET, key, value, "NX", "PX", durationMillisStr);
88+
} else {
89+
return redisAPI.send(Command.SET, key, value, "PX", durationMillisStr);
90+
}
91+
}).map((Response rsp) -> {
92+
return rsp != null && "OK".equals(rsp.toString());
93+
});
9794
}
9895

9996
public Future<Response> zadd(String queuesKey, String queueName, String value) {

0 commit comments

Comments
 (0)