@@ -41,9 +41,9 @@

import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Optional;
@@ -99,10 +99,21 @@ public class ShareCompletedFetch {
}

private List<OffsetAndDeliveryCount> buildAcquiredRecordList(List<ShareFetchResponseData.AcquiredRecords> partitionAcquiredRecords) {
- List<OffsetAndDeliveryCount> acquiredRecordList = new LinkedList<>();
+ // Setting the size of the array to the size of the first batch of acquired records. In case there is only 1 batch acquired, resizing would not happen.
+ int initialListSize = !partitionAcquiredRecords.isEmpty() ? (int) (partitionAcquiredRecords.get(0).lastOffset() -
+         partitionAcquiredRecords.get(0).firstOffset() + 1) : 0;
+ List<OffsetAndDeliveryCount> acquiredRecordList = new ArrayList<>(initialListSize);

Member
In the case where partitionAcquiredRecords is empty, we can just make an empty list and return it directly. We don't need to make the HashSet only to discard it unused, because the loop will not have any iterations.

Collaborator Author
Yes, that makes sense. I have updated the code now.
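For illustration, a minimal sketch of the early return discussed above, with the duplicate-checking loop left exactly as in the diff; this is an assumption about the shape of the follow-up change, not the exact code that was pushed:

    private List<OffsetAndDeliveryCount> buildAcquiredRecordList(List<ShareFetchResponseData.AcquiredRecords> partitionAcquiredRecords) {
        // Nothing was acquired: return immediately, without sizing the list or allocating the duplicate-tracking HashSet.
        if (partitionAcquiredRecords.isEmpty()) {
            return new ArrayList<>();
        }
        int initialListSize = (int) (partitionAcquiredRecords.get(0).lastOffset() -
                partitionAcquiredRecords.get(0).firstOffset() + 1);
        List<OffsetAndDeliveryCount> acquiredRecordList = new ArrayList<>(initialListSize);
        Set<Long> offsets = new HashSet<>();
        // ... duplicate-checking loop unchanged from the diff above ...
        return acquiredRecordList;
    }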

+ // Set to find duplicates in case of overlapping acquired records
+ Set<Long> offsets = new HashSet<>();

Member
I wonder if you could change the partitionAcquiredRecords into a LinkedHashMap or similar to combine the duplicate checking with the ordered iteration.

Collaborator Author
I had a look into making acquiredRecordList (LinkedList<OffsetAndDeliveryCount>) into a LinkedHashMap. That change would need some reworking around the listIterator; we might have to use map.entrySet().iterator() to rewind to the start of the list. And as we are doing sequential operations rather than key-based lookups, it is probably better to keep it as a list. I have changed it to an ArrayList instead of a LinkedList though, as that gives better iteration performance for build-once-and-iterate use cases.
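A small standalone illustration of the rewinding point (hypothetical RewindSketch class, plain Long offsets instead of the real OffsetAndDeliveryCount type): a List hands out a fresh ListIterator positioned at the start whenever needed, whereas a LinkedHashMap preserves insertion order but only exposes iteration through entrySet().

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.ListIterator;
    import java.util.Map;

    public class RewindSketch {
        public static void main(String[] args) {
            // Offsets kept in an ArrayList: iteration is cheap, and "rewinding" is just a new ListIterator.
            List<Long> offsets = new ArrayList<>(List.of(0L, 1L, 2L));
            ListIterator<Long> it = offsets.listIterator();
            while (it.hasNext()) {
                System.out.println(it.next());
            }
            it = offsets.listIterator(); // rewind to the start

            // A LinkedHashMap would keep insertion order and deduplicate keys,
            // but rewinding means asking entrySet() for a fresh Iterator instead.
            Map<Long, Short> ordered = new LinkedHashMap<>();
            ordered.put(0L, (short) 1);
            Iterator<Map.Entry<Long, Short>> entries = ordered.entrySet().iterator();
            while (entries.hasNext()) {
                System.out.println(entries.next());
            }
        }
    }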

partitionAcquiredRecords.forEach(acquiredRecords -> {
for (long offset = acquiredRecords.firstOffset(); offset <= acquiredRecords.lastOffset(); offset++) {
- acquiredRecordList.add(new OffsetAndDeliveryCount(offset, acquiredRecords.deliveryCount()));
+ if (!offsets.add(offset)) {
+     log.error("Duplicate acquired record offset {} found in share fetch response for partition {}. " +
+             "This indicates a broker processing issue.", offset, partition.topicPartition());

Member
Just curious, are there any known issues that lead to duplicate offsets?

Collaborator Author
Yes, there was a broker-side issue when the SharePartition was at capacity: https://issues.apache.org/jira/browse/KAFKA-19808. Due to this, we were getting duplicate offsets (with different delivery counts) in the ShareFetchResponse.

Member
There are no currently known issues, but there was previously an issue in the broker, and having this logging would have made it quicker to get to the bottom of it.
+ } else {
+     acquiredRecordList.add(new OffsetAndDeliveryCount(offset, acquiredRecords.deliveryCount()));
+ }
}
});
return acquiredRecordList;

@@ -60,6 +60,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ShareCompletedFetchTest {
private static final String TOPIC_NAME = "test";
@@ -356,6 +357,63 @@ record = records.get(1);
assertEquals(0, records.size());
}

@Test
public void testOverlappingAcquiredRecordsLogsErrorAndRetainsFirstOccurrence() {
int startingOffset = 0;
int numRecords = 20; // Records for 0-19

// Create overlapping acquired records: [0-9] and [5-14]
// Offsets 5-9 will be duplicates
List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new ArrayList<>();
acquiredRecords.add(new ShareFetchResponseData.AcquiredRecords()
.setFirstOffset(0L)
.setLastOffset(9L)
.setDeliveryCount((short) 1));
acquiredRecords.add(new ShareFetchResponseData.AcquiredRecords()
.setFirstOffset(5L)
.setLastOffset(14L)
.setDeliveryCount((short) 2));

ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
.setRecords(newRecords(startingOffset, numRecords))
.setAcquiredRecords(acquiredRecords);

ShareCompletedFetch completedFetch = newShareCompletedFetch(partitionData);

Deserializers<String, String> deserializers = newStringDeserializers();

// Fetch records and verify that only 15 unique records are returned (0-14)
ShareInFlightBatch<String, String> batch = completedFetch.fetchRecords(deserializers, 20, true);
List<ConsumerRecord<String, String>> records = batch.getInFlightRecords();

// Should get 15 unique records: 0-9 from first range (with deliveryCount=1)
// and 10-14 from second range (with deliveryCount=2)
assertEquals(15, records.size());

// Verify first occurrence (offset 5 should have deliveryCount=1 from first range)
ConsumerRecord<String, String> record5 = records.stream()
.filter(r -> r.offset() == 5L)
.findFirst()
.orElse(null);
assertNotNull(record5);
assertEquals(Optional.of((short) 1), record5.deliveryCount());

// Verify offset 10 has deliveryCount=2 from second range
ConsumerRecord<String, String> record10 = records.stream()
.filter(r -> r.offset() == 10L)
.findFirst()
.orElse(null);
assertNotNull(record10);
assertEquals(Optional.of((short) 2), record10.deliveryCount());

// Verify all offsets are unique
Set<Long> offsetSet = new HashSet<>();
for (ConsumerRecord<String, String> record : records) {
assertTrue(offsetSet.add(record.offset()),
"Duplicate offset found in results: " + record.offset());
}
}

Member
I'm not sure if this covers the new behavior, since inFlightRecords already handles offset deduplication.

Collaborator Author
Yes, the logic around inFlightRecords ensures we do not send duplicate offsets to the application side, but the client does respond with a GAP acknowledgement to the broker for any duplicate offset.

Without deduplication, when an offset is encountered a second time, lastRecord.offset > nextAcquired.offset will be true (as nextAcquired will be an older offset), so the client acknowledges these offsets as GAPs, which hides the main issue. As the broker is already in a bad state (duplication should never happen), we thought it better to log an error and ignore any duplicates on the client.
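A toy model of the behavior described in that reply, using the same overlapping ranges as this test; this is a schematic walk (not the real ShareCompletedFetch logic), showing how the second occurrences of offsets 5-9 would end up classified as gaps without client-side deduplication:

    import java.util.ArrayList;
    import java.util.List;

    public class GapAckSketch {
        public static void main(String[] args) {
            // Flattened acquired offsets from overlapping batches [0-9] and [5-14], with no deduplication.
            List<Long> acquired = new ArrayList<>();
            for (long o = 0; o <= 9; o++) acquired.add(o);
            for (long o = 5; o <= 14; o++) acquired.add(o);

            long lastDeliveredOffset = -1;
            List<Long> gapAcked = new ArrayList<>();
            for (long next : acquired) {
                if (next <= lastDeliveredOffset) {
                    // The walk has already moved past this offset, so the client
                    // would acknowledge it as a GAP instead of delivering it again.
                    gapAcked.add(next);
                } else {
                    lastDeliveredOffset = next;
                }
            }
            System.out.println("Offsets acknowledged as GAP: " + gapAcked); // [5, 6, 7, 8, 9]
        }
    }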

private ShareCompletedFetch newShareCompletedFetch(ShareFetchResponseData.PartitionData partitionData) {
LogContext logContext = new LogContext();
ShareFetchMetricsRegistry shareFetchMetricsRegistry = new ShareFetchMetricsRegistry();