From 49c6a2bcd94eb924cb6f3a782f09f5a07d62ea18 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Thu, 30 Jan 2025 12:33:22 -0600 Subject: [PATCH] Change Kafka Buffer defaults for fetch.max.wait.ms, fetch.min.bytes, partition.assignment.strategy, close consumer on shutdown (#5373) Signed-off-by: Taylor Gray --- .../plugins/kafka/buffer/BufferTopicConfig.java | 4 ++-- .../dataprepper/plugins/kafka/buffer/KafkaBuffer.java | 7 ++++++- .../kafka/consumer/KafkaCustomConsumerFactory.java | 2 ++ .../plugins/kafka/util/KafkaTopicConsumerMetrics.java | 8 +++++++- .../dataprepper/plugins/kafka/buffer/KafkaBufferTest.java | 1 + 5 files changed, 18 insertions(+), 4 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java index 56377c1f22..a97b68d0f3 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java @@ -26,8 +26,8 @@ class BufferTopicConfig extends CommonTopicConfig implements TopicProducerConfig private static final Long DEFAULT_RETENTION_PERIOD = 604800000L; static final boolean DEFAULT_AUTO_COMMIT = false; static final ByteCount DEFAULT_FETCH_MAX_BYTES = ByteCount.parse("50mb"); - static final Duration DEFAULT_FETCH_MAX_WAIT = Duration.ofMillis(500); - static final ByteCount DEFAULT_FETCH_MIN_BYTES = ByteCount.parse("1b"); + static final Duration DEFAULT_FETCH_MAX_WAIT = Duration.ofMillis(1000); + static final ByteCount DEFAULT_FETCH_MIN_BYTES = ByteCount.parse("2kb"); static final ByteCount DEFAULT_MAX_PARTITION_FETCH_BYTES = ByteCount.parse("1mb"); static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofSeconds(45); static final String DEFAULT_AUTO_OFFSET_RESET = "earliest"; diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java index f8ec9c4d91..336f29fe0f 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java @@ -64,6 +64,8 @@ public class KafkaBuffer extends AbstractBuffer> { private final AbstractBuffer> innerBuffer; private final ExecutorService executorService; private final Duration drainTimeout; + + private final List consumers; private AtomicBoolean shutdownInProgress; private ByteDecoder byteDecoder; @@ -83,7 +85,7 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka innerBuffer = new BlockingBuffer<>(INNER_BUFFER_CAPACITY, INNER_BUFFER_BATCH_SIZE, pluginSetting.getPipelineName()); this.shutdownInProgress = new AtomicBoolean(false); final PluginMetrics consumerMetrics = PluginMetrics.fromNames(metricPrefixName + READ, pluginSetting.getPipelineName()); - final List consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(), + this.consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(), innerBuffer, consumerMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress, false, circuitBreaker); this.kafkaAdminAccessor = new KafkaAdminAccessor(kafkaBufferConfig, List.of(kafkaBufferConfig.getTopic().getGroupId())); this.executorService = Executors.newFixedThreadPool(consumers.size(), KafkaPluginThreadFactory.defaultExecutorThreadFactory(MDC_KAFKA_PLUGIN_VALUE)); @@ -233,6 +235,9 @@ public void shutdown() { executorService.shutdownNow(); } + LOG.info("Closing {} consumers", consumers.size()); + consumers.forEach(KafkaCustomConsumer::closeConsumer); + innerBuffer.shutdown(); } finally { resetMdc(); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java index 0d091b8af7..1981f6a60a 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java @@ -12,6 +12,7 @@ import io.confluent.kafka.serializers.KafkaJsonDeserializer; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.CooperativeStickyAssignor; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.BrokerNotAvailableException; import org.apache.kafka.common.serialization.Deserializer; @@ -167,6 +168,7 @@ public static void setConsumerTopicProperties(final Properties properties, final properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, (int)topicConfig.getFetchMaxBytes()); properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, topicConfig.getFetchMaxWait()); properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, (int)topicConfig.getFetchMinBytes()); + properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName()); } private void setSchemaRegistryProperties(final KafkaConsumerConfig kafkaConsumerConfig, final Properties properties, final TopicConfig topicConfig) { diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicConsumerMetrics.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicConsumerMetrics.java index 1fd03f8aff..60f5d282bd 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicConsumerMetrics.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicConsumerMetrics.java @@ -10,6 +10,8 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Instant; import java.util.Objects; @@ -17,6 +19,7 @@ import java.util.HashMap; public class KafkaTopicConsumerMetrics { + private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicConsumerMetrics.class); static final String NUMBER_OF_POSITIVE_ACKNOWLEDGEMENTS = "numberOfPositiveAcknowledgements"; static final String NUMBER_OF_NEGATIVE_ACKNOWLEDGEMENTS = "numberOfNegativeAcknowledgements"; static final String NUMBER_OF_RECORDS_FAILED_TO_PARSE = "numberOfRecordsFailedToParse"; @@ -82,7 +85,10 @@ private void initializeMetricNamesMap(final boolean topicNameInMetrics) { double max = 0.0; for (Map.Entry> entry : metricValues.entrySet()) { Map consumerMetrics = entry.getValue(); - synchronized(consumerMetrics) { + synchronized (consumerMetrics) { + if (consumerMetrics.get(metricName) == null) { + LOG.debug("No consumer metric for recordsLagMax found"); + } max = Math.max(max, consumerMetrics.get(metricName)); } } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java index 99f2afa76b..f7cae5e416 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java @@ -334,6 +334,7 @@ public void testShutdown_Successful() throws InterruptedException { kafkaBuffer.shutdown(); verify(executorService).shutdown(); verify(executorService).awaitTermination(eq(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT), eq(TimeUnit.SECONDS)); + verify(consumer).closeConsumer(); } @Test