@@ -17,16 +17,21 @@
package org.springframework.kafka.core;

import java.time.Duration;
- import java.util.Arrays;
+ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+ import java.util.List;
import java.util.Map;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
+ import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -49,7 +54,7 @@
 * @since 4.0
 */
@EmbeddedKafka(
- 		topics = {"embedded-share-test"}, partitions = 1,
+ 		topics = {"embedded-share-test", "embedded-share-distribution-test"}, partitions = 1,
		brokerProperties = {
				"unstable.api.versions.enable=true",
				"group.coordinator.rebalance.protocols=classic,share",
@@ -144,7 +149,6 @@ void shouldReturnUnmodifiableListenersList() {
	}

	@Test
- 	@SuppressWarnings("try")
	void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) throws Exception {
		final String topic = "embedded-share-test";
		final String groupId = "testGroup";
@@ -159,23 +163,7 @@ void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) throws Exception {
			producer.send(new ProducerRecord<>(topic, "key", "integration-test-value")).get();
		}

- 		Map<String, Object> adminProperties = new HashMap<>();
- 		adminProperties.put("bootstrap.servers", bootstrapServers);
-
- 		// For this test: force new share groups to start from the beginning of the topic.
- 		// This is NOT the same as the usual consumer auto.offset.reset; it's a group config,
- 		// so use AdminClient to set share.auto.offset.reset = earliest for our test group.
- 		try (AdminClient ignored = AdminClient.create(adminProperties)) {
- 			ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest");
- 			AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET);
-
- 			Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
- 					new ConfigResource(ConfigResource.Type.GROUP, "testGroup"), Arrays.asList(op));
-
- 			try (Admin admin = AdminClient.create(adminProperties)) {
- 				admin.incrementalAlterConfigs(configs).all().get();
- 			}
- 		}
+ 		setShareAutoOffsetResetEarliest(bootstrapServers, groupId);

		var consumerProps = new HashMap<String, Object>();
		consumerProps.put("bootstrap.servers", bootstrapServers);
@@ -197,4 +185,110 @@ void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) throws Exception {
		consumer.close();
	}

+ 	@Test
+ 	void integrationTestSharedConsumersDistribution(EmbeddedKafkaBroker broker) throws Exception {
+ 		final String topic = "embedded-share-distribution-test";
+ 		final String groupId = "distributionTestGroup";
+ 		int recordCount = 8;
+ 		List<String> consumerIds = List.of("client-dist-1", "client-dist-2");
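+ 		// In a share group (KIP-932), records from a single partition may be dispatched to
+ 		// several consumers in the same group, so both clients below can share the workload
+ 		// even though the topic has only one partition.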
+ 		List<String> allReceived = runSharedConsumerTest(topic, groupId, consumerIds, recordCount, broker);
+
+ 		// Assert all records were received (no loss and no duplicates)
+ 		assertThat(allReceived)
+ 				.containsExactlyInAnyOrder(
+ 						topic + "-value-0",
+ 						topic + "-value-1",
+ 						topic + "-value-2",
+ 						topic + "-value-3",
+ 						topic + "-value-4",
+ 						topic + "-value-5",
+ 						topic + "-value-6",
+ 						topic + "-value-7"
+ 				)
+ 				.doesNotHaveDuplicates();
+ 	}
+
+ 	/**
+ 	 * Runs multiple share consumers in parallel on an ExecutorService, collects every record
+ 	 * they receive, and returns the values of all records received across all consumers.
+ 	 */
+ 	private static List<String> runSharedConsumerTest(String topic, String groupId,
+ 			List<String> consumerIds, int recordCount, EmbeddedKafkaBroker broker) throws Exception {
+ 		var bootstrapServers = broker.getBrokersAsString();
+
+ 		var producerProps = new java.util.Properties();
+ 		producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ 		producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ 		producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ 		try (var producer = new KafkaProducer<String, String>(producerProps)) {
+ 			for (int i = 0; i < recordCount; i++) {
+ 				producer.send(new ProducerRecord<>(topic, "key" + i, topic + "-value-" + i)).get();
+ 			}
+ 			producer.flush();
+ 		}
+
+ 		setShareAutoOffsetResetEarliest(bootstrapServers, groupId);
+
+ 		List<String> allReceived = Collections.synchronizedList(new ArrayList<>());
+ 		var latch = new java.util.concurrent.CountDownLatch(recordCount);
+ 		ExecutorService executor = Executors.newCachedThreadPool();
+ 		DefaultShareConsumerFactory<String, String> shareConsumerFactory = new DefaultShareConsumerFactory<>(
+ 				Map.of(
+ 						"bootstrap.servers", bootstrapServers,
+ 						"key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class,
+ 						"value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class
+ 				)
+ 		);
+ 		for (int i = 0; i < consumerIds.size(); i++) {
+ 			final int idx = i;
+ 			executor.submit(() -> {
+ 				try (var consumer = shareConsumerFactory.createShareConsumer(groupId, consumerIds.get(idx))) {
+ 					consumer.subscribe(Collections.singletonList(topic));
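+ 					// Poll until the shared latch indicates that every produced record has
+ 					// been received and acknowledged by some consumer in the group.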
+ 					while (latch.getCount() > 0) {
+ 						var records = consumer.poll(Duration.ofMillis(200));
+ 						for (var r : records) {
+ 							allReceived.add(r.value());
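+ 							// ACCEPT marks the record as successfully processed; KIP-932 also
+ 							// defines RELEASE (requeue for redelivery) and REJECT (discard).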
+ 							consumer.acknowledge(r, AcknowledgeType.ACCEPT);
+ 							latch.countDown();
+ 						}
+ 					}
+ 				}
+ 			});
+ 		}
+
+ 		assertThat(latch.await(10, TimeUnit.SECONDS))
+ 				.as("All records should be received within timeout")
+ 				.isTrue();
+ 		executor.shutdown();
+ 		assertThat(executor.awaitTermination(10, TimeUnit.SECONDS))
+ 				.as("Executor should terminate after shutdown")
+ 				.isTrue();
+ 		return allReceived;
+ 	}
+
+ 	/**
+ 	 * Sets the share.auto.offset.reset group config to earliest for the given groupId,
+ 	 * using the provided bootstrapServers.
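+ 	 * Note that this is not the same as the consumer auto.offset.reset property: it is a
+ 	 * group-level config, so it has to be set via the AdminClient.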
+ 	 */
+ 	private static void setShareAutoOffsetResetEarliest(String bootstrapServers, String groupId) throws Exception {
+ 		Map<String, Object> adminProperties = new HashMap<>();
+ 		adminProperties.put("bootstrap.servers", bootstrapServers);
+ 		ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest");
+ 		AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET);
+ 		Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
+ 				new ConfigResource(ConfigResource.Type.GROUP, groupId), List.of(op));
+ 		try (Admin admin = AdminClient.create(adminProperties)) {
+ 			admin.incrementalAlterConfigs(configs).all().get();
+ 		}
+ 	}
+
}