Skip to content

Commit 3a4ffcb

Browse files
committed
feat(impv): Support Jedis
1 parent b0b6ae8 commit 3a4ffcb

File tree

3 files changed

+39
-0
lines changed

3 files changed

+39
-0
lines changed

src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java

+5
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848

4949
/**
5050
* @author Dengliming
51+
* @author Jeonggyu Choi
5152
* @since 2.3
5253
*/
5354
class JedisClusterStreamCommands implements RedisStreamCommands {
@@ -283,6 +284,10 @@ public PendingMessages xPending(byte[] key, String groupName, XPendingOptions op
283284
pendingParams = pendingParams.consumer(consumerName);
284285
}
285286

287+
if (options.hasIdle()) {
288+
pendingParams = pendingParams.idle(options.getIdleMillis());
289+
}
290+
286291
List<Object> response = connection.getCluster().xpending(key, group, pendingParams);
287292

288293
return StreamConverters.toPendingMessages(groupName, range,

src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java

+4
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
*
5656
* @author dengliming
5757
* @author Mark Paluch
58+
* @author Jeonggyu Choi
5859
* @since 2.3
5960
*/
6061
class StreamConverters {
@@ -306,6 +307,9 @@ public static XPendingParams toXPendingParams(RedisStreamCommands.XPendingOption
306307
if (options.hasConsumer()) {
307308
xPendingParams.consumer(options.getConsumerName());
308309
}
310+
if (options.hasIdle()) {
311+
xPendingParams.idle(options.getIdleMillis());
312+
}
309313

310314
return xPendingParams;
311315
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package org.springframework.data.redis.connection.jedis;
2+
3+
import static org.assertj.core.api.Assertions.*;
4+
5+
import redis.clients.jedis.params.XPendingParams;
6+
7+
import java.lang.reflect.Field;
8+
import java.time.Duration;
9+
import java.time.temporal.ChronoUnit;
10+
11+
import org.junit.jupiter.api.Test;
12+
import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions;
13+
14+
/**
15+
* @author Jeonggyu Choi
16+
*/
17+
class StreamConvertersUnitTest {
18+
19+
@Test // GH-2046
20+
void shouldConvertIdle() throws NoSuchFieldException, IllegalAccessException {
21+
XPendingOptions options = XPendingOptions.unbounded(5L).idle(Duration.of(1, ChronoUnit.HOURS));
22+
23+
XPendingParams xPendingParams = StreamConverters.toXPendingParams(options);
24+
25+
Field idle = XPendingParams.class.getDeclaredField("idle");
26+
idle.setAccessible(true);
27+
Long idleValue = (Long) idle.get(xPendingParams);
28+
assertThat(idleValue).isEqualTo(Duration.of(1, ChronoUnit.HOURS).toMillis());
29+
}
30+
}

0 commit comments

Comments
 (0)