Skip to content

Commit 9fb3569

Browse files
authored
Merge pull request #971: [direct-io-kafka] fix bounded read
2 parents 4f06f05 + d61ed51 commit 9fb3569

File tree

1 file changed

+3
-1
lines changed

1 file changed

+3
-1
lines changed

direct/io-kafka/src/main/java/cz/o2/proxima/direct/io/kafka/KafkaLogReader.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,7 @@ private void submitConsumerWithObserver(
495495
handleRebalanceInOffsetCommit(kafka, listener);
496496
}
497497
rethrowErrorIfPresent(name, error);
498+
log.debug("Current endOffsets {}, polledOffsets {}", endOffsets, polledOffsets);
498499
terminateIfConsumed(stopAtCurrent, kafka, endOffsets, polledOffsets, completed);
499500

500501
progressWatermarkOnEmptyPartitions(
@@ -581,6 +582,7 @@ private static void notifyAssignedPartitions(
581582
KafkaConsumer<?, ?> kafka, ConsumerRebalanceListener listener) {
582583

583584
Set<TopicPartition> assignment = kafka.assignment();
585+
log.debug("Assignment before notification is {}", assignment);
584586
if (!assignment.isEmpty()) {
585587
listener.onPartitionsRevoked(assignment);
586588
listener.onPartitionsAssigned(assignment);
@@ -937,7 +939,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> parts) {
937939
? getCommittedTopicOffsets(currentlyAssigned, c)
938940
: getCurrentTopicOffsets(currentlyAssigned, c);
939941
newOffsets.stream()
940-
.filter(o -> o.getOffset() > 0)
942+
.filter(o -> o.getOffset() >= 0)
941943
.forEach(
942944
o ->
943945
polledOffsets.put(

0 commit comments

Comments
 (0)