
Commit 667cc6c

init commit
1 parent 5cc4c87 commit 667cc6c


20 files changed, +162 -80 lines changed

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java

Lines changed: 19 additions & 0 deletions
@@ -2993,6 +2993,10 @@ public void testRenewAcknowledgementOnCommitSync() {
         shareConsumer.subscribe(List.of(tp.topic()));
         ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 10);
         assertEquals(10, records.count());
+        assertEquals(Optional.of(15000), shareConsumer.acquisitionLockTimeoutMs());
+
+        // The updated acquisition lock timeout is only applied when the next poll is called.
+        alterShareRecordLockDurationMs("group1", 25000);
 
         int count = 0;
         Map<TopicIdPartition, Optional<KafkaException>> result;
@@ -3004,13 +3008,15 @@ public void testRenewAcknowledgementOnCommitSync() {
             }
             result = shareConsumer.commitSync();
             assertEquals(1, result.size());
+            assertEquals(Optional.of(15000), shareConsumer.acquisitionLockTimeoutMs());
             assertEquals(Optional.empty(), result.get(new TopicIdPartition(tpId, tp.partition(), tp.topic())));
             count++;
         }
 
         // Get the rest of all 5 records.
         records = waitedPoll(shareConsumer, 2500L, 5);
         assertEquals(5, records.count());
+        assertEquals(Optional.of(25000), shareConsumer.acquisitionLockTimeoutMs());
         for (ConsumerRecord<byte[], byte[]> record : records) {
             shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
         }
@@ -3441,6 +3447,19 @@ private void alterShareIsolationLevel(String groupId, String newValue) {
         }
     }
 
+    private void alterShareRecordLockDurationMs(String groupId, int newValue) {
+        ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
+        Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new HashMap<>();
+        alterEntries.put(configResource, List.of(new AlterConfigOp(new ConfigEntry(
+            GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, Integer.toString(newValue)), AlterConfigOp.OpType.SET)));
+        AlterConfigsOptions alterOptions = new AlterConfigsOptions();
+        try (Admin adminClient = createAdminClient()) {
+            assertDoesNotThrow(() -> adminClient.incrementalAlterConfigs(alterEntries, alterOptions)
+                .all()
+                .get(60, TimeUnit.SECONDS), "Failed to alter configs");
+        }
+    }
+
     /**
      * Test utility which encapsulates a {@link ShareConsumer} whose record processing
      * behavior can be supplied as a function argument.
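The test now checks that a change to the share group's record lock duration, applied through the new alterShareRecordLockDurationMs helper, only takes effect on the consumer's next poll: the acquisition lock timeout still reads 15000 ms while acknowledging the first batch and 25000 ms after the following poll. For reference, a minimal standalone sketch of the same incrementalAlterConfigs call outside the test harness; the bootstrap address and group name are illustrative, and GroupConfig is assumed to be org.apache.kafka.coordinator.group.GroupConfig from the group-coordinator module:

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.coordinator.group.GroupConfig;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class AlterShareLockDurationSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        try (Admin admin = Admin.create(props)) {
            // GROUP-level config resource for the share group "group1" (illustrative name).
            ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, "group1");
            AlterConfigOp setLockDuration = new AlterConfigOp(
                new ConfigEntry(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "25000"),
                AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> updates = Map.of(group, List.of(setLockDuration));
            // Apply the incremental update and wait for the broker to acknowledge it.
            admin.incrementalAlterConfigs(updates).all().get(60, TimeUnit.SECONDS);
        }
    }
}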

clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

Lines changed: 1 addition & 1 deletion
@@ -4262,7 +4262,7 @@ public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartit
             ListOffsetsHandler.newFuture(topicPartitionOffsets.keySet(), partitionLeaderCache);
         Map<TopicPartition, Long> offsetQueriesByPartition = topicPartitionOffsets.entrySet().stream()
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> getOffsetFromSpec(e.getValue())));
-        ListOffsetsHandler handler = new ListOffsetsHandler(offsetQueriesByPartition, options, logContext, defaultApiTimeoutMs);
+        ListOffsetsHandler handler = new ListOffsetsHandler(offsetQueriesByPartition, metadataManager.cluster(), options, logContext, defaultApiTimeoutMs);
         invokeDriver(handler, future, options.timeoutMs);
         return new ListOffsetsResult(future.all());
     }
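Only the handler construction changes here: the admin client now hands its cached Cluster snapshot to ListOffsetsHandler. The public Admin.listOffsets contract is unchanged; a minimal usage sketch of that entry point follows, with the bootstrap address and the topic name "orders" as illustrative assumptions:

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Properties;

public class ListLatestOffsetSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        try (Admin admin = Admin.create(props)) {
            TopicPartition tp = new TopicPartition("orders", 0);
            // Ask for the latest offset; with this commit the request may also carry the topic ID.
            ListOffsetsResult result = admin.listOffsets(Map.of(tp, OffsetSpec.latest()));
            System.out.println(result.partitionResult(tp).get().offset());
        }
    }
}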

clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java

Lines changed: 4 additions & 0 deletions
@@ -318,6 +318,10 @@ public void update(Cluster cluster, long now) {
         }
     }
 
+    public Cluster cluster() {
+        return cluster;
+    }
+
     public void initiateRebootstrap() {
         this.metadataAttemptStartMs = Optional.of(0L);
     }
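The new accessor exposes the metadata manager's cached Cluster so KafkaAdminClient can pass it into ListOffsetsHandler, which uses it to resolve topic IDs. A small sketch of the lookup semantics the handler relies on, under the assumption that a topic missing from the snapshot resolves to the all-zero sentinel Uuid.ZERO_UUID:

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Uuid;

public class ClusterTopicIdSketch {
    public static void main(String[] args) {
        // An empty Cluster stands in for "no metadata cached yet" in this illustration.
        Cluster cluster = Cluster.empty();
        Uuid id = cluster.topicId("orders"); // "orders" is a hypothetical topic name
        // Expected: the zero UUID, which ListOffsetsHandler treats as "topic ID unknown".
        System.out.println(id.equals(Uuid.ZERO_UUID));
    }
}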

clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java

Lines changed: 14 additions & 1 deletion
@@ -19,8 +19,10 @@
 import org.apache.kafka.clients.admin.ListOffsetsOptions;
 import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched;
+import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -43,25 +45,29 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 public final class ListOffsetsHandler extends Batched<TopicPartition, ListOffsetsResultInfo> {
 
     private final Map<TopicPartition, Long> offsetTimestampsByPartition;
+    private final Cluster cluster;
     private final ListOffsetsOptions options;
     private final Logger log;
     private final AdminApiLookupStrategy<TopicPartition> lookupStrategy;
     private final int defaultApiTimeoutMs;
 
     public ListOffsetsHandler(
         Map<TopicPartition, Long> offsetTimestampsByPartition,
+        Cluster cluster,
         ListOffsetsOptions options,
         LogContext logContext,
         int defaultApiTimeoutMs
     ) {
         this.offsetTimestampsByPartition = offsetTimestampsByPartition;
+        this.cluster = cluster;
         this.options = options;
         this.log = logContext.logger(ListOffsetsHandler.class);
         this.lookupStrategy = new PartitionLeaderStrategy(logContext, false);
@@ -82,7 +88,7 @@ public AdminApiLookupStrategy<TopicPartition> lookupStrategy() {
     ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set<TopicPartition> keys) {
         Map<String, ListOffsetsTopic> topicsByName = CollectionUtils.groupPartitionsByTopic(
             keys,
-            topicName -> new ListOffsetsTopic().setName(topicName),
+            topicName -> new ListOffsetsTopic().setName(topicName).setTopicId(cluster.topicId(topicName)),
             (listOffsetsTopic, partitionId) -> {
                 TopicPartition topicPartition = new TopicPartition(listOffsetsTopic.name(), partitionId);
                 long offsetTimestamp = offsetTimestampsByPartition.get(topicPartition);
@@ -91,6 +97,12 @@ ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set<TopicPartition>
                     .setPartitionIndex(partitionId)
                     .setTimestamp(offsetTimestamp));
             });
+
+        boolean supportsTopicIds = topicsByName.values().stream()
+            .filter(Objects::nonNull)
+            .map(ListOffsetsTopic::topicId)
+            .anyMatch(topicId -> topicId != null && !topicId.equals(Uuid.ZERO_UUID));
+
         boolean supportsMaxTimestamp = keys
             .stream()
             .anyMatch(key -> offsetTimestampsByPartition.get(key) == ListOffsetsRequest.MAX_TIMESTAMP);
@@ -110,6 +122,7 @@ ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set<TopicPartition>
         int timeoutMs = options.timeoutMs() != null ? options.timeoutMs() : defaultApiTimeoutMs;
         return ListOffsetsRequest.Builder.forConsumer(true,
             options.isolationLevel(),
+            supportsTopicIds,
             supportsMaxTimestamp,
             requireEarliestLocalTimestamp,
             requireTieredStorageTimestamp,
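The derived supportsTopicIds flag simply reports whether any topic in the batch got a non-zero topic ID from the cluster snapshot. A small sketch of that predicate in isolation; the topic names and IDs are made up, and ListOffsetsTopic refers to the generated org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic:

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;

import java.util.List;
import java.util.Objects;

public class SupportsTopicIdsSketch {
    public static void main(String[] args) {
        List<ListOffsetsTopic> topics = List.of(
            new ListOffsetsTopic().setName("orders").setTopicId(Uuid.randomUuid()),  // ID resolved
            new ListOffsetsTopic().setName("payments").setTopicId(Uuid.ZERO_UUID));  // ID unknown
        // anyMatch: true as soon as one topic in the batch carries a resolvable (non-zero) ID.
        boolean supportsTopicIds = topics.stream()
            .filter(Objects::nonNull)
            .map(ListOffsetsTopic::topicId)
            .anyMatch(id -> id != null && !id.equals(Uuid.ZERO_UUID));
        System.out.println(supportsTopicIds); // true
    }
}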
