diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultShareConsumerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultShareConsumerFactoryTests.java
index b6ea2e5e09..bba1edf68d 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultShareConsumerFactoryTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultShareConsumerFactoryTests.java
@@ -17,16 +17,21 @@ package org.springframework.kafka.core;
 
 import java.time.Duration;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.consumer.AcknowledgeType;
 import org.apache.kafka.clients.consumer.ShareConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -49,7 +54,7 @@
  * @since 4.0
  */
 @EmbeddedKafka(
-		topics = {"embedded-share-test"}, partitions = 1,
+		topics = {"embedded-share-test", "embedded-share-distribution-test"}, partitions = 1,
 		brokerProperties = {
 				"unstable.api.versions.enable=true",
 				"group.coordinator.rebalance.protocols=classic,share",
@@ -144,7 +149,6 @@ void shouldReturnUnmodifiableListenersList() {
 	}
 
 	@Test
-	@SuppressWarnings("try")
 	void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) throws Exception {
 		final String topic = "embedded-share-test";
 		final String groupId = "testGroup";
@@ -159,23 +163,7 @@ void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) thro
 			producer.send(new ProducerRecord<>(topic,
 					"key", "integration-test-value")).get();
 		}
-		Map<String, Object> adminProperties = new HashMap<>();
-		adminProperties.put("bootstrap.servers", bootstrapServers);
-
-		// For this test: force new share groups to start from the beginning of the topic.
-		// This is NOT the same as the usual consumer auto.offset.reset; it's a group config,
-		// so use AdminClient to set share.auto.offset.reset = earliest for our test group.
-		try (AdminClient ignored = AdminClient.create(adminProperties)) {
-			ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest");
-			AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET);
-
-			Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
-					new ConfigResource(ConfigResource.Type.GROUP, "testGroup"), Arrays.asList(op));
-
-			try (Admin admin = AdminClient.create(adminProperties)) {
-				admin.incrementalAlterConfigs(configs).all().get();
-			}
-		}
+		setShareAutoOffsetResetEarliest(bootstrapServers, groupId);
 
 		var consumerProps = new HashMap<String, Object>();
 		consumerProps.put("bootstrap.servers", bootstrapServers);
@@ -197,4 +185,101 @@ void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) thro
 		consumer.close();
 	}
 
+	@Test
+	void integrationTestSharedConsumersDistribution(EmbeddedKafkaBroker broker) throws Exception {
+		// NOTE(review): use the topic pre-created by @EmbeddedKafka; the patch originally used
+		// the undeclared topic "shared-consumer-dist-test" and relied on broker auto-creation.
+		String topic = "embedded-share-distribution-test";
+		final String groupId = "distributionTestGroup";
+		int recordCount = 8;
+		List<String> consumerIds = List.of("client-dist-1", "client-dist-2");
+		List<String> allReceived = runSharedConsumerTest(topic, groupId, consumerIds, recordCount, broker);
+
+		// Assert all records were received (no loss and no duplicates)
+		assertThat(allReceived)
+				.containsExactlyInAnyOrder(
+						topic + "-value-0",
+						topic + "-value-1",
+						topic + "-value-2",
+						topic + "-value-3",
+						topic + "-value-4",
+						topic + "-value-5",
+						topic + "-value-6",
+						topic + "-value-7"
+				)
+				.doesNotHaveDuplicates();
+	}
+
+	/**
+	 * Runs multiple Kafka consumers in parallel using ExecutorService, collects all records received,
+	 * and returns a list of all record
+	 * values received by all consumers.
+	 */
+	private static List<String> runSharedConsumerTest(String topic, String groupId,
+			List<String> consumerIds, int recordCount, EmbeddedKafkaBroker broker) throws Exception {
+		var bootstrapServers = broker.getBrokersAsString();
+
+		var producerProps = new java.util.Properties();
+		producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+		producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+		producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+		try (var producer = new KafkaProducer<String, String>(producerProps)) {
+			for (int i = 0; i < recordCount; i++) {
+				producer.send(new ProducerRecord<>(topic, "key" + i, topic + "-value-" + i)).get();
+			}
+			producer.flush();
+		}
+
+		setShareAutoOffsetResetEarliest(bootstrapServers, groupId);
+
+		List<String> allReceived = Collections.synchronizedList(new ArrayList<>());
+		var latch = new java.util.concurrent.CountDownLatch(recordCount);
+		ExecutorService executor = Executors.newCachedThreadPool();
+		DefaultShareConsumerFactory<String, String> shareConsumerFactory = new DefaultShareConsumerFactory<>(
+				Map.of(
+						"bootstrap.servers", bootstrapServers,
+						"key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class,
+						"value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class
+				)
+		);
+		for (int i = 0; i < consumerIds.size(); i++) {
+			final int idx = i;
+			executor.submit(() -> {
+				try (var consumer = shareConsumerFactory.createShareConsumer(groupId, consumerIds.get(idx))) {
+					consumer.subscribe(Collections.singletonList(topic));
+					while (latch.getCount() > 0) {
+						var records = consumer.poll(Duration.ofMillis(200));
+						for (var r : records) {
+							allReceived.add(r.value());
+							consumer.acknowledge(r, AcknowledgeType.ACCEPT);
+							latch.countDown();
+						}
+					}
+				}
+			});
+		}
+
+		assertThat(latch.await(10, TimeUnit.SECONDS))
+				.as("All records should be received within timeout")
+				.isTrue();
+		executor.shutdown();
+		assertThat(executor.awaitTermination(10, TimeUnit.SECONDS))
+				.as("Executor should terminate after shutdown")
+				.isTrue();
+		return allReceived;
+	}
+
+	/**
+	 * Sets the share.auto.offset.reset group config to earliest for the given groupId,
+	 * using the provided bootstrapServers.
+	 */
+	private static void setShareAutoOffsetResetEarliest(String bootstrapServers, String groupId) throws Exception {
+		Map<String, Object> adminProperties = new HashMap<>();
+		adminProperties.put("bootstrap.servers", bootstrapServers);
+		ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest");
+		AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET);
+		Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
+				new ConfigResource(ConfigResource.Type.GROUP, groupId), List.of(op));
+		try (Admin admin = AdminClient.create(adminProperties)) {
+			admin.incrementalAlterConfigs(configs).all().get();
+		}
+	}
+
 }