Skip to content

Commit c9812a6

Browse files
committed
Fix compatibility with JedisCluster
1 parent 01fe4d7 commit c9812a6

File tree

2 files changed

+272
-2
lines changed

2 files changed

+272
-2
lines changed

src/main/java/redis/clients/jedis/search/aggr/AggregateIterator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ private AggregationResult doFetch() {
159159
return null;
160160
}
161161

162-
redis.clients.jedis.CommandArguments args = new redis.clients.jedis.CommandArguments(SearchProtocol.SearchCommand.CURSOR)
162+
redis.clients.jedis.CommandArguments args = new redis.clients.jedis.ClusterCommandArguments(SearchProtocol.SearchCommand.CURSOR)
163163
.add(SearchProtocol.SearchKeyword.READ)
164164
.add(indexName)
165165
.add(cursorId);
@@ -178,7 +178,7 @@ private AggregationResult doFetch() {
178178

179179
private Connection acquireConnection(AggregationBuilder aggregationBuilder) {
180180
// Create the initial FT.AGGREGATE command
181-
redis.clients.jedis.CommandArguments args = new redis.clients.jedis.CommandArguments(SearchProtocol.SearchCommand.AGGREGATE)
181+
redis.clients.jedis.CommandArguments args = new redis.clients.jedis.ClusterCommandArguments(SearchProtocol.SearchCommand.AGGREGATE)
182182
.add(indexName)
183183
.addParams(aggregationBuilder);
184184

Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
package redis.clients.jedis.modules.search.cluster;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertFalse;
5+
import static org.junit.jupiter.api.Assertions.assertNotNull;
6+
import static org.junit.jupiter.api.Assertions.assertThrows;
7+
import static org.junit.jupiter.api.Assertions.assertTrue;
8+
9+
import java.util.Collections;
10+
import java.util.LinkedHashMap;
11+
import java.util.List;
12+
import java.util.Map;
13+
import java.util.NoSuchElementException;
14+
15+
import org.junit.jupiter.api.AfterEach;
16+
import org.junit.jupiter.api.BeforeEach;
17+
import org.junit.jupiter.api.Test;
18+
import org.junit.jupiter.api.extension.RegisterExtension;
19+
20+
import io.redis.test.annotations.SinceRedisVersion;
21+
import redis.clients.jedis.DefaultJedisClientConfig;
22+
import redis.clients.jedis.HostAndPort;
23+
import redis.clients.jedis.HostAndPorts;
24+
import redis.clients.jedis.JedisCluster;
25+
import redis.clients.jedis.util.RedisVersionCondition;
26+
27+
import redis.clients.jedis.search.Document;
28+
import redis.clients.jedis.search.IndexOptions;
29+
import redis.clients.jedis.search.Schema;
30+
import redis.clients.jedis.search.aggr.*;
31+
32+
/**
33+
* This test verifies that ftAggregateIterator works correctly in cluster mode
34+
*/
35+
@SinceRedisVersion(value = "8.0.0", message = "Cluster aggregate iterator tests require Redis OSS 8.0 or higher")
36+
public class ClusterAggregateIteratorTest {
37+
38+
private static final String index = "cluster_aggiteratorindex";
39+
40+
// Use stable cluster endpoints that don't change during tests
41+
private static final List<HostAndPort> clusterNodes = HostAndPorts.getStableClusterServers();
42+
43+
private JedisCluster cluster;
44+
45+
@RegisterExtension
46+
public RedisVersionCondition versionCondition = new RedisVersionCondition(
47+
clusterNodes.get(0),
48+
DefaultJedisClientConfig.builder().password("cluster").build()
49+
);
50+
51+
@BeforeEach
52+
public void setUp() {
53+
cluster = new JedisCluster(
54+
Collections.singleton(clusterNodes.get(0)),
55+
DefaultJedisClientConfig.builder().password("cluster").build()
56+
);
57+
58+
// Clean up any existing index
59+
try {
60+
cluster.ftDropIndex(index);
61+
} catch (Exception e) {
62+
// Index might not exist, ignore
63+
}
64+
65+
// Flush all data
66+
cluster.flushAll();
67+
}
68+
69+
@AfterEach
70+
public void tearDown() throws Exception {
71+
// Clean up index
72+
try {
73+
cluster.ftDropIndex(index);
74+
} catch (Exception e) {
75+
// Index might not exist, ignore
76+
}
77+
78+
if (cluster != null) {
79+
cluster.close();
80+
}
81+
}
82+
83+
private void addDocument(Document doc) {
84+
String key = doc.getId();
85+
Map<String, String> map = new LinkedHashMap<>();
86+
doc.getProperties().forEach(entry -> map.put(entry.getKey(), String.valueOf(entry.getValue())));
87+
cluster.hset(key, map);
88+
}
89+
90+
@Test
91+
public void testAggregateIteratorFirstBatchReturnsInitialResults() {
92+
// Create index and add test data
93+
Schema sc = new Schema();
94+
sc.addSortableTextField("name", 1.0);
95+
sc.addSortableNumericField("count");
96+
cluster.ftCreate(index, IndexOptions.defaultOptions(), sc);
97+
98+
addDocument(new Document("data1").set("name", "abc").set("count", 10));
99+
addDocument(new Document("data2").set("name", "def").set("count", 5));
100+
addDocument(new Document("data3").set("name", "def").set("count", 25));
101+
102+
// Create aggregation with cursor that should return 2 results in first batch
103+
AggregationBuilder aggr = new AggregationBuilder()
104+
.groupBy("@name", Reducers.sum("@count").as("sum"))
105+
.sortBy(10, SortedField.desc("@sum"))
106+
.cursor(2, 10000); // 2 results per batch
107+
108+
// Test that first next() call returns the initial FT.AGGREGATE results
109+
try (AggregateIterator iterator = cluster.ftAggregateIterator(index, aggr)) {
110+
assertTrue(iterator.hasNext());
111+
112+
// First call should return initial results from FT.AGGREGATE
113+
AggregationResult firstBatch = iterator.next();
114+
assertNotNull(firstBatch);
115+
assertNotNull(firstBatch.getRows());
116+
assertEquals(2, firstBatch.getRows().size()); // Should have 2 groups (abc, def)
117+
118+
// Verify the results are correct
119+
List<Row> rows = firstBatch.getRows();
120+
assertEquals("def", rows.get(0).getString("name"));
121+
assertEquals(30, rows.get(0).getLong("sum"));
122+
assertEquals("abc", rows.get(1).getString("name"));
123+
assertEquals(10, rows.get(1).getLong("sum"));
124+
125+
// Should be no more batches since we got all results in first batch
126+
AggregationResult secondBatch = iterator.next();
127+
assertTrue(secondBatch.isEmpty());
128+
}
129+
}
130+
131+
@Test
132+
public void testAggregateIteratorBasicUsage() {
133+
// Create index and add test data
134+
Schema sc = new Schema();
135+
sc.addSortableTextField("name", 1.0);
136+
sc.addSortableNumericField("count");
137+
cluster.ftCreate(index, IndexOptions.defaultOptions(), sc);
138+
139+
addDocument(new Document("data1").set("name", "abc").set("count", 10));
140+
addDocument(new Document("data2").set("name", "def").set("count", 5));
141+
addDocument(new Document("data3").set("name", "def").set("count", 25));
142+
addDocument(new Document("data4").set("name", "ghi").set("count", 15));
143+
addDocument(new Document("data5").set("name", "jkl").set("count", 20));
144+
145+
// Create aggregation with cursor to test FT.CURSOR routing in cluster mode
146+
AggregationBuilder aggr = new AggregationBuilder()
147+
.groupBy("@name", Reducers.sum("@count").as("sum"))
148+
.sortBy(10, SortedField.desc("@sum"))
149+
.cursor(2, 10000); // 2 results per batch
150+
151+
// Test the iterator using the integrated method
152+
try (AggregateIterator iterator = cluster.ftAggregateIterator(index, aggr)) {
153+
assertTrue(iterator.hasNext());
154+
assertNotNull(iterator.getCursorId());
155+
156+
int totalBatches = 0;
157+
int totalRows = 0;
158+
159+
while (iterator.hasNext()) {
160+
AggregationResult batch = iterator.next();
161+
assertNotNull(batch);
162+
assertNotNull(batch.getRows());
163+
assertTrue(batch.getRows().size() <= 2); // Batch size should not exceed cursor count
164+
totalBatches++;
165+
totalRows += batch.getRows().size();
166+
}
167+
168+
assertTrue(totalBatches > 0);
169+
assertEquals(4, totalRows); // Should have 4 groups total (abc, def, ghi, jkl)
170+
assertFalse(iterator.hasNext());
171+
}
172+
}
173+
174+
@Test
175+
public void testAggregateIteratorSingleBatch() {
176+
// Create index and add test data
177+
Schema sc = new Schema();
178+
sc.addSortableTextField("name", 1.0);
179+
sc.addSortableNumericField("count");
180+
cluster.ftCreate(index, IndexOptions.defaultOptions(), sc);
181+
182+
addDocument(new Document("data1").set("name", "abc").set("count", 10));
183+
addDocument(new Document("data2").set("name", "def").set("count", 5));
184+
185+
// Create aggregation with large cursor count (all results in one batch)
186+
AggregationBuilder aggr = new AggregationBuilder()
187+
.groupBy("@name", Reducers.sum("@count").as("sum"))
188+
.sortBy(10, SortedField.desc("@sum"))
189+
.cursor(100, 10000); // Large batch size
190+
191+
// Test the iterator using the integrated method
192+
try (AggregateIterator iterator = cluster.ftAggregateIterator(index, aggr)) {
193+
assertTrue(iterator.hasNext());
194+
195+
AggregationResult batch = iterator.next();
196+
assertNotNull(batch);
197+
assertNotNull(batch.getRows());
198+
assertEquals(2, batch.getRows().size()); // Should have 2 groups (abc, def)
199+
200+
// Should be no more batches
201+
assertFalse(iterator.hasNext());
202+
}
203+
}
204+
205+
@Test
206+
public void testAggregateIteratorEmptyResult() {
207+
// Create index but add no data
208+
Schema sc = new Schema();
209+
sc.addSortableTextField("name", 1.0);
210+
sc.addSortableNumericField("count");
211+
cluster.ftCreate(index, IndexOptions.defaultOptions(), sc);
212+
213+
// Create aggregation with cursor
214+
AggregationBuilder aggr = new AggregationBuilder()
215+
.groupBy("@name", Reducers.sum("@count").as("sum"))
216+
.cursor(10, 10000);
217+
218+
// Test the iterator with empty results using the integrated method
219+
try (AggregateIterator iterator = cluster.ftAggregateIterator(index, aggr)) {
220+
// Should have no results
221+
assertTrue(iterator.next().isEmpty());
222+
}
223+
}
224+
225+
@Test
226+
public void testAggregateIteratorRemove() {
227+
// Create index and add test data
228+
Schema sc = new Schema();
229+
sc.addSortableTextField("name", 1.0);
230+
sc.addSortableNumericField("count");
231+
cluster.ftCreate(index, IndexOptions.defaultOptions(), sc);
232+
233+
addDocument(new Document("data1").set("name", "abc").set("count", 10));
234+
addDocument(new Document("data2").set("name", "def").set("count", 5));
235+
addDocument(new Document("data3").set("name", "def").set("count", 25));
236+
addDocument(new Document("data4").set("name", "ghi").set("count", 15));
237+
addDocument(new Document("data5").set("name", "jkl").set("count", 20));
238+
239+
// Create aggregation with cursor
240+
AggregationBuilder aggr = new AggregationBuilder()
241+
.groupBy("@name", Reducers.sum("@count").as("sum"))
242+
.sortBy(10, SortedField.desc("@sum"))
243+
.cursor(2, 10000); // 2 results per batch
244+
245+
// Test remove() method
246+
try (AggregateIterator iterator = cluster.ftAggregateIterator(index, aggr)) {
247+
assertTrue(iterator.hasNext());
248+
assertNotNull(iterator.getCursorId());
249+
assertTrue(iterator.getCursorId() > 0);
250+
251+
// Get first batch
252+
AggregationResult firstBatch = iterator.next();
253+
assertNotNull(firstBatch);
254+
assertEquals(2, firstBatch.getRows().size());
255+
256+
// Should still have more results
257+
assertTrue(iterator.hasNext());
258+
259+
// Remove the cursor - this should terminate the iteration
260+
iterator.remove();
261+
262+
// After remove, should have no more results
263+
assertFalse(iterator.hasNext());
264+
assertEquals(Long.valueOf(-1), iterator.getCursorId());
265+
266+
// Calling next() should throw NoSuchElementException
267+
assertThrows(NoSuchElementException.class, iterator::next);
268+
}
269+
}
270+
}

0 commit comments

Comments
 (0)