KAFKA-19789: Log an error when we get duplicate acquired offsets in ShareFetchResponse. #20752
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java

```diff
@@ -41,9 +41,9 @@
 import java.io.Closeable;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Optional;
```
```diff
@@ -99,10 +99,17 @@ public class ShareCompletedFetch {
     }

     private List<OffsetAndDeliveryCount> buildAcquiredRecordList(List<ShareFetchResponseData.AcquiredRecords> partitionAcquiredRecords) {
-        List<OffsetAndDeliveryCount> acquiredRecordList = new LinkedList<>();
+        List<OffsetAndDeliveryCount> acquiredRecordList = new ArrayList<>();
+        // Set to find duplicates in case of overlapping acquired records
+        Set<Long> offsets = new HashSet<>();
```
Review comment: I wonder if you could change the …

Reply: I had a look into making the …
```diff
         partitionAcquiredRecords.forEach(acquiredRecords -> {
             for (long offset = acquiredRecords.firstOffset(); offset <= acquiredRecords.lastOffset(); offset++) {
-                acquiredRecordList.add(new OffsetAndDeliveryCount(offset, acquiredRecords.deliveryCount()));
+                if (!offsets.add(offset)) {
+                    log.error("Duplicate acquired record offset {} found in share fetch response for partition {}. " +
+                        "This indicates a broker processing issue.", offset, partition.topicPartition());
```
Review comment: Just curious, are there any known issues that lead to duplicate offsets?

Reply: Yes, there was a broker-side issue when …

Reply: There are no current known issues, but there was previously an issue in the broker, and adding logging would have made it quicker to get to the bottom of it.
```diff
+                } else {
+                    acquiredRecordList.add(new OffsetAndDeliveryCount(offset, acquiredRecords.deliveryCount()));
+                }
             }
         });
         return acquiredRecordList;
```
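For readers outside the Kafka codebase, the change relies on a standard idiom: `Set.add` returns `false` when the value is already present, so the first occurrence of an offset is kept and any later duplicate is only logged. Below is a minimal, self-contained sketch of that idiom; the `Range` record is a hypothetical stand-in for `ShareFetchResponseData.AcquiredRecords`, and the class is illustrative rather than part of the patch.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DuplicateOffsetSketch {
    // Hypothetical stand-in for ShareFetchResponseData.AcquiredRecords
    record Range(long firstOffset, long lastOffset, short deliveryCount) {}

    public static void main(String[] args) {
        // Overlapping ranges, as in the test below: [0-9] and [5-14]
        List<Range> acquired = List.of(new Range(0, 9, (short) 1), new Range(5, 14, (short) 2));
        List<long[]> result = new ArrayList<>();   // [offset, deliveryCount] pairs
        Set<Long> seen = new HashSet<>();

        for (Range range : acquired) {
            for (long offset = range.firstOffset(); offset <= range.lastOffset(); offset++) {
                if (!seen.add(offset)) {
                    // Set.add returns false for a value already present: the duplicate case
                    System.err.printf("Duplicate acquired offset %d; keeping the first occurrence%n", offset);
                } else {
                    result.add(new long[]{offset, range.deliveryCount()});
                }
            }
        }
        System.out.println("Unique acquired offsets: " + result.size()); // prints 15 for the ranges above
    }
}
```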
ShareCompletedFetchTest.java

```diff
@@ -60,6 +60,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;

 public class ShareCompletedFetchTest {
     private static final String TOPIC_NAME = "test";
```
```diff
@@ -356,6 +357,63 @@ record = records.get(1);
         assertEquals(0, records.size());
     }

+    @Test
+    public void testOverlappingAcquiredRecordsLogsErrorAndRetainsFirstOccurrence() {
+        int startingOffset = 0;
+        int numRecords = 20; // Records for 0-19
+
+        // Create overlapping acquired records: [0-9] and [5-14]
+        // Offsets 5-9 will be duplicates
+        List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new ArrayList<>();
+        acquiredRecords.add(new ShareFetchResponseData.AcquiredRecords()
+            .setFirstOffset(0L)
+            .setLastOffset(9L)
+            .setDeliveryCount((short) 1));
+        acquiredRecords.add(new ShareFetchResponseData.AcquiredRecords()
+            .setFirstOffset(5L)
+            .setLastOffset(14L)
+            .setDeliveryCount((short) 2));
+
+        ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
+            .setRecords(newRecords(startingOffset, numRecords))
+            .setAcquiredRecords(acquiredRecords);
+
+        ShareCompletedFetch completedFetch = newShareCompletedFetch(partitionData);
+
+        Deserializers<String, String> deserializers = newStringDeserializers();
+
+        // Fetch records and verify that only 15 unique records are returned (0-14)
+        ShareInFlightBatch<String, String> batch = completedFetch.fetchRecords(deserializers, 20, true);
+        List<ConsumerRecord<String, String>> records = batch.getInFlightRecords();
+
+        // Should get 15 unique records: 0-9 from first range (with deliveryCount=1)
+        // and 10-14 from second range (with deliveryCount=2)
+        assertEquals(15, records.size());
+
+        // Verify first occurrence (offset 5 should have deliveryCount=1 from first range)
+        ConsumerRecord<String, String> record5 = records.stream()
+            .filter(r -> r.offset() == 5L)
+            .findFirst()
+            .orElse(null);
+        assertNotNull(record5);
+        assertEquals(Optional.of((short) 1), record5.deliveryCount());
+
+        // Verify offset 10 has deliveryCount=2 from second range
+        ConsumerRecord<String, String> record10 = records.stream()
+            .filter(r -> r.offset() == 10L)
+            .findFirst()
+            .orElse(null);
+        assertNotNull(record10);
+        assertEquals(Optional.of((short) 2), record10.deliveryCount());
+
+        // Verify all offsets are unique
+        Set<Long> offsetSet = new HashSet<>();
+        for (ConsumerRecord<String, String> record : records) {
```
Review comment: I'm not sure if this covers the new behavior, since …

Reply: Yes, the logic around kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java line 219 (in 2c44448) … Without deduplication, when the offset is encountered a second time, …
```diff
+            assertTrue(offsetSet.add(record.offset()),
+                "Duplicate offset found in results: " + record.offset());
+        }
+    }
+
     private ShareCompletedFetch newShareCompletedFetch(ShareFetchResponseData.PartitionData partitionData) {
         LogContext logContext = new LogContext();
         ShareFetchMetricsRegistry shareFetchMetricsRegistry = new ShareFetchMetricsRegistry();
```
Review comment: By default, a new list will have space for 10 elements. Resizing is expensive. Maybe one optimisation would be to see how many offsets are in the first element of partitionAcquiredRecords, and use that number as the initial size of the list. In the case of only one batch of offsets, the list will already be the correct size. wdyt?

Reply: That makes sense: if most of the time the response is going to contain only one batch, we can avoid resizing. I have made the change. Thanks.
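The presizing idea the reviewer describes can be sketched roughly as below. This is only an illustration of the suggestion under discussion, not the exact code that was merged; it is written as a fragment meant to slot into buildAcquiredRecordList and reuses the `firstOffset()`/`lastOffset()` accessors of `ShareFetchResponseData.AcquiredRecords` that appear in the diff above.

```java
// Illustrative sketch only, not the merged change. Pre-size the list from the first
// acquired batch: exact when the response carries a single batch, otherwise just a
// better-than-default starting capacity (ArrayList's default is 10).
int initialCapacity = 10;
if (!partitionAcquiredRecords.isEmpty()) {
    ShareFetchResponseData.AcquiredRecords first = partitionAcquiredRecords.get(0);
    // Number of offsets in the first batch, clamped defensively to the int range
    initialCapacity = (int) Math.min(Integer.MAX_VALUE,
        first.lastOffset() - first.firstOffset() + 1);
}
List<OffsetAndDeliveryCount> acquiredRecordList = new ArrayList<>(initialCapacity);
```

When the broker returns a single acquired batch, the list is allocated at exactly the right size and never resized; with multiple batches it still starts closer to the final size than the default capacity would.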