@@ -24,6 +24,7 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.requests.ListOffsetsResponse;
@@ -294,4 +295,157 @@ private void setUpForLogAppendTimeCase() throws InterruptedException {
private void createTopicWithConfig(String topic, Map<String, String> props) throws InterruptedException {
clusterInstance.createTopic(topic, PARTITION, REPLICAS, props);
}

@ClusterTest
public void testListOffsetsWithTopicIds() throws InterruptedException, ExecutionException {
// In KRaft mode, all topics should have topicIds
String topicWithId = "topic-with-id";
clusterInstance.createTopic(topicWithId, 1, REPLICAS);

// Verify topic has a valid topicId
Uuid topicId = adminClient.describeTopics(List.of(topicWithId))
.allTopicNames()
.get()
.get(topicWithId)
.topicId();

assertNotEquals(Uuid.ZERO_UUID, topicId, "Topic should have a valid topicId in KRaft mode");

// Produce some messages
try (Producer<byte[], byte[]> producer = clusterInstance.producer()) {
producer.send(new ProducerRecord<>(topicWithId, 0, 100L, null, new byte[10])).get();
producer.send(new ProducerRecord<>(topicWithId, 0, 200L, null, new byte[10])).get();
producer.send(new ProducerRecord<>(topicWithId, 0, 300L, null, new byte[10])).get();
}

// Test listOffsets with topics that have topicIds
TopicPartition tp = new TopicPartition(topicWithId, 0);

ListOffsetsResultInfo earliestOffset = adminClient.listOffsets(
Map.of(tp, OffsetSpec.earliest())).all().get().get(tp);
assertEquals(0, earliestOffset.offset());

ListOffsetsResultInfo latestOffset = adminClient.listOffsets(
Map.of(tp, OffsetSpec.latest())).all().get().get(tp);
assertEquals(3, latestOffset.offset());

ListOffsetsResultInfo maxTimestampOffset = adminClient.listOffsets(
Map.of(tp, OffsetSpec.maxTimestamp())).all().get().get(tp);
// The message with timestamp 300L is at offset 2
assertEquals(2, maxTimestampOffset.offset());
assertEquals(300L, maxTimestampOffset.timestamp());
}

@ClusterTest
public void testListOffsetsWithMultipleTopicsHavingIds() throws InterruptedException, ExecutionException {
// Create multiple topics - all should have topicIds in KRaft mode
String topic1 = "multi-topic-1";
String topic2 = "multi-topic-2";
clusterInstance.createTopic(topic1, 1, REPLICAS);
clusterInstance.createTopic(topic2, 1, REPLICAS);

// Verify both topics have valid topicIds
Map<String, TopicDescription> descriptions = adminClient.describeTopics(List.of(topic1, topic2))
.allTopicNames()
.get();

Uuid topicId1 = descriptions.get(topic1).topicId();
Uuid topicId2 = descriptions.get(topic2).topicId();

assertNotEquals(Uuid.ZERO_UUID, topicId1,
"Topic1 should have a valid topicId");
assertNotEquals(Uuid.ZERO_UUID, topicId2,
"Topic2 should have a valid topicId");

// Produce messages to both topics
try (Producer<byte[], byte[]> producer = clusterInstance.producer()) {
producer.send(new ProducerRecord<>(topic1, 0, 100L, null, new byte[10])).get();
producer.send(new ProducerRecord<>(topic2, 0, 200L, null, new byte[10])).get();
}

// Query offsets for both topics in a single request
TopicPartition tp1 = new TopicPartition(topic1, 0);
TopicPartition tp2 = new TopicPartition(topic2, 0);

Map<TopicPartition, ListOffsetsResultInfo> results = adminClient.listOffsets(
Map.of(
tp1, OffsetSpec.latest(),
tp2, OffsetSpec.latest()
)
).all().get();

// When all topics have topicIds, the request should use version 12 (topicId-based)
// and successfully return results for both topics
assertEquals(1, results.get(tp1).offset());
assertEquals(1, results.get(tp2).offset());
}

@ClusterTest
public void testListOffsetsBackwardsCompatibilityWithIdBasedProtocol() throws InterruptedException, ExecutionException {
// Test that the handler correctly processes responses even when older protocol versions are used
String topic = "backwards-compat-topic";
clusterInstance.createTopic(topic, 1, REPLICAS);

// Produce messages
try (Producer<byte[], byte[]> producer = clusterInstance.producer()) {
producer.send(new ProducerRecord<>(topic, 0, 100L, null, new byte[10])).get();
producer.send(new ProducerRecord<>(topic, 0, 200L, null, new byte[10])).get();
producer.send(new ProducerRecord<>(topic, 0, 300L, null, new byte[10])).get();
}

TopicPartition tp = new TopicPartition(topic, 0);

// Even though the broker supports version 12 (topicId), the admin client should be able
// to handle responses correctly regardless of the negotiated version
ListOffsetsResultInfo result = adminClient.listOffsets(
Map.of(tp, OffsetSpec.earliest())
).all().get().get(tp);

assertEquals(0, result.offset());

// Test with maxTimestamp which requires version 7+
ListOffsetsResultInfo maxResult = adminClient.listOffsets(
Map.of(tp, OffsetSpec.maxTimestamp())
).all().get().get(tp);

assertEquals(2, maxResult.offset());
assertEquals(300L, maxResult.timestamp());
}

@ClusterTest
public void testListOffsetsTopicIdConsistency() throws InterruptedException, ExecutionException {
// Verify that using topicIds in the request produces consistent results
// with using topic names
String topic = "consistency-test-topic";
clusterInstance.createTopic(topic, 1, REPLICAS);

// Produce messages with different timestamps
try (Producer<byte[], byte[]> producer = clusterInstance.producer()) {
producer.send(new ProducerRecord<>(topic, 0, 100L, null, new byte[10])).get();
producer.send(new ProducerRecord<>(topic, 0, 500L, null, new byte[10])).get();
producer.send(new ProducerRecord<>(topic, 0, 300L, null, new byte[10])).get();
}

TopicPartition tp = new TopicPartition(topic, 0);

// Get offsets using the standard listOffsets API
// (which will use topicIds if available in KRaft mode)
ListOffsetsResultInfo earliestResult = adminClient.listOffsets(
Map.of(tp, OffsetSpec.earliest())
).all().get().get(tp);

ListOffsetsResultInfo latestResult = adminClient.listOffsets(
Map.of(tp, OffsetSpec.latest())
).all().get().get(tp);

ListOffsetsResultInfo maxTimestampResult = adminClient.listOffsets(
Map.of(tp, OffsetSpec.maxTimestamp())
).all().get().get(tp);

// Verify results are correct
assertEquals(0, earliestResult.offset(), "Earliest offset should be 0");
assertEquals(3, latestResult.offset(), "Latest offset should be 3");
assertEquals(1, maxTimestampResult.offset(), "Max timestamp offset should be 1 (timestamp 500L)");
assertEquals(500L, maxTimestampResult.timestamp(), "Max timestamp should be 500L");
}
}
@@ -109,6 +109,14 @@ Map<String, Uuid> topicIds() {
return topicIds;
}

public Uuid topicId(String name) {
return topicIds.getOrDefault(name, Uuid.ZERO_UUID);
}

public String topicName(Uuid uuid) {
return topicNames.get(uuid);
}

Map<Uuid, String> topicNames() {
return topicNames;
}
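For illustration, a minimal usage sketch of the two new accessors; `metadataSnapshot` below is an assumed instance of the class shown above, not code from this PR.

// Name -> id: unknown names fall back to Uuid.ZERO_UUID rather than null.
Uuid ordersId = metadataSnapshot.topicId("orders");
if (!ordersId.equals(Uuid.ZERO_UUID)) {
    // Id -> name: useful when translating an id-keyed response back to a topic name;
    // returns null for ids that have never been seen.
    String ordersName = metadataSnapshot.topicName(ordersId);
}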
@@ -4262,7 +4262,13 @@ public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartit
ListOffsetsHandler.newFuture(topicPartitionOffsets.keySet(), partitionLeaderCache);
Map<TopicPartition, Long> offsetQueriesByPartition = topicPartitionOffsets.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> getOffsetFromSpec(e.getValue())));
ListOffsetsHandler handler = new ListOffsetsHandler(offsetQueriesByPartition, options, logContext, defaultApiTimeoutMs);
// Pass a supplier to get fresh metadata instead of a static cluster snapshot
ListOffsetsHandler handler = new ListOffsetsHandler(
offsetQueriesByPartition,
metadataManager::cluster,
options,
logContext,
defaultApiTimeoutMs);
invokeDriver(handler, future, options.timeoutMs);
return new ListOffsetsResult(future.all());
}
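For illustration, a minimal sketch of the supplier-based pattern referenced in the comment above; the class, field, and method names here are assumptions, not the PR's actual ListOffsetsHandler internals.

import java.util.function.Supplier;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;

// Holding a Supplier<Cluster> instead of a Cluster snapshot means each request build
// reads the latest admin metadata, including anything learned after construction.
final class FreshClusterView {
    private final Supplier<Cluster> clusterSupplier; // e.g. metadataManager::cluster

    FreshClusterView(Supplier<Cluster> clusterSupplier) {
        this.clusterSupplier = clusterSupplier;
    }

    Node currentLeader(TopicPartition tp) {
        // Re-evaluated on every call rather than pinned to a stale snapshot.
        return clusterSupplier.get().leaderFor(tp);
    }
}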
@@ -5082,7 +5088,8 @@ private <K, V> void invokeDriver(
deadlineMs,
retryBackoffMs,
retryBackoffMaxMs,
logContext
logContext,
metadataManager
);

maybeSendRequests(driver, currentTimeMs);
@@ -23,6 +23,7 @@
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest.NoBatchedFindCoordinatorsException;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest.NoBatchedOffsetFetchRequestException;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
@@ -88,6 +89,7 @@ public class AdminApiDriver<K, V> {
private final long deadlineMs;
private final AdminApiHandler<K, V> handler;
private final AdminApiFuture<K, V> future;
private final AdminMetadataManager metadataManager;

private final BiMultimap<ApiRequestScope, K> lookupMap = new BiMultimap<>();
private final BiMultimap<FulfillmentScope, K> fulfillmentMap = new BiMultimap<>();
@@ -99,11 +101,13 @@ public AdminApiDriver(
long deadlineMs,
long retryBackoffMs,
long retryBackoffMaxMs,
LogContext logContext
LogContext logContext,
AdminMetadataManager metadataManager
) {
this.handler = handler;
this.future = future;
this.deadlineMs = deadlineMs;
this.metadataManager = metadataManager;
this.retryBackoff = new ExponentialBackoff(
retryBackoffMs,
CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
@@ -237,6 +241,13 @@ public void onResponse(
completeExceptionally(result.failedKeys);
retryLookup(result.unmappedKeys);
} else {
// Update AdminMetadataManager if this is a MetadataResponse from lookup stage
if (metadataManager != null && response instanceof MetadataResponse) {
MetadataResponse metadataResponse = (MetadataResponse) response;
log.debug("Received MetadataResponse in lookup stage, updating AdminMetadataManager");
metadataManager.update(metadataResponse.buildCluster(), currentTimeMs);
}

AdminApiLookupStrategy.LookupResult<K> result = handler.lookupStrategy().handleResponse(
spec.keys,
response
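For illustration, a hypothetical helper (not part of this PR) showing how a fulfillment-stage handler could use the topic ids that the lookup-stage metadata update above makes available, to decide between the id-based and name-based request paths.

import java.util.Set;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Uuid;

final class TopicIdCheck {
    // Returns true only when every topic in the batch has a known, non-zero topic id,
    // i.e. when an id-based ListOffsets version could be used for the whole request.
    static boolean allTopicsHaveIds(Cluster cluster, Set<String> topics) {
        for (String topic : topics) {
            // Cluster.topicId falls back to Uuid.ZERO_UUID when the id is unknown.
            if (Uuid.ZERO_UUID.equals(cluster.topicId(topic))) {
                return false; // fall back to the name-based request path
            }
        }
        return true;
    }
}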
@@ -21,6 +21,8 @@
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -31,8 +33,12 @@

import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
@@ -314,10 +320,40 @@ public void update(Cluster cluster, long now) {
this.metadataAttemptStartMs = Optional.empty();

if (!cluster.nodes().isEmpty()) {
// Merge topicIds from the new cluster with existing topicIds
// This is necessary because global metadata requests don't include topic info,
// but lookup requests do. We want to preserve topicId information from lookups.
Map<String, Uuid> mergedTopicIds = new HashMap<>(this.cluster.topicIds());
Review comment (Owner Author): Is overhead large?

mergedTopicIds.putAll(cluster.topicIds());

// Create a new cluster with merged topicIds
if (!mergedTopicIds.equals(cluster.topicIds())) {
// Collect all partitions from the cluster
Collection<PartitionInfo> allPartitions = new ArrayList<>();
for (String topic : cluster.topics()) {
allPartitions.addAll(cluster.partitionsForTopic(topic));
}

cluster = new Cluster(
cluster.clusterResource().clusterId(),
cluster.nodes(),
allPartitions,
cluster.unauthorizedTopics(),
cluster.invalidTopics(),
cluster.internalTopics(),
cluster.controller(),
mergedTopicIds
);
}

this.cluster = cluster;
}
}

public Cluster cluster() {
return cluster;
}

public void initiateRebootstrap() {
this.metadataAttemptStartMs = Optional.of(0L);
}
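
For illustration, a test-style sketch of the merge semantics above. It assumes `manager` is an AdminMetadataManager instance and uses the same Cluster constructor as the code above; it is not a test from this PR.

Node node = new Node(0, "localhost", 9092);
Uuid ordersId = Uuid.randomUuid();

// A lookup-stage response that carried a topic id for "orders".
Cluster lookupView = new Cluster("cluster-1", List.of(node), List.of(),
        Set.of(), Set.of(), Set.of(), node, Map.of("orders", ordersId));
// A later global metadata update that carries no topic information at all.
Cluster globalView = new Cluster("cluster-1", List.of(node), List.of(),
        Set.of(), Set.of(), Set.of(), node, Map.of());

manager.update(lookupView, 0L);
manager.update(globalView, 1L);

// The id learned from the lookup stage survives the id-less update thanks to the merge.
assertEquals(ordersId, manager.cluster().topicId("orders"));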