Skip to content

Commit

Permalink
Add debug logs for Kafka consumer (#5369)
Browse files Browse the repository at this point in the history
Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Jan 29, 2025
1 parent 0423cd1 commit 8152c57
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public void add(final T record) throws Exception {

public void flush() throws Exception {
try {
LOG.debug("Flushing buffer accumulator");
flushAccumulatedToBuffer();
} catch (final TimeoutException timeoutException) {
flushWithBackoff();
Expand All @@ -80,11 +81,13 @@ private boolean flushWithBackoff() throws Exception{
boolean flushedSuccessfully;

for (int retryCount = 0; retryCount < MAX_FLUSH_RETRIES_ON_IO_EXCEPTION; retryCount++) {
LOG.debug("Retrying buffer flush on retry count {}", retryCount);
final ScheduledFuture<Boolean> flushBufferFuture = scheduledExecutorService.schedule(() -> {
try {
flushAccumulatedToBuffer();
return true;
} catch (final TimeoutException e) {
LOG.debug("Timed out retrying buffer accumulator");
return false;
}
}, nextDelay, TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ <T> ConsumerRecords<String, T> doPoll() throws Exception {
<T> void consumeRecords() throws Exception {
try {
ConsumerRecords<String, T> records = doPoll();
LOG.debug("Consumed records with count {}", records.count());
if (Objects.nonNull(records) && !records.isEmpty() && records.count() > 0) {
Map<TopicPartition, CommitOffsetRange> offsets = new HashMap<>();
AcknowledgementSet acknowledgementSet = null;
Expand Down Expand Up @@ -367,6 +368,7 @@ public void run() {

boolean retryingAfterException = false;
while (!shutdownInProgress.get()) {
LOG.debug("Still running Kafka consumer in start of loop");
try {
if (retryingAfterException) {
LOG.debug("Pause consuming from Kafka topic due a previous exception.");
Expand All @@ -382,12 +384,15 @@ public void run() {
paused = false;
consumer.resume(consumer.assignment());
}
LOG.debug("Still running Kafka consumer preparing to commit offsets and consume records");
synchronized(this) {
commitOffsets(false);
resetOffsets();
}
consumeRecords();
LOG.debug("Exited consume records");
topicMetrics.update(consumer);
LOG.debug("Updated consumer metrics");
retryingAfterException = false;
} catch (Exception exp) {
LOG.error("Error while reading the records from the topic {}. Retry after 10 seconds", topicName, exp);
Expand Down Expand Up @@ -475,6 +480,7 @@ private void processRecord(final AcknowledgementSet acknowledgementSet, final Re
}
long numRetries = 0;
while (true) {
LOG.debug("In while loop for processing records, paused = {}", paused);
try {
if (numRetries == 0) {
bufferAccumulator.add(record);
Expand All @@ -485,16 +491,20 @@ private void processRecord(final AcknowledgementSet acknowledgementSet, final Re
} catch (Exception e) {
if (!paused && numRetries++ > maxRetriesOnException) {
paused = true;
LOG.debug("Preparing to call pause");
consumer.pause(consumer.assignment());
LOG.debug("Pause was called");
}
if (e instanceof SizeOverflowException) {
topicMetrics.getNumberOfBufferSizeOverflows().increment();
} else {
LOG.debug("Error while adding record to buffer, retrying ", e);
}
try {
LOG.debug("Sleeping due to exception");
Thread.sleep(RETRY_ON_EXCEPTION_SLEEP_MS);
if (paused) {
LOG.debug("Calling doPoll()");
ConsumerRecords<String, ?> records = doPoll();
if (records.count() > 0) {
LOG.warn("Unexpected records received while the consumer is paused. Resetting the partitions to retry from last read pointer");
Expand All @@ -509,6 +519,7 @@ private void processRecord(final AcknowledgementSet acknowledgementSet, final Re
}

if (paused) {
LOG.debug("Resuming consumption");
consumer.resume(consumer.assignment());
paused = false;
}
Expand Down

0 comments on commit 8152c57

Please sign in to comment.