Skip to content

Commit f065b38

Browse files
authored
Merge pull request #215 from swisspost/develop
PR for release
2 parents cce77b0 + 36ff2e7 commit f065b38

File tree

7 files changed

+150
-22
lines changed

7 files changed

+150
-22
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
<modelVersion>4.0.0</modelVersion>
33
<groupId>org.swisspush</groupId>
44
<artifactId>redisques</artifactId>
5-
<version>4.1.2-SNAPSHOT</version>
5+
<version>4.1.3-SNAPSHOT</version>
66
<name>redisques</name>
77
<description>
88
A highly scalable redis-persistent queuing system for vertx

src/main/java/org/swisspush/redisques/QueueStatsService.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -187,14 +187,6 @@ private <CTX> void fetchRetryDetails(GetQueueStatsRequest<CTX> req, BiConsumer<T
187187
}
188188

189189
private <CTX> void attachDequeueStats(GetQueueStatsRequest<CTX> req, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> onDone) {
190-
// Setup a lookup table as we need to find by name further below.
191-
Map<String, JsonObject> detailsByName = new HashMap<>(req.queuesJsonArr.size());
192-
for (var it = (Iterator<JsonObject>) (Object) req.queuesJsonArr.iterator(); it.hasNext(); ) {
193-
JsonObject detailJson = it.next();
194-
String name = detailJson.getString(MONITOR_QUEUE_NAME);
195-
detailsByName.put(name, detailJson);
196-
}
197-
198190
dequeueStatisticCollector.getAllDequeueStatistics().onSuccess(event -> {
199191
for (Queue queue : req.queues) {
200192
if (event.containsKey(queue.name)) {
@@ -221,6 +213,7 @@ private static class GetQueueStatsRequest<CTX> {
221213
private CTX mCtx;
222214
private GetQueueStatsMentor<CTX> mentor;
223215
private List<String> queueNames;
216+
/* TODO: Why is 'queuesJsonArr' never accessed? Isn't this the reason of our class in the first place? */
224217
private JsonArray queuesJsonArr;
225218
private List<Queue> queues;
226219
}

src/main/java/org/swisspush/redisques/RedisQues.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -230,12 +230,9 @@ private enum QueueState {
230230
private final Semaphore getQueuesItemsCountRedisRequestQuota;
231231

232232
public RedisQues() {
233-
this.exceptionFactory = newThriftyExceptionFactory();
233+
this(null, null, null, newThriftyExceptionFactory(), new Semaphore(Integer.MAX_VALUE),
234+
new Semaphore(Integer.MAX_VALUE), new Semaphore(Integer.MAX_VALUE), new Semaphore(Integer.MAX_VALUE));
234235
log.warn("Fallback to legacy behavior and allow up to {} simultaneous requests to redis", Integer.MAX_VALUE);
235-
this.redisMonitoringReqQuota = new Semaphore(Integer.MAX_VALUE);
236-
this.checkQueueRequestsQuota = new Semaphore(Integer.MAX_VALUE);
237-
this.queueStatsRequestQuota = new Semaphore(Integer.MAX_VALUE);
238-
this.getQueuesItemsCountRedisRequestQuota = new Semaphore(Integer.MAX_VALUE);
239236
}
240237

241238
public RedisQues(
@@ -314,10 +311,6 @@ public void start(Promise<Void> promise) {
314311
this.configurationProvider = new DefaultRedisquesConfigurationProvider(vertx, config());
315312
}
316313

317-
if (this.dequeueStatisticCollector == null) {
318-
this.dequeueStatisticCollector = new DequeueStatisticCollector(vertx);
319-
}
320-
321314
if (this.periodicSkipScheduler == null) {
322315
this.periodicSkipScheduler = new PeriodicSkipScheduler(vertx);
323316
}
@@ -326,11 +319,14 @@ public void start(Promise<Void> promise) {
326319
log.info("Starting Redisques module with configuration: {}", configurationProvider.configuration());
327320

328321
int dequeueStatisticReportIntervalSec = modConfig.getDequeueStatisticReportIntervalSec();
329-
if (dequeueStatisticReportIntervalSec > 0) {
322+
if (modConfig.isDequeueStatsEnabled()) {
330323
dequeueStatisticEnabled = true;
331324
Runnable publisher = newDequeueStatisticPublisher();
332325
vertx.setPeriodic(1000L * dequeueStatisticReportIntervalSec, time -> publisher.run());
333326
}
327+
if (this.dequeueStatisticCollector == null) {
328+
this.dequeueStatisticCollector = new DequeueStatisticCollector(vertx,dequeueStatisticEnabled);
329+
}
334330
queuesKey = modConfig.getRedisPrefix() + "queues";
335331
queuesPrefix = modConfig.getRedisPrefix() + "queues:";
336332
consumersPrefix = modConfig.getRedisPrefix() + "consumers:";

src/main/java/org/swisspush/redisques/handler/GetQueuesItemsCountHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,10 +134,10 @@ public void handle(AsyncResult<Response> handleQueues) {
134134
var obj = new JsonObject().put(STATUS, OK).put(QUEUES, result);
135135
long jsonCreateDurationMs = currentTimeMillis() - beginEpchMs;
136136
if (jsonCreateDurationMs > 10) {
137-
log.info("Creating JSON with {} entries did block this tread for {}ms",
137+
log.info("Creating JSON with {} entries did block this thread for {}ms",
138138
ctx.queueLengths.length, jsonCreateDurationMs);
139139
}else{
140-
log.debug("Creating JSON with {} entries did block this tread for {}ms",
140+
log.debug("Creating JSON with {} entries did block this thread for {}ms",
141141
ctx.queueLengths.length, jsonCreateDurationMs);
142142
}
143143
workerPromise.complete(obj);

src/main/java/org/swisspush/redisques/util/DequeueStatisticCollector.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,19 @@
1111
import org.slf4j.Logger;
1212
import org.slf4j.LoggerFactory;
1313

14+
import java.util.Collections;
1415
import java.util.Map;
1516

1617
public class DequeueStatisticCollector {
1718
private static final Logger log = LoggerFactory.getLogger(DequeueStatisticCollector.class);
1819
private final static String DEQUEUE_STATISTIC_DATA = "dequeueStatisticData";
1920
private final static String DEQUEUE_STATISTIC_LOCK_PREFIX = "dequeueStatisticLock.";
2021
private final SharedData sharedData;
22+
private final boolean dequeueStatisticEnabled;
2123

22-
public DequeueStatisticCollector(Vertx vertx) {
24+
public DequeueStatisticCollector(Vertx vertx, boolean dequeueStatisticEnabled) {
2325
this.sharedData = vertx.sharedData();
26+
this.dequeueStatisticEnabled = dequeueStatisticEnabled;
2427
}
2528

2629
/**
@@ -107,6 +110,10 @@ public Future<Void> setDequeueStatistic(final String queueName, final DequeueSta
107110
}
108111

109112
public Future<Map<String, DequeueStatistic>> getAllDequeueStatistics() {
113+
// Check if dequeue statistics are enabled
114+
if (!dequeueStatisticEnabled) {
115+
return Future.succeededFuture(Collections.emptyMap()); // Return an empty map to avoid NullPointerExceptions
116+
}
110117
Promise<Map<String, DequeueStatistic>> promise = Promise.promise();
111118
sharedData.getAsyncMap(DEQUEUE_STATISTIC_DATA, (Handler<AsyncResult<AsyncMap<String, DequeueStatistic>>>) asyncResult -> {
112119
if (asyncResult.failed()) {

src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -704,6 +704,10 @@ public String toString() {
704704
return asJsonObject().toString();
705705
}
706706

707+
public boolean isDequeueStatsEnabled() {
708+
return getDequeueStatisticReportIntervalSec() > 0;
709+
}
710+
707711
/**
708712
* RedisquesConfigurationBuilder class for simplified configuration.
709713
*
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package org.swisspush.redisques.util;
2+
3+
import io.vertx.core.Future;
4+
import io.vertx.core.Vertx;
5+
import io.vertx.core.shareddata.AsyncMap;
6+
import io.vertx.core.shareddata.SharedData;
7+
import io.vertx.ext.unit.Async;
8+
import io.vertx.ext.unit.TestContext;
9+
import io.vertx.ext.unit.junit.VertxUnitRunner;
10+
import org.junit.Before;
11+
import org.junit.Test;
12+
import org.junit.runner.RunWith;
13+
14+
import java.util.HashMap;
15+
import java.util.Map;
16+
17+
import static org.mockito.Mockito.*;
18+
19+
@RunWith(VertxUnitRunner.class)
20+
public class DequeueStatisticCollectorTest {
21+
22+
private Vertx vertx;
23+
private SharedData sharedData;
24+
private AsyncMap<String, DequeueStatistic> asyncMap;
25+
private DequeueStatisticCollector dequeueStatisticCollectorEnabled;
26+
private DequeueStatisticCollector dequeueStatisticCollectorDisabled;
27+
28+
@Before
29+
public void setUp() {
30+
vertx = mock(Vertx.class);
31+
sharedData = mock(SharedData.class);
32+
asyncMap = mock(AsyncMap.class);
33+
34+
// Mock sharedData.getAsyncMap to return asyncMap
35+
doAnswer(invocation -> {
36+
io.vertx.core.Handler<io.vertx.core.AsyncResult<AsyncMap<String, DequeueStatistic>>> handler = invocation.getArgument(1);
37+
handler.handle(Future.succeededFuture(asyncMap));
38+
return null;
39+
}).when(sharedData).getAsyncMap(anyString(), any());
40+
41+
when(vertx.sharedData()).thenReturn(sharedData);
42+
43+
// Initialize DequeueStatisticCollector with enabled/disabled stats collection
44+
dequeueStatisticCollectorEnabled = new DequeueStatisticCollector(vertx, true);
45+
dequeueStatisticCollectorDisabled = new DequeueStatisticCollector(vertx, false);
46+
}
47+
48+
@Test
49+
public void testGetAllDequeueStatisticsEnabled(TestContext context) {
50+
// Mock asyncMap.entries() to return a non-empty map
51+
Map<String, DequeueStatistic> dequeueStats = new HashMap<>();
52+
dequeueStats.put("queue1", new DequeueStatistic());
53+
when(asyncMap.entries()).thenReturn(Future.succeededFuture(dequeueStats));
54+
55+
// Test when dequeue statistics are enabled
56+
Async async = context.async();
57+
dequeueStatisticCollectorEnabled.getAllDequeueStatistics().onComplete(result -> {
58+
context.assertTrue(result.succeeded());
59+
context.assertEquals(1, result.result().size());
60+
async.complete();
61+
});
62+
63+
// Verify that sharedData and asyncMap were used correctly
64+
verify(sharedData, times(1)).getAsyncMap(anyString(), any());
65+
verify(asyncMap, times(1)).entries();
66+
}
67+
68+
@Test
69+
public void testGetAllDequeueStatisticsDisabled(TestContext context) {
70+
// Test when dequeue statistics are disabled
71+
Async async = context.async();
72+
dequeueStatisticCollectorDisabled.getAllDequeueStatistics().onComplete(result -> {
73+
context.assertTrue(result.succeeded());
74+
context.assertTrue(result.result().isEmpty());
75+
async.complete();
76+
});
77+
78+
// Verify that sharedData and asyncMap were NOT used
79+
verifyNoInteractions(sharedData);
80+
verifyNoInteractions(asyncMap);
81+
}
82+
83+
@Test
84+
public void testGetAllDequeueStatisticsAsyncMapFailure(TestContext context) {
85+
// Simulate failure in sharedData.getAsyncMap
86+
doAnswer(invocation -> {
87+
io.vertx.core.Handler<io.vertx.core.AsyncResult<AsyncMap<String, DequeueStatistic>>> handler = invocation.getArgument(1);
88+
handler.handle(Future.failedFuture(new RuntimeException("Failed to retrieve async map")));
89+
return null;
90+
}).when(sharedData).getAsyncMap(anyString(), any());
91+
92+
// Test when asyncMap retrieval fails
93+
Async async = context.async();
94+
dequeueStatisticCollectorEnabled.getAllDequeueStatistics().onComplete(result -> {
95+
context.assertTrue(result.failed());
96+
context.assertEquals("Failed to retrieve async map", result.cause().getMessage());
97+
async.complete();
98+
});
99+
100+
// Verify that sharedData.getAsyncMap was used, but asyncMap.entries() was not
101+
verify(sharedData, times(1)).getAsyncMap(anyString(), any());
102+
verifyNoInteractions(asyncMap);
103+
}
104+
105+
@Test
106+
public void testGetAllDequeueStatisticsEntriesFailure(TestContext context) {
107+
// Simulate success in sharedData.getAsyncMap, but failure in asyncMap.entries
108+
doAnswer(invocation -> {
109+
io.vertx.core.Handler<io.vertx.core.AsyncResult<AsyncMap<String, DequeueStatistic>>> handler = invocation.getArgument(1);
110+
handler.handle(Future.succeededFuture(asyncMap));
111+
return null;
112+
}).when(sharedData).getAsyncMap(anyString(), any());
113+
114+
when(asyncMap.entries()).thenReturn(Future.failedFuture(new RuntimeException("Failed to retrieve entries")));
115+
116+
// Test when asyncMap.entries fails
117+
Async async = context.async();
118+
dequeueStatisticCollectorEnabled.getAllDequeueStatistics().onComplete(result -> {
119+
context.assertTrue(result.failed());
120+
context.assertEquals("Failed to retrieve entries", result.cause().getMessage());
121+
async.complete();
122+
});
123+
124+
// Verify that sharedData.getAsyncMap and asyncMap.entries were used correctly
125+
verify(sharedData, times(1)).getAsyncMap(anyString(), any());
126+
verify(asyncMap, times(1)).entries();
127+
}
128+
}

0 commit comments

Comments
 (0)