Skip to content

Commit 31020ac

Browse files
committed
fix delay Remoteoffset without topicId
1 parent 4b69570 commit 31020ac

File tree

1 file changed

+7
-3
lines changed

1 file changed

+7
-3
lines changed

storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsets.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,13 @@ public void onExpiration() {
100100
@Override
101101
public void onComplete() {
102102
Map<String, ListOffsetsResponseData.ListOffsetsTopicResponse> groupedByTopic = new HashMap<>();
103-
statusByPartition.forEach((tp, status) -> {
104-
ListOffsetsResponseData.ListOffsetsTopicResponse response = groupedByTopic.computeIfAbsent(tp.topic(), k ->
105-
new ListOffsetsResponseData.ListOffsetsTopicResponse().setName(tp.topic()));
103+
statusByPartition.forEach((topicIdPartition, status) -> {
104+
ListOffsetsResponseData.ListOffsetsTopicResponse response = groupedByTopic.computeIfAbsent(
105+
topicIdPartition.topic(),
106+
k -> new ListOffsetsResponseData.ListOffsetsTopicResponse()
107+
.setName(topicIdPartition.topic())
108+
.setTopicId(topicIdPartition.topicId())
109+
);
106110
status.responseOpt().ifPresent(res -> response.partitions().add(res));
107111
});
108112
responseCallback.accept(groupedByTopic.values());

0 commit comments

Comments
 (0)