
Add and refactor integration tests in DefaultShareConsumerFactoryTests #3932

Merged
Changes from all commits
@@ -17,16 +17,21 @@
package org.springframework.kafka.core;

import java.time.Duration;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -49,7 +54,7 @@
* @since 4.0
*/
@EmbeddedKafka(
topics = {"embedded-share-test"}, partitions = 1,
topics = {"embedded-share-test", "embedded-share-distribution-test"}, partitions = 1,
brokerProperties = {
"unstable.api.versions.enable=true",
"group.coordinator.rebalance.protocols=classic,share",
@@ -144,7 +149,6 @@ void shouldReturnUnmodifiableListenersList() {
}

@Test
@SuppressWarnings("try")
void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) throws Exception {
final String topic = "embedded-share-test";
final String groupId = "testGroup";
@@ -159,23 +163,7 @@ void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) throws Exception {
producer.send(new ProducerRecord<>(topic, "key", "integration-test-value")).get();
}

Map<String, Object> adminProperties = new HashMap<>();
adminProperties.put("bootstrap.servers", bootstrapServers);

// For this test: force new share groups to start from the beginning of the topic.
// This is NOT the same as the usual consumer auto.offset.reset; it's a group config,
// so use AdminClient to set share.auto.offset.reset = earliest for our test group.
try (AdminClient ignored = AdminClient.create(adminProperties)) {
ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest");
AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET);

Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
new ConfigResource(ConfigResource.Type.GROUP, "testGroup"), Arrays.asList(op));

try (Admin admin = AdminClient.create(adminProperties)) {
admin.incrementalAlterConfigs(configs).all().get();
}
}
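// share.auto.offset.reset is a group config, not the usual consumer auto.offset.reset;
// the helper uses an AdminClient to make this share group start from the earliest offset.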
setShareAutoOffsetResetEarliest(bootstrapServers, groupId);

var consumerProps = new HashMap<String, Object>();
consumerProps.put("bootstrap.servers", bootstrapServers);
@@ -197,4 +185,101 @@ void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) throws Exception {
consumer.close();
}

@Test
void integrationTestSharedConsumersDistribution(EmbeddedKafkaBroker broker) throws Exception {
final String topic = "embedded-share-distribution-test";
final String groupId = "distributionTestGroup";
int recordCount = 8;
List<String> consumerIds = List.of("client-dist-1", "client-dist-2");
List<String> allReceived = runSharedConsumerTest(topic, groupId, consumerIds, recordCount, broker);

// Assert all records were received (no loss and no duplicates)
assertThat(allReceived)
.containsExactlyInAnyOrder(
topic + "-value-0",
topic + "-value-1",
topic + "-value-2",
topic + "-value-3",
topic + "-value-4",
topic + "-value-5",
topic + "-value-6",
topic + "-value-7"
)
.doesNotHaveDuplicates();
}

/**
* Runs multiple Kafka share consumers in parallel on an ExecutorService and returns
* the values of all records received across all consumers.
*/
private static List<String> runSharedConsumerTest(String topic, String groupId,
List<String> consumerIds, int recordCount, EmbeddedKafkaBroker broker) throws Exception {
var bootstrapServers = broker.getBrokersAsString();

var producerProps = new java.util.Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
try (var producer = new KafkaProducer<String, String>(producerProps)) {
for (int i = 0; i < recordCount; i++) {
producer.send(new ProducerRecord<>(topic, "key" + i, topic + "-value-" + i)).get();
}
producer.flush();
}

setShareAutoOffsetResetEarliest(bootstrapServers, groupId);

List<String> allReceived = Collections.synchronizedList(new ArrayList<>());
var latch = new java.util.concurrent.CountDownLatch(recordCount);
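// One latch count per expected record; consumer tasks count down once per acknowledged record.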
ExecutorService executor = Executors.newCachedThreadPool();
DefaultShareConsumerFactory<String, String> shareConsumerFactory = new DefaultShareConsumerFactory<>(
Map.of(
"bootstrap.servers", bootstrapServers,
"key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class,
"value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class
)
);
for (int i = 0; i < consumerIds.size(); i++) {
final int idx = i;
executor.submit(() -> {
try (var consumer = shareConsumerFactory.createShareConsumer(groupId, consumerIds.get(idx))) {
consumer.subscribe(Collections.singletonList(topic));
Member:
My idea was to move the create and subscribe calls outside of the thread, and only poll in the submitted task.
Doesn't that work?
It might not be related to our test case, but that is how I feel consumers are supposed to be used.
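A minimal sketch of that restructuring, reusing the names from this test (an illustration of the comment, not code from this PR): create and subscribe on the test thread, then hand only the poll loop to the executor, so each consumer is still used by exactly one thread.

for (String consumerId : consumerIds) {
	// Created and subscribed outside the task; afterwards only the submitted task touches it.
	var consumer = shareConsumerFactory.createShareConsumer(groupId, consumerId);
	consumer.subscribe(Collections.singletonList(topic));
	executor.submit(() -> {
		// Java 9+ try-with-resources on an effectively final variable closes the consumer.
		try (consumer) {
			while (latch.getCount() > 0) {
				var records = consumer.poll(Duration.ofMillis(200));
				for (var r : records) {
					allReceived.add(r.value());
					consumer.acknowledge(r, AcknowledgeType.ACCEPT);
					latch.countDown();
				}
			}
		}
	});
}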

while (latch.getCount() > 0) {
var records = consumer.poll(Duration.ofMillis(200));
for (var r : records) {
allReceived.add(r.value());
consumer.acknowledge(r, AcknowledgeType.ACCEPT);
latch.countDown();
}
}
}
});
}

assertThat(latch.await(10, TimeUnit.SECONDS))
.as("All records should be received within timeout")
.isTrue();
Member:
We don't need all of these extra variables, and we don't even need to deal with a Future.
Calling executor.shutdown() and then asserting on awaitTermination() should do the trick.
See the shutdown() JavaDoc.

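// shutdown() only stops new submissions; the in-flight poll loops exit once the
// latch reaches zero, so awaitTermination() below doubles as the check that every
// consumer task finished cleanly.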
executor.shutdown();
assertThat(executor.awaitTermination(10, TimeUnit.SECONDS))
.as("Executor should terminate after shutdown")
.isTrue();
return allReceived;
}

/**
* Sets the share.auto.offset.reset group config to earliest for the given groupId,
* using the provided bootstrapServers.
*/
private static void setShareAutoOffsetResetEarliest(String bootstrapServers, String groupId) throws Exception {
Map<String, Object> adminProperties = new HashMap<>();
adminProperties.put("bootstrap.servers", bootstrapServers);
ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest");
AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET);
Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
new ConfigResource(ConfigResource.Type.GROUP, groupId), List.of(op));
try (Admin admin = AdminClient.create(adminProperties)) {
admin.incrementalAlterConfigs(configs).all().get();
}
}

}