@@ -402,6 +402,7 @@ private void submitConsumerWithObserver(
402402 new KafkaThroughputLimiter (maxBytesPerSec );
403403 final Map <TopicPartition , Long > polledOffsets = new HashMap <>();
404404 final Map <TopicPartition , Integer > emptyPolls = new HashMap <>();
405+ final Map <TopicPartition , Long > endOffsets = new HashMap <>();
405406
406407 handle .set (
407408 createObserveHandle (shutdown , seekOffsets , consumer , readyLatch , completedLatch ));
@@ -414,6 +415,7 @@ private void submitConsumerWithObserver(
414415 topicPartitionToId ,
415416 emptyPolls ,
416417 polledOffsets ,
418+ endOffsets ,
417419 watermarkEstimator );
418420
419421 try (KafkaConsumer <Object , Object > kafka =
@@ -423,12 +425,9 @@ private void submitConsumerWithObserver(
423425
424426 // we need to poll first to initialize kafka assignments and rebalance listener
425427 ConsumerRecords <Object , Object > poll ;
426- Map <TopicPartition , Long > endOffsets ;
427428
428429 do {
429430 poll = kafka .poll (pollDuration );
430- endOffsets = stopAtCurrent ? findNonEmptyEndOffsets (kafka ) : null ;
431-
432431 if (log .isDebugEnabled ()) {
433432 log .debug (
434433 "End offsets of current assignment {}: {}" , kafka .assignment (), endOffsets );
@@ -498,7 +497,12 @@ private void submitConsumerWithObserver(
498497 terminateIfConsumed (stopAtCurrent , kafka , endOffsets , polledOffsets , completed );
499498
500499 progressWatermarkOnEmptyPartitions (
501- consumer , emptyPolls , topicPartitionToId , watermarkEstimator );
500+ consumer ,
501+ emptyPolls ,
502+ endOffsets ,
503+ polledOffsets ,
504+ topicPartitionToId ,
505+ watermarkEstimator );
502506 throughputLimiter .sleepToLimitThroughput (bytesPolled , pollTimeMs );
503507 long startTime = System .currentTimeMillis ();
504508 poll = kafka .poll (pollDuration );
@@ -548,13 +552,19 @@ private void submitConsumerWithObserver(
548552 private void progressWatermarkOnEmptyPartitions (
549553 ElementConsumer <Object , Object > consumer ,
550554 Map <TopicPartition , Integer > emptyPolls ,
555+ Map <TopicPartition , Long > endOffsets ,
556+ Map <TopicPartition , Long > polledOffsets ,
551557 Map <TopicPartition , Integer > topicPartitionToId ,
552558 AtomicReference <PartitionedWatermarkEstimator > watermarkEstimator ) {
553559
554560 int partitions = emptyPolls .size ();
555561 List <TopicPartition > idlingPartitions =
556562 emptyPolls .entrySet ().stream ()
557563 .filter (e -> e .getValue () >= partitions )
564+ .filter (
565+ e ->
566+ MoreObjects .firstNonNull (polledOffsets .get (e .getKey ()), -1L ) + 1
567+ >= MoreObjects .firstNonNull (endOffsets .get (e .getKey ()), 0L ))
558568 .map (Entry ::getKey )
559569 .collect (Collectors .toList ());
560570
@@ -624,7 +634,7 @@ private void logConsumerWatermark(
624634
625635 if (log .isDebugEnabled ()) {
626636 log .debug (
627- "Current watermark of consumer name {} with offsets {} " + " on {} poll'd records is {}" ,
637+ "Current watermark of consumer name {} with offsets {} on {} poll'd records is {}" ,
628638 name ,
629639 offsets ,
630640 polledCount ,
@@ -723,14 +733,6 @@ public void waitUntilReady() throws InterruptedException {
723733 };
724734 }
725735
726- private Map <TopicPartition , Long > findNonEmptyEndOffsets (final KafkaConsumer <?, ?> kafka ) {
727- Set <TopicPartition > assignment = kafka .assignment ();
728- Map <TopicPartition , Long > beginning = kafka .beginningOffsets (assignment );
729- return kafka .endOffsets (assignment ).entrySet ().stream ()
730- .filter (entry -> beginning .get (entry .getKey ()) < entry .getValue ())
731- .collect (toMap (Map .Entry ::getKey , Map .Entry ::getValue ));
732- }
733-
734736 private KafkaConsumer <Object , Object > createConsumer () {
735737 return createConsumer (UUID .randomUUID ().toString (), null , null , Position .NEWEST );
736738 }
@@ -883,6 +885,7 @@ private ConsumerRebalanceListener listener(
883885 Map <TopicPartition , Integer > topicPartitionToId ,
884886 Map <TopicPartition , Integer > emptyPolls ,
885887 Map <TopicPartition , Long > polledOffsets ,
888+ Map <TopicPartition , Long > endOffsets ,
886889 AtomicReference <PartitionedWatermarkEstimator > watermarkEstimator ) {
887890
888891 return new ConsumerRebalanceListener () {
@@ -901,6 +904,9 @@ public void onPartitionsAssigned(Collection<TopicPartition> parts) {
901904 topicPartitionToId .clear ();
902905 AtomicInteger id = new AtomicInteger ();
903906 emptyPolls .clear ();
907+ polledOffsets .clear ();
908+ endOffsets .clear ();
909+ Optional .ofNullable (kafka .get ()).ifPresent (k -> endOffsets .putAll (k .endOffsets (parts )));
904910
905911 currentlyAssigned .forEach (p -> topicPartitionToId .put (p , id .getAndIncrement ()));
906912
@@ -921,13 +927,14 @@ public void onPartitionsAssigned(Collection<TopicPartition> parts) {
921927 name != null
922928 ? getCommittedTopicOffsets (currentlyAssigned , c )
923929 : getCurrentTopicOffsets (currentlyAssigned , c );
924- polledOffsets .clear ();
925- newOffsets .forEach (
926- o ->
927- polledOffsets .put (
928- new TopicPartition (
929- o .getPartition ().getTopic (), o .getPartition ().getPartition ()),
930- o .getOffset () - 1 ));
930+ newOffsets .stream ()
931+ .filter (o -> o .getOffset () > 0 )
932+ .forEach (
933+ o ->
934+ polledOffsets .put (
935+ new TopicPartition (
936+ o .getPartition ().getTopic (), o .getPartition ().getPartition ()),
937+ o .getOffset () - 1 ));
931938 consumer .onAssign (c , newOffsets );
932939 });
933940 }
0 commit comments