Skip to content

Commit e5829f1

Browse files
committed
Add binary stream support for XREAD and XREADGROUP (redis#3566)
- Created StreamEntryBinary class to support binary data with Map<byte[], byte[]> - Added xreadBinary, xreadBinaryAsMap, xreadGroupBinary, and xreadGroupBinaryAsMap methods to StreamBinaryCommands - Implemented binary stream builders in BuilderFactory - Added implementation in Jedis and UnifiedJedis classes - Created BinaryStreamEntryTest to verify binary stream functionality
1 parent 93dd26c commit e5829f1

File tree

7 files changed

+442
-4
lines changed

7 files changed

+442
-4
lines changed

src/main/java/redis/clients/jedis/BuilderFactory.java

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1809,6 +1809,124 @@ public String toString() {
18091809
}
18101810
};
18111811

1812+
public static final Builder<StreamEntryBinary> STREAM_ENTRY_BINARY = new Builder<StreamEntryBinary>() {
1813+
@Override
1814+
@SuppressWarnings("unchecked")
1815+
public StreamEntryBinary build(Object data) {
1816+
if (null == data) {
1817+
return null;
1818+
}
1819+
List<Object> list = (List<Object>) data;
1820+
String entryIdString = SafeEncoder.encode((byte[]) list.get(0));
1821+
StreamEntryID entryID = new StreamEntryID(entryIdString);
1822+
List<byte[]> hash = (List<byte[]>) list.get(1);
1823+
if (hash == null) {
1824+
return new StreamEntryBinary(entryID, null);
1825+
}
1826+
1827+
Iterator<byte[]> hashIterator = hash.iterator();
1828+
Map<byte[], byte[]> map = new JedisByteHashMap();
1829+
while (hashIterator.hasNext()) {
1830+
map.put(hashIterator.next(), hashIterator.next());
1831+
}
1832+
return new StreamEntryBinary(entryID, map);
1833+
}
1834+
1835+
@Override
1836+
public String toString() {
1837+
return "StreamEntryBinary";
1838+
}
1839+
};
1840+
1841+
public static final Builder<List<StreamEntryBinary>> STREAM_ENTRY_BINARY_LIST = new Builder<List<StreamEntryBinary>>() {
1842+
@Override
1843+
@SuppressWarnings("unchecked")
1844+
public List<StreamEntryBinary> build(Object data) {
1845+
if (null == data) {
1846+
return null;
1847+
}
1848+
List<ArrayList<Object>> objectList = (List<ArrayList<Object>>) data;
1849+
1850+
List<StreamEntryBinary> responses = new ArrayList<>(objectList.size() / 2);
1851+
if (objectList.isEmpty()) {
1852+
return responses;
1853+
}
1854+
1855+
for (ArrayList<Object> res : objectList) {
1856+
responses.add(STREAM_ENTRY_BINARY.build(res));
1857+
}
1858+
1859+
return responses;
1860+
}
1861+
1862+
@Override
1863+
public String toString() {
1864+
return "List<StreamEntryBinary>";
1865+
}
1866+
};
1867+
1868+
public static final Builder<Map<byte[], List<StreamEntryBinary>>> STREAM_READ_BINARY_MAP_RESPONSE
1869+
= new Builder<Map<byte[], List<StreamEntryBinary>>>() {
1870+
@Override
1871+
@SuppressWarnings("unchecked")
1872+
public Map<byte[], List<StreamEntryBinary>> build(Object data) {
1873+
if (data == null) return null;
1874+
List list = (List) data;
1875+
if (list.isEmpty()) return Collections.emptyMap();
1876+
1877+
if (list.get(0) instanceof KeyValue) {
1878+
return ((List<KeyValue>) list).stream()
1879+
.collect(Collectors.toMap(kv -> BINARY.build(kv.getKey()), kv -> STREAM_ENTRY_BINARY_LIST.build(kv.getValue())));
1880+
} else {
1881+
Map<byte[], List<StreamEntryBinary>> result = new HashMap<>(list.size());
1882+
for (Object anObj : list) {
1883+
List<Object> streamObj = (List<Object>) anObj;
1884+
byte[] streamKey = (byte[]) streamObj.get(0);
1885+
List<StreamEntryBinary> streamEntries = STREAM_ENTRY_BINARY_LIST.build(streamObj.get(1));
1886+
result.put(streamKey, streamEntries);
1887+
}
1888+
return result;
1889+
}
1890+
}
1891+
1892+
@Override
1893+
public String toString() {
1894+
return "Map<byte[], List<StreamEntryBinary>>";
1895+
}
1896+
};
1897+
1898+
public static final Builder<List<Map.Entry<byte[], List<StreamEntryBinary>>>> STREAM_READ_BINARY_RESPONSE
1899+
= new Builder<List<Map.Entry<byte[], List<StreamEntryBinary>>>>() {
1900+
@Override
1901+
@SuppressWarnings("unchecked")
1902+
public List<Map.Entry<byte[], List<StreamEntryBinary>>> build(Object data) {
1903+
if (data == null) return null;
1904+
List list = (List) data;
1905+
if (list.isEmpty()) return Collections.emptyList();
1906+
1907+
if (list.get(0) instanceof KeyValue) {
1908+
return ((List<KeyValue>) list).stream()
1909+
.map(kv -> new KeyValue<>(BINARY.build(kv.getKey()),
1910+
STREAM_ENTRY_BINARY_LIST.build(kv.getValue())))
1911+
.collect(Collectors.toList());
1912+
} else {
1913+
List<Map.Entry<byte[], List<StreamEntryBinary>>> result = new ArrayList<>(list.size());
1914+
for (Object anObj : list) {
1915+
List<Object> streamObj = (List<Object>) anObj;
1916+
byte[] streamKey = BINARY.build(streamObj.get(0));
1917+
List<StreamEntryBinary> streamEntries = STREAM_ENTRY_BINARY_LIST.build(streamObj.get(1));
1918+
result.add(KeyValue.of(streamKey, streamEntries));
1919+
}
1920+
return result;
1921+
}
1922+
}
1923+
1924+
@Override
1925+
public String toString() {
1926+
return "List<Entry<byte[], List<StreamEntryBinary>>>";
1927+
}
1928+
};
1929+
18121930
private static final List<Builder> BACKUP_BUILDERS_FOR_DECODING_FUNCTIONS
18131931
= Arrays.asList(STRING, LONG, DOUBLE);
18141932

src/main/java/redis/clients/jedis/CommandObjects.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2892,6 +2892,30 @@ public final CommandObject<List<Object>> xread(XReadParams xReadParams, Map.Entr
28922892
return new CommandObject<>(args, BuilderFactory.RAW_OBJECT_LIST);
28932893
}
28942894

2895+
public final CommandObject<List<Map.Entry<byte[], List<StreamEntryBinary>>>> xreadBinary(
2896+
XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
2897+
CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS);
2898+
for (Map.Entry<byte[], byte[]> entry : streams) {
2899+
args.key(entry.getKey());
2900+
}
2901+
for (Map.Entry<byte[], byte[]> entry : streams) {
2902+
args.add(entry.getValue());
2903+
}
2904+
return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_RESPONSE);
2905+
}
2906+
2907+
public final CommandObject<Map<byte[], List<StreamEntryBinary>>> xreadBinaryAsMap(
2908+
XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
2909+
CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS);
2910+
for (Map.Entry<byte[], byte[]> entry : streams) {
2911+
args.key(entry.getKey());
2912+
}
2913+
for (Map.Entry<byte[], byte[]> entry : streams) {
2914+
args.add(entry.getValue());
2915+
}
2916+
return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_MAP_RESPONSE);
2917+
}
2918+
28952919
public final CommandObject<List<Object>> xreadGroup(byte[] groupName, byte[] consumer,
28962920
XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
28972921
CommandArguments args = commandArguments(XREADGROUP)
@@ -2905,6 +2929,36 @@ public final CommandObject<List<Object>> xreadGroup(byte[] groupName, byte[] con
29052929
}
29062930
return new CommandObject<>(args, BuilderFactory.RAW_OBJECT_LIST);
29072931
}
2932+
2933+
public final CommandObject<List<Map.Entry<byte[], List<StreamEntryBinary>>>> xreadGroupBinary(
2934+
byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams,
2935+
Map.Entry<byte[], byte[]>... streams) {
2936+
CommandArguments args = commandArguments(XREADGROUP)
2937+
.add(GROUP).add(groupName).add(consumer)
2938+
.addParams(xReadGroupParams).add(STREAMS);
2939+
for (Map.Entry<byte[], byte[]> entry : streams) {
2940+
args.key(entry.getKey());
2941+
}
2942+
for (Map.Entry<byte[], byte[]> entry : streams) {
2943+
args.add(entry.getValue());
2944+
}
2945+
return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_RESPONSE);
2946+
}
2947+
2948+
public final CommandObject<Map<byte[], List<StreamEntryBinary>>> xreadGroupBinaryAsMap(
2949+
byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams,
2950+
Map.Entry<byte[], byte[]>... streams) {
2951+
CommandArguments args = commandArguments(XREADGROUP)
2952+
.add(GROUP).add(groupName).add(consumer)
2953+
.addParams(xReadGroupParams).add(STREAMS);
2954+
for (Map.Entry<byte[], byte[]> entry : streams) {
2955+
args.key(entry.getKey());
2956+
}
2957+
for (Map.Entry<byte[], byte[]> entry : streams) {
2958+
args.add(entry.getValue());
2959+
}
2960+
return new CommandObject<>(args, BuilderFactory.STREAM_READ_BINARY_MAP_RESPONSE);
2961+
}
29082962
// Stream commands
29092963

29102964
// Scripting commands

src/main/java/redis/clients/jedis/Jedis.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1172,7 +1172,7 @@ public long hsetex(byte[] key, HSetExParams params, byte[] field, byte[] value)
11721172
checkIsInMultiOrPipeline();
11731173
return connection.executeCommand(commandObjects.hsetex(key, params, field, value));
11741174
}
1175-
1175+
11761176
@Override
11771177
public long hsetex(byte[] key, HSetExParams params, Map<byte[], byte[]> hash){
11781178
checkIsInMultiOrPipeline();
@@ -1200,13 +1200,13 @@ public List<byte[]> hgetex(byte[] key, HGetExParams params, byte[]... fields){
12001200
checkIsInMultiOrPipeline();
12011201
return connection.executeCommand(commandObjects.hgetex(key, params, fields));
12021202
}
1203-
1203+
12041204
@Override
12051205
public List<byte[]> hgetdel(byte[] key, byte[]... fields){
12061206
checkIsInMultiOrPipeline();
12071207
return connection.executeCommand(commandObjects.hgetdel(key, fields));
12081208
}
1209-
1209+
12101210
/**
12111211
* Set the specified hash field to the specified value if the field not exists. <b>Time
12121212
* complexity:</b> O(1)
@@ -4783,6 +4783,34 @@ public List<Object> xreadGroup(byte[] groupName, byte[] consumer,
47834783
return connection.executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
47844784
}
47854785

4786+
@Override
4787+
public List<Map.Entry<byte[], List<StreamEntryBinary>>> xreadBinary(XReadParams xReadParams,
4788+
Entry<byte[], byte[]>... streams) {
4789+
checkIsInMultiOrPipeline();
4790+
return connection.executeCommand(commandObjects.xreadBinary(xReadParams, streams));
4791+
}
4792+
4793+
@Override
4794+
public Map<byte[], List<StreamEntryBinary>> xreadBinaryAsMap(XReadParams xReadParams,
4795+
Entry<byte[], byte[]>... streams) {
4796+
checkIsInMultiOrPipeline();
4797+
return connection.executeCommand(commandObjects.xreadBinaryAsMap(xReadParams, streams));
4798+
}
4799+
4800+
@Override
4801+
public List<Map.Entry<byte[], List<StreamEntryBinary>>> xreadGroupBinary(byte[] groupName, byte[] consumer,
4802+
XReadGroupParams xReadGroupParams, Entry<byte[], byte[]>... streams) {
4803+
checkIsInMultiOrPipeline();
4804+
return connection.executeCommand(commandObjects.xreadGroupBinary(groupName, consumer, xReadGroupParams, streams));
4805+
}
4806+
4807+
@Override
4808+
public Map<byte[], List<StreamEntryBinary>> xreadGroupBinaryAsMap(byte[] groupName, byte[] consumer,
4809+
XReadGroupParams xReadGroupParams, Entry<byte[], byte[]>... streams) {
4810+
checkIsInMultiOrPipeline();
4811+
return connection.executeCommand(commandObjects.xreadGroupBinaryAsMap(groupName, consumer, xReadGroupParams, streams));
4812+
}
4813+
47864814
@Override
47874815
public byte[] xadd(final byte[] key, final XAddParams params, final Map<byte[], byte[]> hash) {
47884816
checkIsInMultiOrPipeline();

src/main/java/redis/clients/jedis/UnifiedJedis.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1548,7 +1548,7 @@ public byte[] hget(byte[] key, byte[] field) {
15481548
public List<byte[]> hgetex(byte[] key, HGetExParams params, byte[]... fields) {
15491549
return executeCommand(commandObjects.hgetex(key, params, fields));
15501550
}
1551-
1551+
15521552
@Override
15531553
public List<byte[]> hgetdel(byte[] key, byte[]... fields) {
15541554
return executeCommand(commandObjects.hgetdel(key, fields));
@@ -3460,6 +3460,30 @@ public List<Object> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>...
34603460
public List<Object> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
34613461
return executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
34623462
}
3463+
3464+
@Override
3465+
public List<Map.Entry<byte[], List<StreamEntryBinary>>> xreadBinary(XReadParams xReadParams,
3466+
Map.Entry<byte[], byte[]>... streams) {
3467+
return executeCommand(commandObjects.xreadBinary(xReadParams, streams));
3468+
}
3469+
3470+
@Override
3471+
public Map<byte[], List<StreamEntryBinary>> xreadBinaryAsMap(XReadParams xReadParams,
3472+
Map.Entry<byte[], byte[]>... streams) {
3473+
return executeCommand(commandObjects.xreadBinaryAsMap(xReadParams, streams));
3474+
}
3475+
3476+
@Override
3477+
public List<Map.Entry<byte[], List<StreamEntryBinary>>> xreadGroupBinary(byte[] groupName, byte[] consumer,
3478+
XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
3479+
return executeCommand(commandObjects.xreadGroupBinary(groupName, consumer, xReadGroupParams, streams));
3480+
}
3481+
3482+
@Override
3483+
public Map<byte[], List<StreamEntryBinary>> xreadGroupBinaryAsMap(byte[] groupName, byte[] consumer,
3484+
XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams) {
3485+
return executeCommand(commandObjects.xreadGroupBinaryAsMap(groupName, consumer, xReadGroupParams, streams));
3486+
}
34633487
// Stream commands
34643488

34653489
// Scripting commands

src/main/java/redis/clients/jedis/commands/StreamBinaryCommands.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import java.util.Map;
55

66
import redis.clients.jedis.params.*;
7+
import redis.clients.jedis.resps.StreamEntryBinary;
78

89
public interface StreamBinaryCommands {
910

@@ -79,4 +80,15 @@ List<Object> xautoclaimJustId(byte[] key, byte[] groupName, byte[] consumerName,
7980
List<Object> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams,
8081
Map.Entry<byte[], byte[]>... streams);
8182

83+
List<Map.Entry<byte[], List<StreamEntryBinary>>> xreadBinary(XReadParams xReadParams,
84+
Map.Entry<byte[], byte[]>... streams);
85+
86+
Map<byte[], List<StreamEntryBinary>> xreadBinaryAsMap(XReadParams xReadParams,
87+
Map.Entry<byte[], byte[]>... streams);
88+
89+
List<Map.Entry<byte[], List<StreamEntryBinary>>> xreadGroupBinary(byte[] groupName, byte[] consumer,
90+
XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams);
91+
92+
Map<byte[], List<StreamEntryBinary>> xreadGroupBinaryAsMap(byte[] groupName, byte[] consumer,
93+
XReadGroupParams xReadGroupParams, Map.Entry<byte[], byte[]>... streams);
8294
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package redis.clients.jedis.resps;
2+
3+
import java.io.IOException;
4+
import java.io.Serializable;
5+
import java.util.Map;
6+
import redis.clients.jedis.StreamEntryID;
7+
8+
public class StreamEntryBinary implements Serializable {
9+
10+
private static final long serialVersionUID = 1L;
11+
12+
private StreamEntryID id;
13+
private Map<byte[], byte[]> fields;
14+
15+
public StreamEntryBinary(StreamEntryID id, Map<byte[], byte[]> fields) {
16+
this.id = id;
17+
this.fields = fields;
18+
}
19+
20+
public StreamEntryID getID() {
21+
return id;
22+
}
23+
24+
public Map<byte[], byte[]> getFields() {
25+
return fields;
26+
}
27+
28+
@Override
29+
public String toString() {
30+
return id + " " + fields;
31+
}
32+
33+
private void writeObject(java.io.ObjectOutputStream out) throws IOException {
34+
out.writeUnshared(this.id);
35+
out.writeUnshared(this.fields);
36+
}
37+
38+
private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
39+
this.id = (StreamEntryID) in.readUnshared();
40+
this.fields = (Map<byte[], byte[]>) in.readUnshared();
41+
}
42+
}

0 commit comments

Comments
 (0)