Skip to content

Commit 66e0234

Browse files
committed
impl admin
1 parent 31020ac commit 66e0234

File tree

6 files changed

+103
-14
lines changed

6 files changed

+103
-14
lines changed

clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,8 @@ public class KafkaAdminClient extends AdminClient {
410410
private final Map<TopicPartition, Integer> partitionLeaderCache;
411411
private final AdminFetchMetricsManager adminFetchMetricsManager;
412412
private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
413+
private final Map<String, Uuid> topicIdsByNames = new HashMap<>();
414+
private final Map<Uuid, String> topicNameById = new HashMap<>();
413415

414416
/**
415417
* The telemetry requests client instance id.
@@ -4263,7 +4265,8 @@ public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartit
42634265
ListOffsetsHandler.newFuture(topicPartitionOffsets.keySet(), partitionLeaderCache);
42644266
Map<TopicPartition, Long> offsetQueriesByPartition = topicPartitionOffsets.entrySet().stream()
42654267
.collect(Collectors.toMap(Map.Entry::getKey, e -> getOffsetFromSpec(e.getValue())));
4266-
ListOffsetsHandler handler = new ListOffsetsHandler(offsetQueriesByPartition, options, logContext, defaultApiTimeoutMs);
4268+
ListOffsetsHandler handler = new ListOffsetsHandler(offsetQueriesByPartition, options, logContext, defaultApiTimeoutMs,
4269+
topicIdsByNames, topicNameById);
42674270
invokeDriver(handler, future, options.timeoutMs);
42684271
return new ListOffsetsResult(future.all());
42694272
}
@@ -5109,6 +5112,8 @@ AbstractRequest.Builder<?> createRequest(int timeoutMs) {
51095112
void handleResponse(AbstractResponse response) {
51105113
long currentTimeMs = time.milliseconds();
51115114
driver.onResponse(currentTimeMs, spec, response, this.curNode());
5115+
topicIdsByNames.putAll(driver.getTopicIdByName());
5116+
topicNameById.putAll(driver.getTopicNameById());
51125117
maybeSendRequests(driver, currentTimeMs);
51135118
}
51145119

clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.kafka.clients.CommonClientConfigs;
2020
import org.apache.kafka.common.Node;
21+
import org.apache.kafka.common.Uuid;
2122
import org.apache.kafka.common.errors.DisconnectException;
2223
import org.apache.kafka.common.errors.UnsupportedVersionException;
2324
import org.apache.kafka.common.requests.AbstractRequest;
@@ -92,6 +93,8 @@ public class AdminApiDriver<K, V> {
9293
private final BiMultimap<ApiRequestScope, K> lookupMap = new BiMultimap<>();
9394
private final BiMultimap<FulfillmentScope, K> fulfillmentMap = new BiMultimap<>();
9495
private final Map<ApiRequestScope, RequestState> requestStates = new HashMap<>();
96+
private final Map<String, Uuid> topicIdByName = new HashMap<>();
97+
private final Map<Uuid, String> topicNameById = new HashMap<>();
9598

9699
public AdminApiDriver(
97100
AdminApiHandler<K, V> handler,
@@ -243,11 +246,22 @@ public void onResponse(
243246
);
244247

245248
result.completedKeys.forEach(lookupMap::remove);
249+
this.topicIdByName.putAll(result.topicIdByName);
250+
this.topicNameById.putAll(result.topicNameById);
251+
246252
completeLookup(result.mappedKeys);
247253
completeLookupExceptionally(result.failedKeys);
248254
}
249255
}
250256

257+
public Map<String, Uuid> getTopicIdByName() {
258+
return topicIdByName;
259+
}
260+
261+
public Map<Uuid, String> getTopicNameById() {
262+
return topicNameById;
263+
}
264+
251265
/**
252266
* Callback that is invoked when a `Call` is failed.
253267
*/

clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiLookupStrategy.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.kafka.clients.admin.internals;
1818

19+
import org.apache.kafka.common.Uuid;
1920
import org.apache.kafka.common.errors.UnsupportedVersionException;
2021
import org.apache.kafka.common.requests.AbstractRequest;
2122
import org.apache.kafka.common.requests.AbstractResponse;
@@ -113,21 +114,46 @@ class LookupResult<K> {
113114
// phase. The driver will not attempt lookup or fulfillment for failed keys.
114115
public final Map<K, Throwable> failedKeys;
115116

117+
public final Map<String, Uuid> topicIdByName;
118+
119+
public final Map<Uuid, String> topicNameById;
120+
116121
public LookupResult(
117122
Map<K, Throwable> failedKeys,
118123
Map<K, Integer> mappedKeys
119124
) {
120125
this(Collections.emptyList(), failedKeys, mappedKeys);
121126
}
122127

128+
public LookupResult(
129+
Map<K, Throwable> failedKeys,
130+
Map<K, Integer> mappedKeys,
131+
Map<String, Uuid> topicIdByName,
132+
Map<Uuid, String> topicNameById
133+
) {
134+
this(Collections.emptyList(), failedKeys, mappedKeys, topicIdByName, topicNameById);
135+
}
136+
123137
public LookupResult(
124138
List<K> completedKeys,
125139
Map<K, Throwable> failedKeys,
126140
Map<K, Integer> mappedKeys
141+
) {
142+
this(completedKeys, failedKeys, mappedKeys, Collections.emptyMap(), Collections.emptyMap());
143+
}
144+
145+
public LookupResult(
146+
List<K> completedKeys,
147+
Map<K, Throwable> failedKeys,
148+
Map<K, Integer> mappedKeys,
149+
Map<String, Uuid> topicIdByName,
150+
Map<Uuid, String> topicNameById
127151
) {
128152
this.completedKeys = Collections.unmodifiableList(completedKeys);
129153
this.failedKeys = Collections.unmodifiableMap(failedKeys);
130154
this.mappedKeys = Collections.unmodifiableMap(mappedKeys);
155+
this.topicIdByName = Collections.unmodifiableMap(topicIdByName);
156+
this.topicNameById = Collections.unmodifiableMap(topicNameById);
131157
}
132158

133159
static <K> LookupResult<K> empty() {

clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched;
2222
import org.apache.kafka.common.Node;
2323
import org.apache.kafka.common.TopicPartition;
24+
import org.apache.kafka.common.Uuid;
2425
import org.apache.kafka.common.errors.ApiException;
2526
import org.apache.kafka.common.errors.RetriableException;
2627
import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -43,6 +44,7 @@
4344
import java.util.HashSet;
4445
import java.util.List;
4546
import java.util.Map;
47+
import java.util.Objects;
4648
import java.util.Optional;
4749
import java.util.Set;
4850
import java.util.stream.Collectors;
@@ -54,18 +56,24 @@ public final class ListOffsetsHandler extends Batched<TopicPartition, ListOffset
5456
private final Logger log;
5557
private final AdminApiLookupStrategy<TopicPartition> lookupStrategy;
5658
private final int defaultApiTimeoutMs;
59+
private final Map<String, Uuid> topicIdByName;
60+
private final Map<Uuid, String> topicNameById;
5761

5862
public ListOffsetsHandler(
5963
Map<TopicPartition, Long> offsetTimestampsByPartition,
6064
ListOffsetsOptions options,
6165
LogContext logContext,
62-
int defaultApiTimeoutMs
66+
int defaultApiTimeoutMs,
67+
Map<String, Uuid> topicIdByName,
68+
Map<Uuid, String> topicNameById
6369
) {
6470
this.offsetTimestampsByPartition = offsetTimestampsByPartition;
6571
this.options = options;
6672
this.log = logContext.logger(ListOffsetsHandler.class);
6773
this.lookupStrategy = new PartitionLeaderStrategy(logContext, false);
6874
this.defaultApiTimeoutMs = defaultApiTimeoutMs;
75+
this.topicIdByName = topicIdByName;
76+
this.topicNameById = topicNameById;
6977
}
7078

7179
@Override
@@ -82,7 +90,8 @@ public AdminApiLookupStrategy<TopicPartition> lookupStrategy() {
8290
ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set<TopicPartition> keys) {
8391
Map<String, ListOffsetsTopic> topicsByName = CollectionUtils.groupPartitionsByTopic(
8492
keys,
85-
topicName -> new ListOffsetsTopic().setName(topicName),
93+
topicName -> new ListOffsetsTopic().setName(topicName)
94+
.setTopicId(topicIdByName.getOrDefault(topicName, Uuid.ZERO_UUID)),
8695
(listOffsetsTopic, partitionId) -> {
8796
TopicPartition topicPartition = new TopicPartition(listOffsetsTopic.name(), partitionId);
8897
long offsetTimestamp = offsetTimestampsByPartition.get(topicPartition);
@@ -91,6 +100,15 @@ ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set<TopicPartition>
91100
.setPartitionIndex(partitionId)
92101
.setTimestamp(offsetTimestamp));
93102
});
103+
104+
// Only allow topicId-based protocol (v12) if ALL topics have valid topicIds
105+
// If any topic has ZERO_UUID, we must restrict to name-based protocol (v11 or lower)
106+
// This is because in a given protocol version, we can only use topicId OR topicName, not both
107+
boolean canUseTopicIds = !topicsByName.isEmpty() && topicsByName.values().stream()
108+
.filter(Objects::nonNull)
109+
.map(ListOffsetsTopic::topicId)
110+
.allMatch(topicId -> topicId != null && !topicId.equals(Uuid.ZERO_UUID));
111+
94112
boolean supportsMaxTimestamp = keys
95113
.stream()
96114
.anyMatch(key -> offsetTimestampsByPartition.get(key) == ListOffsetsRequest.MAX_TIMESTAMP);
@@ -113,7 +131,8 @@ ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set<TopicPartition>
113131
supportsMaxTimestamp,
114132
requireEarliestLocalTimestamp,
115133
requireTieredStorageTimestamp,
116-
requireEarliestPendingUploadTimestamp)
134+
requireEarliestPendingUploadTimestamp,
135+
canUseTopicIds)
117136
.setTargetTimes(new ArrayList<>(topicsByName.values()))
118137
.setTimeoutMs(timeoutMs);
119138
}
@@ -132,7 +151,18 @@ public ApiResult<TopicPartition, ListOffsetsResultInfo> handleResponse(
132151

133152
for (ListOffsetsTopicResponse topic : response.topics()) {
134153
for (ListOffsetsPartitionResponse partition : topic.partitions()) {
135-
TopicPartition topicPartition = new TopicPartition(topic.name(), partition.partitionIndex());
154+
// Determine topic name based on response version:
155+
// Version 12+: uses topicId (name will be null/empty)
156+
// Version < 12: uses name (topicId will be null or ZERO_UUID)
157+
TopicPartition topicPartition;
158+
if (topic.topicId() != null && !topic.topicId().equals(Uuid.ZERO_UUID)) {
159+
// Version 12+: resolve topicName from topicId
160+
String topicName = topicNameById.get(topic.topicId());
161+
topicPartition = new TopicPartition(topicName, partition.partitionIndex());
162+
} else {
163+
// Version < 12: use topicName directly
164+
topicPartition = new TopicPartition(topic.name(), partition.partitionIndex());
165+
}
136166
Errors error = Errors.forCode(partition.errorCode());
137167
if (!offsetTimestampsByPartition.containsKey(topicPartition)) {
138168
log.warn("ListOffsets response includes unknown topic partition {}", topicPartition);

clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.kafka.common.KafkaFuture;
2020
import org.apache.kafka.common.TopicPartition;
21+
import org.apache.kafka.common.Uuid;
2122
import org.apache.kafka.common.errors.InvalidTopicException;
2223
import org.apache.kafka.common.errors.TopicAuthorizationException;
2324
import org.apache.kafka.common.internals.KafkaFutureImpl;
@@ -163,9 +164,14 @@ public LookupResult<TopicPartition> handleResponse(
163164
MetadataResponse response = (MetadataResponse) abstractResponse;
164165
Map<TopicPartition, Throwable> failed = new HashMap<>();
165166
Map<TopicPartition, Integer> mapped = new HashMap<>();
167+
Map<String, Uuid> topicIdByName = new HashMap<>();
168+
Map<Uuid, String> topiccNameById = new HashMap<>();
166169

167170
for (MetadataResponseData.MetadataResponseTopic topicMetadata : response.data().topics()) {
168171
String topic = topicMetadata.name();
172+
Uuid topicId = topicMetadata.topicId();
173+
topicIdByName.put(topic, topicId);
174+
topiccNameById.put(topicId, topic);
169175
Errors topicError = Errors.forCode(topicMetadata.errorCode());
170176
if (topicError != Errors.NONE) {
171177
handleTopicError(topic, topicError, requestPartitions, failed);
@@ -196,7 +202,7 @@ public LookupResult<TopicPartition> handleResponse(
196202
}
197203
}
198204
}
199-
return new LookupResult<>(failed, mapped);
205+
return new LookupResult<>(failed, mapped, topicIdByName, topiccNameById);
200206
}
201207

202208
/**

clients/src/test/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandlerTest.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ public final class ListOffsetsHandlerTest {
7979
@Test
8080
public void testBuildRequestSimple() {
8181
ListOffsetsHandler handler =
82-
new ListOffsetsHandler(offsetTimestampsByPartition, new ListOffsetsOptions(), logContext, defaultApiTimeoutMs);
82+
new ListOffsetsHandler(offsetTimestampsByPartition, new ListOffsetsOptions(), logContext,
83+
defaultApiTimeoutMs, new HashMap<>(), new HashMap<>());
8384
ListOffsetsRequest request = handler.buildBatchedRequest(node.id(), Set.of(t0p0, t0p1)).build();
8485
List<ListOffsetsTopic> topics = request.topics();
8586
assertEquals(1, topics.size());
@@ -96,7 +97,8 @@ public void testBuildRequestSimple() {
9697
public void testBuildRequestMultipleTopicsWithReadCommitted() {
9798
ListOffsetsHandler handler =
9899
new ListOffsetsHandler(
99-
offsetTimestampsByPartition, new ListOffsetsOptions(IsolationLevel.READ_COMMITTED), logContext, defaultApiTimeoutMs);
100+
offsetTimestampsByPartition, new ListOffsetsOptions(IsolationLevel.READ_COMMITTED), logContext,
101+
defaultApiTimeoutMs, new HashMap<>(), new HashMap<>());
100102
ListOffsetsRequest request =
101103
handler.buildBatchedRequest(node.id(), offsetTimestampsByPartition.keySet()).build();
102104
List<ListOffsetsTopic> topics = request.topics();
@@ -117,14 +119,16 @@ public void testBuildRequestMultipleTopicsWithReadCommitted() {
117119
@Test
118120
public void testBuildRequestAllowedVersions() {
119121
ListOffsetsHandler defaultOptionsHandler =
120-
new ListOffsetsHandler(offsetTimestampsByPartition, new ListOffsetsOptions(), logContext, defaultApiTimeoutMs);
122+
new ListOffsetsHandler(offsetTimestampsByPartition, new ListOffsetsOptions(), logContext,
123+
defaultApiTimeoutMs, new HashMap<>(), new HashMap<>());
121124
ListOffsetsRequest.Builder builder =
122125
defaultOptionsHandler.buildBatchedRequest(node.id(), Set.of(t0p0, t0p1, t1p0));
123126
assertEquals(1, builder.oldestAllowedVersion());
124127

125128
ListOffsetsHandler readCommittedHandler =
126129
new ListOffsetsHandler(
127-
offsetTimestampsByPartition, new ListOffsetsOptions(IsolationLevel.READ_COMMITTED), logContext, defaultApiTimeoutMs);
130+
offsetTimestampsByPartition, new ListOffsetsOptions(IsolationLevel.READ_COMMITTED), logContext,
131+
defaultApiTimeoutMs, new HashMap<>(), new HashMap<>());
128132
builder = readCommittedHandler.buildBatchedRequest(node.id(), Set.of(t0p0, t0p1, t1p0));
129133
assertEquals(2, builder.oldestAllowedVersion());
130134

@@ -224,7 +228,8 @@ public void testHandleResponseUnsupportedVersion() {
224228
maxTimestampPartitions.put(t1p1, OffsetSpec.maxTimestamp());
225229

226230
ListOffsetsHandler handler =
227-
new ListOffsetsHandler(offsetTimestampsByPartition, new ListOffsetsOptions(), logContext, defaultApiTimeoutMs);
231+
new ListOffsetsHandler(offsetTimestampsByPartition, new ListOffsetsOptions(), logContext,
232+
defaultApiTimeoutMs, new HashMap<>(), new HashMap<>());
228233

229234
final Map<TopicPartition, Long> nonMaxTimestampPartitions = new HashMap<>(offsetTimestampsByPartition);
230235
maxTimestampPartitions.forEach((k, v) -> nonMaxTimestampPartitions.remove(k));
@@ -256,7 +261,8 @@ public void testHandleResponseUnsupportedVersion() {
256261
public void testBuildRequestWithDefaultApiTimeoutMs() {
257262
ListOffsetsOptions options = new ListOffsetsOptions();
258263
ListOffsetsHandler handler =
259-
new ListOffsetsHandler(offsetTimestampsByPartition, options, logContext, defaultApiTimeoutMs);
264+
new ListOffsetsHandler(offsetTimestampsByPartition, options, logContext,
265+
defaultApiTimeoutMs, new HashMap<>(), new HashMap<>());
260266
ListOffsetsRequest request = handler.buildBatchedRequest(node.id(), Set.of(t0p0, t0p1)).build();
261267
assertEquals(defaultApiTimeoutMs, request.timeoutMs());
262268
}
@@ -266,7 +272,8 @@ public void testBuildRequestWithTimeoutMs() {
266272
Integer timeoutMs = 200;
267273
ListOffsetsOptions options = new ListOffsetsOptions().timeoutMs(timeoutMs);
268274
ListOffsetsHandler handler =
269-
new ListOffsetsHandler(offsetTimestampsByPartition, options, logContext, defaultApiTimeoutMs);
275+
new ListOffsetsHandler(offsetTimestampsByPartition, options, logContext,
276+
defaultApiTimeoutMs, new HashMap<>(), new HashMap<>());
270277
ListOffsetsRequest request = handler.buildBatchedRequest(node.id(), Set.of(t0p0, t0p1)).build();
271278
assertEquals(timeoutMs, request.timeoutMs());
272279
}
@@ -307,7 +314,8 @@ private static ListOffsetsResponse createResponse(
307314

308315
private ApiResult<TopicPartition, ListOffsetsResultInfo> handleResponse(ListOffsetsResponse response) {
309316
ListOffsetsHandler handler =
310-
new ListOffsetsHandler(offsetTimestampsByPartition, new ListOffsetsOptions(), logContext, defaultApiTimeoutMs);
317+
new ListOffsetsHandler(offsetTimestampsByPartition, new ListOffsetsOptions(), logContext,
318+
defaultApiTimeoutMs, new HashMap<>(), new HashMap<>());
311319
return handler.handleResponse(node, offsetTimestampsByPartition.keySet(), response);
312320
}
313321

0 commit comments

Comments
 (0)