Skip to content

Commit 24c5510

Browse files
committed
Add Map<byte[], StreamEntryID> support and binary stream tests (redis#3566)
- Introduce a Map<byte[], StreamEntryID> parameter for binary stream commands - Duplicate all existing test cases for xread, xreadAsMap, xreadGroup and xreadGroupAsMap from StreamsCommandsTest.java and replace them with xreadBinary, xreadBinaryAsMap, xreadGroupBinary and xreadGroupBinaryAsMap in StreamsBinaryCommandsTest
1 parent 88a8ff4 commit 24c5510

File tree

6 files changed

+854
-1
lines changed

6 files changed

+854
-1
lines changed

src/main/java/redis/clients/jedis/BuilderFactory.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1859,8 +1859,20 @@ public List<StreamEntryBinary> build(Object data) {
18591859
responses.add(null);
18601860
continue;
18611861
}
1862+
String entryIdString = SafeEncoder.encode((byte[]) res.get(0));
1863+
StreamEntryID entryID = new StreamEntryID(entryIdString);
1864+
List<byte[]> hash = (List<byte[]>) res.get(1);
1865+
if (hash == null) {
1866+
responses.add(new StreamEntryBinary(entryID, null));
1867+
continue;
1868+
}
18621869

1863-
responses.add(STREAM_ENTRY_BINARY.build(res));
1870+
Iterator<byte[]> hashIterator = hash.iterator();
1871+
Map<byte[], byte[]> map = new HashMap<>(hash.size() / 2, 1f);
1872+
while (hashIterator.hasNext()) {
1873+
map.put(hashIterator.next(), hashIterator.next());
1874+
}
1875+
responses.add(new StreamEntryBinary(entryID, map));
18641876
}
18651877

18661878
return responses;

src/main/java/redis/clients/jedis/CommandObjects.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2959,6 +2959,48 @@ public final CommandObject<Map<byte[], List<StreamEntryBinary>>> xreadGroupBinar
29592959
}
29602960
return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_MAP_RESPONSE);
29612961
}
2962+
2963+
public final CommandObject<List<Map.Entry<byte[], List<StreamEntryBinary>>>> xreadBinary(
2964+
XReadParams xReadParams, Map<byte[], StreamEntryID> streams) {
2965+
CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS);
2966+
Set<Map.Entry<byte[], StreamEntryID>> entrySet = streams.entrySet();
2967+
entrySet.forEach(entry -> args.key(entry.getKey()));
2968+
entrySet.forEach(entry -> args.add(entry.getValue()));
2969+
return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_RESPONSE);
2970+
}
2971+
2972+
public final CommandObject<Map<byte[], List<StreamEntryBinary>>> xreadBinaryAsMap(
2973+
XReadParams xReadParams, Map<byte[], StreamEntryID> streams) {
2974+
CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS);
2975+
Set<Map.Entry<byte[], StreamEntryID>> entrySet = streams.entrySet();
2976+
entrySet.forEach(entry -> args.key(entry.getKey()));
2977+
entrySet.forEach(entry -> args.add(entry.getValue()));
2978+
return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_MAP_RESPONSE);
2979+
}
2980+
2981+
public final CommandObject<List<Map.Entry<byte[], List<StreamEntryBinary>>>> xreadGroupBinary(
2982+
byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams,
2983+
Map<byte[], StreamEntryID> streams) {
2984+
CommandArguments args = commandArguments(XREADGROUP)
2985+
.add(GROUP).add(groupName).add(consumer)
2986+
.addParams(xReadGroupParams).add(STREAMS);
2987+
Set<Map.Entry<byte[], StreamEntryID>> entrySet = streams.entrySet();
2988+
entrySet.forEach(entry -> args.key(entry.getKey()));
2989+
entrySet.forEach(entry -> args.add(entry.getValue()));
2990+
return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_RESPONSE);
2991+
}
2992+
2993+
public final CommandObject<Map<byte[], List<StreamEntryBinary>>> xreadGroupBinaryAsMap(
2994+
byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams,
2995+
Map<byte[], StreamEntryID> streams) {
2996+
CommandArguments args = commandArguments(XREADGROUP)
2997+
.add(GROUP).add(groupName).add(consumer)
2998+
.addParams(xReadGroupParams).add(STREAMS);
2999+
Set<Map.Entry<byte[], StreamEntryID>> entrySet = streams.entrySet();
3000+
entrySet.forEach(entry -> args.key(entry.getKey()));
3001+
entrySet.forEach(entry -> args.add(entry.getValue()));
3002+
return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_MAP_RESPONSE);
3003+
}
29623004
// Stream commands
29633005

29643006
// Scripting commands

src/main/java/redis/clients/jedis/Jedis.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4811,6 +4811,34 @@ public Map<byte[], List<StreamEntryBinary>> xreadGroupBinaryAsMap(byte[] groupNa
48114811
return connection.executeCommand(commandObjects.xreadGroupBinaryAsMap(groupName, consumer, xReadGroupParams, streams));
48124812
}
48134813

4814+
@Override
4815+
public List<Map.Entry<byte[], List<StreamEntryBinary>>> xreadBinary(XReadParams xReadParams,
4816+
Map<byte[], StreamEntryID> streams) {
4817+
checkIsInMultiOrPipeline();
4818+
return connection.executeCommand(commandObjects.xreadBinary(xReadParams, streams));
4819+
}
4820+
4821+
@Override
4822+
public Map<byte[], List<StreamEntryBinary>> xreadBinaryAsMap(XReadParams xReadParams,
4823+
Map<byte[], StreamEntryID> streams) {
4824+
checkIsInMultiOrPipeline();
4825+
return connection.executeCommand(commandObjects.xreadBinaryAsMap(xReadParams, streams));
4826+
}
4827+
4828+
@Override
4829+
public List<Map.Entry<byte[], List<StreamEntryBinary>>> xreadGroupBinary(byte[] groupName, byte[] consumer,
4830+
XReadGroupParams xReadGroupParams, Map<byte[], StreamEntryID> streams) {
4831+
checkIsInMultiOrPipeline();
4832+
return connection.executeCommand(commandObjects.xreadGroupBinary(groupName, consumer, xReadGroupParams, streams));
4833+
}
4834+
4835+
@Override
4836+
public Map<byte[], List<StreamEntryBinary>> xreadGroupBinaryAsMap(byte[] groupName, byte[] consumer,
4837+
XReadGroupParams xReadGroupParams, Map<byte[], StreamEntryID> streams) {
4838+
checkIsInMultiOrPipeline();
4839+
return connection.executeCommand(commandObjects.xreadGroupBinaryAsMap(groupName, consumer, xReadGroupParams, streams));
4840+
}
4841+
48144842
@Override
48154843
public byte[] xadd(final byte[] key, final XAddParams params, final Map<byte[], byte[]> hash) {
48164844
checkIsInMultiOrPipeline();

src/main/java/redis/clients/jedis/UnifiedJedis.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3484,6 +3484,30 @@ public Map<byte[], List<StreamEntryBinary>> xreadGroupBinaryAsMap(byte[] groupNa
34843484
XReadGroupParams xReadGroupParams, Map.Entry<byte[], StreamEntryID>... streams) {
34853485
return executeCommand(commandObjects.xreadGroupBinaryAsMap(groupName, consumer, xReadGroupParams, streams));
34863486
}
3487+
3488+
@Override
3489+
public List<Map.Entry<byte[], List<StreamEntryBinary>>> xreadBinary(XReadParams xReadParams,
3490+
Map<byte[], StreamEntryID> streams) {
3491+
return executeCommand(commandObjects.xreadBinary(xReadParams, streams));
3492+
}
3493+
3494+
@Override
3495+
public Map<byte[], List<StreamEntryBinary>> xreadBinaryAsMap(XReadParams xReadParams,
3496+
Map<byte[], StreamEntryID> streams) {
3497+
return executeCommand(commandObjects.xreadBinaryAsMap(xReadParams, streams));
3498+
}
3499+
3500+
@Override
3501+
public List<Map.Entry<byte[], List<StreamEntryBinary>>> xreadGroupBinary(byte[] groupName, byte[] consumer,
3502+
XReadGroupParams xReadGroupParams, Map<byte[], StreamEntryID> streams) {
3503+
return executeCommand(commandObjects.xreadGroupBinary(groupName, consumer, xReadGroupParams, streams));
3504+
}
3505+
3506+
@Override
3507+
public Map<byte[], List<StreamEntryBinary>> xreadGroupBinaryAsMap(byte[] groupName, byte[] consumer,
3508+
XReadGroupParams xReadGroupParams, Map<byte[], StreamEntryID> streams) {
3509+
return executeCommand(commandObjects.xreadGroupBinaryAsMap(groupName, consumer, xReadGroupParams, streams));
3510+
}
34873511
// Stream commands
34883512

34893513
// Scripting commands

src/main/java/redis/clients/jedis/commands/StreamBinaryCommands.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,17 @@ List<Map.Entry<byte[], List<StreamEntryBinary>>> xreadGroupBinary(byte[] groupNa
9292

9393
Map<byte[], List<StreamEntryBinary>> xreadGroupBinaryAsMap(byte[] groupName, byte[] consumer,
9494
XReadGroupParams xReadGroupParams, Map.Entry<byte[], StreamEntryID>... streams);
95+
96+
List<Map.Entry<byte[], List<StreamEntryBinary>>> xreadBinary(XReadParams xReadParams,
97+
Map<byte[], StreamEntryID> streams);
98+
99+
Map<byte[], List<StreamEntryBinary>> xreadBinaryAsMap(XReadParams xReadParams,
100+
Map<byte[], StreamEntryID> streams);
101+
102+
List<Map.Entry<byte[], List<StreamEntryBinary>>> xreadGroupBinary(byte[] groupName, byte[] consumer,
103+
XReadGroupParams xReadGroupParams, Map<byte[], StreamEntryID> streams);
104+
105+
Map<byte[], List<StreamEntryBinary>> xreadGroupBinaryAsMap(byte[] groupName, byte[] consumer,
106+
XReadGroupParams xReadGroupParams, Map<byte[], StreamEntryID> streams);
107+
95108
}

0 commit comments

Comments
 (0)