diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
index 2c337782dd415..95e40a3c826c6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
@@ -41,9 +41,9 @@
 
 import java.io.Closeable;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Optional;
@@ -99,10 +99,23 @@ public class ShareCompletedFetch {
     }
 
     private List<OffsetAndDeliveryCount> buildAcquiredRecordList(List<ShareFetchResponseData.AcquiredRecords> partitionAcquiredRecords) {
-        List<OffsetAndDeliveryCount> acquiredRecordList = new LinkedList<>();
+        // Setting the size of the array to the size of the first batch of acquired records. In case there is only 1 batch acquired, resizing would not happen.
+        if (partitionAcquiredRecords.isEmpty()) {
+            return List.of();
+        }
+        int initialListSize = (int) (partitionAcquiredRecords.get(0).lastOffset() - partitionAcquiredRecords.get(0).firstOffset() + 1);
+        List<OffsetAndDeliveryCount> acquiredRecordList = new ArrayList<>(initialListSize);
+
+        // Set to find duplicates in case of overlapping acquired records
+        Set<Long> offsets = new HashSet<>();
         partitionAcquiredRecords.forEach(acquiredRecords -> {
             for (long offset = acquiredRecords.firstOffset(); offset <= acquiredRecords.lastOffset(); offset++) {
-                acquiredRecordList.add(new OffsetAndDeliveryCount(offset, acquiredRecords.deliveryCount()));
+                if (!offsets.add(offset)) {
+                    log.error("Duplicate acquired record offset {} found in share fetch response for partition {}. " +
+                        "This indicates a broker processing issue.", offset, partition.topicPartition());
+                } else {
+                    acquiredRecordList.add(new OffsetAndDeliveryCount(offset, acquiredRecords.deliveryCount()));
+                }
             }
         });
         return acquiredRecordList;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
index a1814fd935c9c..95f1696629235 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
@@ -60,6 +60,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ShareCompletedFetchTest {
     private static final String TOPIC_NAME = "test";
@@ -356,6 +357,63 @@ record = records.get(1);
         assertEquals(0, records.size());
     }
 
+    @Test
+    public void testOverlappingAcquiredRecordsLogsErrorAndRetainsFirstOccurrence() {
+        int startingOffset = 0;
+        int numRecords = 20; // Records for 0-19
+
+        // Create overlapping acquired records: [0-9] and [5-14]
+        // Offsets 5-9 will be duplicates
+        List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new ArrayList<>();
+        acquiredRecords.add(new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(0L)
+                .setLastOffset(9L)
+                .setDeliveryCount((short) 1));
+        acquiredRecords.add(new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(5L)
+                .setLastOffset(14L)
+                .setDeliveryCount((short) 2));
+
+        ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
+                .setRecords(newRecords(startingOffset, numRecords))
+                .setAcquiredRecords(acquiredRecords);
+
+        ShareCompletedFetch completedFetch = newShareCompletedFetch(partitionData);
+
+        Deserializers<String, String> deserializers = newStringDeserializers();
+
+        // Fetch records and verify that only 15 unique records are returned (0-14)
+        ShareInFlightBatch<String, String> batch = completedFetch.fetchRecords(deserializers, 20, true);
+        List<ConsumerRecord<String, String>> records = batch.getInFlightRecords();
+
+        // Should get 15 unique records: 0-9 from first range (with deliveryCount=1)
+        // and 10-14 from second range (with deliveryCount=2)
+        assertEquals(15, records.size());
+
+        // Verify first occurrence (offset 5 should have deliveryCount=1 from first range)
+        ConsumerRecord<String, String> record5 = records.stream()
+                .filter(r -> r.offset() == 5L)
+                .findFirst()
+                .orElse(null);
+        assertNotNull(record5);
+        assertEquals(Optional.of((short) 1), record5.deliveryCount());
+
+        // Verify offset 10 has deliveryCount=2 from second range
+        ConsumerRecord<String, String> record10 = records.stream()
+                .filter(r -> r.offset() == 10L)
+                .findFirst()
+                .orElse(null);
+        assertNotNull(record10);
+        assertEquals(Optional.of((short) 2), record10.deliveryCount());
+
+        // Verify all offsets are unique
+        Set<Long> offsetSet = new HashSet<>();
+        for (ConsumerRecord<String, String> record : records) {
+            assertTrue(offsetSet.add(record.offset()),
+                    "Duplicate offset found in results: " + record.offset());
+        }
+    }
+
     private ShareCompletedFetch newShareCompletedFetch(ShareFetchResponseData.PartitionData partitionData) {
         LogContext logContext = new LogContext();
         ShareFetchMetricsRegistry shareFetchMetricsRegistry = new ShareFetchMetricsRegistry();
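
For context, the following is a minimal standalone sketch (not part of the patch) of the duplicate-offset handling that the new buildAcquiredRecordList performs. It uses hypothetical stand-in types (AcquiredRange, OffsetAndDeliveryCount) and plain System.err output instead of the real ShareFetchResponseData.AcquiredRecords and the client logger; the list is pre-sized from the first range and only the first occurrence of each offset, with its delivery count, is retained.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AcquiredRecordDedupSketch {
    // Simplified stand-ins for the fields used from ShareFetchResponseData.AcquiredRecords.
    record AcquiredRange(long firstOffset, long lastOffset, short deliveryCount) { }
    record OffsetAndDeliveryCount(long offset, short deliveryCount) { }

    // Mirrors the patched logic: pre-size the result from the first range and
    // skip any offset already acquired by an earlier range.
    static List<OffsetAndDeliveryCount> buildAcquiredRecordList(List<AcquiredRange> ranges) {
        if (ranges.isEmpty()) {
            return List.of();
        }
        int initialSize = (int) (ranges.get(0).lastOffset() - ranges.get(0).firstOffset() + 1);
        List<OffsetAndDeliveryCount> result = new ArrayList<>(initialSize);
        Set<Long> seen = new HashSet<>();
        for (AcquiredRange range : ranges) {
            for (long offset = range.firstOffset(); offset <= range.lastOffset(); offset++) {
                if (seen.add(offset)) {
                    result.add(new OffsetAndDeliveryCount(offset, range.deliveryCount()));
                } else {
                    System.err.println("Duplicate acquired record offset " + offset);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Overlapping ranges [0-9] and [5-14]: offsets 5-9 keep deliveryCount 1.
        List<OffsetAndDeliveryCount> list = buildAcquiredRecordList(List.of(
                new AcquiredRange(0, 9, (short) 1),
                new AcquiredRange(5, 14, (short) 2)));
        System.out.println(list.size()); // prints 15 unique offsets
    }
}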