From 15db13a8339f45a1e9571979e9095e69db78005d Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Tue, 28 Jan 2025 17:47:27 -0600 Subject: [PATCH] Add debug logs for Kafka consumer Signed-off-by: Taylor Gray --- .../dataprepper/buffer/common/BufferAccumulator.java | 3 +++ .../plugins/kafka/consumer/KafkaCustomConsumer.java | 11 +++++++++++ 2 files changed, 14 insertions(+) diff --git a/data-prepper-plugins/buffer-common/src/main/java/org/opensearch/dataprepper/buffer/common/BufferAccumulator.java b/data-prepper-plugins/buffer-common/src/main/java/org/opensearch/dataprepper/buffer/common/BufferAccumulator.java index eeaedf4ec1..69960f7dd7 100644 --- a/data-prepper-plugins/buffer-common/src/main/java/org/opensearch/dataprepper/buffer/common/BufferAccumulator.java +++ b/data-prepper-plugins/buffer-common/src/main/java/org/opensearch/dataprepper/buffer/common/BufferAccumulator.java @@ -68,6 +68,7 @@ public void add(final T record) throws Exception { public void flush() throws Exception { try { + LOG.debug("Flushing buffer accumulator"); flushAccumulatedToBuffer(); } catch (final TimeoutException timeoutException) { flushWithBackoff(); @@ -80,11 +81,13 @@ private boolean flushWithBackoff() throws Exception{ boolean flushedSuccessfully; for (int retryCount = 0; retryCount < MAX_FLUSH_RETRIES_ON_IO_EXCEPTION; retryCount++) { + LOG.debug("Retrying buffer flush on retry count {}", retryCount); final ScheduledFuture flushBufferFuture = scheduledExecutorService.schedule(() -> { try { flushAccumulatedToBuffer(); return true; } catch (final TimeoutException e) { + LOG.debug("Timed out retrying buffer accumulator"); return false; } }, nextDelay, TimeUnit.MILLISECONDS); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java index a84f800d8d..a07d2f5130 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java @@ -204,6 +204,7 @@ ConsumerRecords doPoll() throws Exception { void consumeRecords() throws Exception { try { ConsumerRecords records = doPoll(); + LOG.debug("Consumed records with count {}", records.count()); if (Objects.nonNull(records) && !records.isEmpty() && records.count() > 0) { Map offsets = new HashMap<>(); AcknowledgementSet acknowledgementSet = null; @@ -367,6 +368,7 @@ public void run() { boolean retryingAfterException = false; while (!shutdownInProgress.get()) { + LOG.debug("Still running Kafka consumer in start of loop"); try { if (retryingAfterException) { LOG.debug("Pause consuming from Kafka topic due a previous exception."); @@ -382,12 +384,15 @@ public void run() { paused = false; consumer.resume(consumer.assignment()); } + LOG.debug("Still running Kafka consumer preparing to commit offsets and consume records"); synchronized(this) { commitOffsets(false); resetOffsets(); } consumeRecords(); + LOG.debug("Exited consume records"); topicMetrics.update(consumer); + LOG.debug("Updated consumer metrics"); retryingAfterException = false; } catch (Exception exp) { LOG.error("Error while reading the records from the topic {}. Retry after 10 seconds", topicName, exp); @@ -475,6 +480,7 @@ private void processRecord(final AcknowledgementSet acknowledgementSet, final Re } long numRetries = 0; while (true) { + LOG.debug("In while loop for processing records, paused = {}", paused); try { if (numRetries == 0) { bufferAccumulator.add(record); @@ -485,7 +491,9 @@ private void processRecord(final AcknowledgementSet acknowledgementSet, final Re } catch (Exception e) { if (!paused && numRetries++ > maxRetriesOnException) { paused = true; + LOG.debug("Preparing to call pause"); consumer.pause(consumer.assignment()); + LOG.debug("Pause was called"); } if (e instanceof SizeOverflowException) { topicMetrics.getNumberOfBufferSizeOverflows().increment(); @@ -493,8 +501,10 @@ private void processRecord(final AcknowledgementSet acknowledgementSet, final Re LOG.debug("Error while adding record to buffer, retrying ", e); } try { + LOG.debug("Sleeping due to exception"); Thread.sleep(RETRY_ON_EXCEPTION_SLEEP_MS); if (paused) { + LOG.debug("Calling doPoll()"); ConsumerRecords records = doPoll(); if (records.count() > 0) { LOG.warn("Unexpected records received while the consumer is paused. Resetting the partitions to retry from last read pointer"); @@ -509,6 +519,7 @@ private void processRecord(final AcknowledgementSet acknowledgementSet, final Re } if (paused) { + LOG.debug("Resuming consumption"); consumer.resume(consumer.assignment()); paused = false; }