Skip to content

Commit

Permalink
Change Kafka Buffer defaults for fetch.max.wait.ms, fetch.min.bytes, …
Browse files Browse the repository at this point in the history
…partition.assignment.strategy, close consumer on shutdown (#5373)

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Jan 30, 2025
1 parent 8f384dc commit 49c6a2b
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ class BufferTopicConfig extends CommonTopicConfig implements TopicProducerConfig
private static final Long DEFAULT_RETENTION_PERIOD = 604800000L;
static final boolean DEFAULT_AUTO_COMMIT = false;
static final ByteCount DEFAULT_FETCH_MAX_BYTES = ByteCount.parse("50mb");
static final Duration DEFAULT_FETCH_MAX_WAIT = Duration.ofMillis(500);
static final ByteCount DEFAULT_FETCH_MIN_BYTES = ByteCount.parse("1b");
static final Duration DEFAULT_FETCH_MAX_WAIT = Duration.ofMillis(1000);
static final ByteCount DEFAULT_FETCH_MIN_BYTES = ByteCount.parse("2kb");
static final ByteCount DEFAULT_MAX_PARTITION_FETCH_BYTES = ByteCount.parse("1mb");
static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofSeconds(45);
static final String DEFAULT_AUTO_OFFSET_RESET = "earliest";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public class KafkaBuffer extends AbstractBuffer<Record<Event>> {
private final AbstractBuffer<Record<Event>> innerBuffer;
private final ExecutorService executorService;
private final Duration drainTimeout;

private final List<KafkaCustomConsumer> consumers;
private AtomicBoolean shutdownInProgress;
private ByteDecoder byteDecoder;

Expand All @@ -83,7 +85,7 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka
innerBuffer = new BlockingBuffer<>(INNER_BUFFER_CAPACITY, INNER_BUFFER_BATCH_SIZE, pluginSetting.getPipelineName());
this.shutdownInProgress = new AtomicBoolean(false);
final PluginMetrics consumerMetrics = PluginMetrics.fromNames(metricPrefixName + READ, pluginSetting.getPipelineName());
final List<KafkaCustomConsumer> consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(),
this.consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(),
innerBuffer, consumerMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress, false, circuitBreaker);
this.kafkaAdminAccessor = new KafkaAdminAccessor(kafkaBufferConfig, List.of(kafkaBufferConfig.getTopic().getGroupId()));
this.executorService = Executors.newFixedThreadPool(consumers.size(), KafkaPluginThreadFactory.defaultExecutorThreadFactory(MDC_KAFKA_PLUGIN_VALUE));
Expand Down Expand Up @@ -233,6 +235,9 @@ public void shutdown() {
executorService.shutdownNow();
}

LOG.info("Closing {} consumers", consumers.size());
consumers.forEach(KafkaCustomConsumer::closeConsumer);

innerBuffer.shutdown();
} finally {
resetMdc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.confluent.kafka.serializers.KafkaJsonDeserializer;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.serialization.Deserializer;
Expand Down Expand Up @@ -167,6 +168,7 @@ public static void setConsumerTopicProperties(final Properties properties, final
properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, (int)topicConfig.getFetchMaxBytes());
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, topicConfig.getFetchMaxWait());
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, (int)topicConfig.getFetchMinBytes());
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());
}

private void setSchemaRegistryProperties(final KafkaConsumerConfig kafkaConsumerConfig, final Properties properties, final TopicConfig topicConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.Objects;
import java.util.Map;
import java.util.HashMap;

public class KafkaTopicConsumerMetrics {
private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicConsumerMetrics.class);
static final String NUMBER_OF_POSITIVE_ACKNOWLEDGEMENTS = "numberOfPositiveAcknowledgements";
static final String NUMBER_OF_NEGATIVE_ACKNOWLEDGEMENTS = "numberOfNegativeAcknowledgements";
static final String NUMBER_OF_RECORDS_FAILED_TO_PARSE = "numberOfRecordsFailedToParse";
Expand Down Expand Up @@ -82,7 +85,10 @@ private void initializeMetricNamesMap(final boolean topicNameInMetrics) {
double max = 0.0;
for (Map.Entry<KafkaConsumer, Map<String, Double>> entry : metricValues.entrySet()) {
Map<String, Double> consumerMetrics = entry.getValue();
synchronized(consumerMetrics) {
synchronized (consumerMetrics) {
if (consumerMetrics.get(metricName) == null) {
LOG.debug("No consumer metric for recordsLagMax found");
}
max = Math.max(max, consumerMetrics.get(metricName));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ public void testShutdown_Successful() throws InterruptedException {
kafkaBuffer.shutdown();
verify(executorService).shutdown();
verify(executorService).awaitTermination(eq(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT), eq(TimeUnit.SECONDS));
verify(consumer).closeConsumer();
}

@Test
Expand Down

0 comments on commit 49c6a2b

Please sign in to comment.