Add Kafka Producer Metrics and Kafka Buffer Metrics
Signed-off-by: Dinu John <[email protected]>
dinujoh committed Nov 8, 2023
1 parent 97cd930 commit 0f1eb3f
Showing 19 changed files with 472 additions and 81 deletions.
KafkaSinkAvroTypeIT.java
@@ -26,6 +26,7 @@
 import org.mockito.Mock;
 import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
 import org.opensearch.dataprepper.expression.ExpressionEvaluator;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.configuration.PluginSetting;
 import org.opensearch.dataprepper.model.event.Event;
 import org.opensearch.dataprepper.model.event.JacksonEvent;
@@ -81,6 +82,9 @@ public class KafkaSinkAvroTypeIT {
     @Mock
     private PluginFactory pluginFactory;
 
+    @Mock
+    private PluginMetrics pluginMetrics;
+
     private SinkContext sinkContext;
 
     private String registryUrl;
@@ -104,7 +108,7 @@ public class KafkaSinkAvroTypeIT {
 
 
     public KafkaSink createObjectUnderTest() {
-        return new KafkaSink(pluginSetting, kafkaSinkConfig, pluginFactory, evaluator, sinkContext, awsCredentialsSupplier);
+        return new KafkaSink(pluginSetting, kafkaSinkConfig, pluginFactory, pluginMetrics, evaluator, sinkContext, awsCredentialsSupplier);
     }
 
     @BeforeEach
KafkaSinkJsonTypeIT.java
@@ -24,6 +24,7 @@
 import org.mockito.Mock;
 import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
 import org.opensearch.dataprepper.expression.ExpressionEvaluator;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.configuration.PluginSetting;
 import org.opensearch.dataprepper.model.event.Event;
 import org.opensearch.dataprepper.model.event.JacksonEvent;
@@ -73,6 +74,9 @@ public class KafkaSinkJsonTypeIT {
     @Mock
     private PluginFactory pluginFactory;
 
+    @Mock
+    private PluginMetrics pluginMetrics;
+
     private SinkContext sinkContext;
 
     @Mock
@@ -95,7 +99,7 @@ public class KafkaSinkJsonTypeIT {
 
 
     public KafkaSink createObjectUnderTest() {
-        return new KafkaSink(pluginSetting, kafkaSinkConfig, pluginFactory, evaluator, sinkContext, awsCredentialsSupplier);
+        return new KafkaSink(pluginSetting, kafkaSinkConfig, pluginFactory, pluginMetrics, evaluator, sinkContext, awsCredentialsSupplier);
     }
 
     @BeforeEach
KafkaSinkPlainTextTypeIT.java
@@ -22,6 +22,7 @@
 import org.mockito.Mock;
 import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
 import org.opensearch.dataprepper.expression.ExpressionEvaluator;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.configuration.PluginSetting;
 import org.opensearch.dataprepper.model.event.Event;
 import org.opensearch.dataprepper.model.event.JacksonEvent;
@@ -71,6 +72,9 @@ public class KafkaSinkPlainTextTypeIT {
     @Mock
     private PluginFactory pluginFactory;
 
+    @Mock
+    private PluginMetrics pluginMetrics;
+
     private SinkContext sinkContext;
 
     @Mock
@@ -93,7 +97,7 @@ public class KafkaSinkPlainTextTypeIT {
 
 
     public KafkaSink createObjectUnderTest() {
-        return new KafkaSink(pluginSetting, kafkaSinkConfig, pluginFactory, evaluator, sinkContext, awsCredentialsSupplier);
+        return new KafkaSink(pluginSetting, kafkaSinkConfig, pluginFactory, pluginMetrics, evaluator, sinkContext, awsCredentialsSupplier);
    }
 
     @BeforeEach
KafkaBuffer.java
@@ -43,6 +43,8 @@ public class KafkaBuffer<T extends Record<?>> extends AbstractBuffer<T> {
     static final long EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT = 30L;
     public static final int INNER_BUFFER_CAPACITY = 1000000;
     public static final int INNER_BUFFER_BATCH_SIZE = 250000;
+    static final String WRITE = "Write";
+    static final String READ = "Read";
     private final KafkaCustomProducer producer;
     private final AbstractBuffer innerBuffer;
     private final ExecutorService executorService;
@@ -58,12 +60,15 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig kafkaBufferConfig,
         SerializationFactory serializationFactory = new SerializationFactory();
         final KafkaCustomProducerFactory kafkaCustomProducerFactory = new KafkaCustomProducerFactory(serializationFactory, awsCredentialsSupplier);
         this.byteDecoder = byteDecoder;
-        producer = kafkaCustomProducerFactory.createProducer(kafkaBufferConfig, pluginFactory, pluginSetting, null, null);
+        final String metricPrefixName = kafkaBufferConfig.getCustomMetricPrefix().orElse(pluginSetting.getName());
+        final PluginMetrics producerMetrics = PluginMetrics.fromNames(metricPrefixName + WRITE, pluginSetting.getPipelineName());
+        producer = kafkaCustomProducerFactory.createProducer(kafkaBufferConfig, pluginFactory, pluginSetting, null, null, producerMetrics, false);
         final KafkaCustomConsumerFactory kafkaCustomConsumerFactory = new KafkaCustomConsumerFactory(serializationFactory, awsCredentialsSupplier);
         innerBuffer = new BlockingBuffer<>(INNER_BUFFER_CAPACITY, INNER_BUFFER_BATCH_SIZE, pluginSetting.getPipelineName());
         this.shutdownInProgress = new AtomicBoolean(false);
+        final PluginMetrics consumerMetrics = PluginMetrics.fromNames(metricPrefixName + READ, pluginSetting.getPipelineName());
         final List<KafkaCustomConsumer> consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(),
-                innerBuffer, pluginMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress);
+                innerBuffer, consumerMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress, false);
         this.executorService = Executors.newFixedThreadPool(consumers.size());
         consumers.forEach(this.executorService::submit);
 
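
Note: the buffer now carves its metrics into two scopes, producer metrics under "<prefix>Write" and consumer metrics under "<prefix>Read", with the prefix defaulting to the plugin name unless custom_metric_prefix is set. A minimal standalone sketch of that naming logic follows; the helper class and method names are illustrative assumptions, not part of the Data Prepper API, though the "Write"/"Read" suffixes and the fallback to the plugin name come straight from the diff above.

import java.util.Optional;

// Sketch of the metric-scope derivation introduced in KafkaBuffer.
final class BufferMetricScopes {
    static String scopeFor(final Optional<String> customPrefix, final String pluginName, final boolean writePath) {
        final String prefix = customPrefix.orElse(pluginName);
        return prefix + (writePath ? "Write" : "Read");
    }

    public static void main(String[] args) {
        System.out.println(scopeFor(Optional.empty(), "kafka_buffer", true));          // kafka_bufferWrite
        System.out.println(scopeFor(Optional.of("my_buffer"), "kafka_buffer", false)); // my_bufferRead
    }
}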
KafkaBufferConfig.java
@@ -53,6 +53,9 @@ class KafkaBufferConfig implements KafkaProducerConfig, KafkaConsumerConfig {
     @JsonProperty("drain_timeout")
     private Duration drainTimeout = DEFAULT_DRAIN_TIMEOUT;
 
+    @JsonProperty("custom_metric_prefix")
+    private String customMetricPrefix;
+
 
     public List<String> getBootstrapServers() {
         if (Objects.nonNull(bootStrapServers)) {
@@ -131,4 +134,8 @@ public boolean getAcknowledgementsEnabled() {
     public Duration getDrainTimeout() {
         return drainTimeout;
     }
+
+    public Optional<String> getCustomMetricPrefix() {
+        return Optional.ofNullable(customMetricPrefix);
+    }
 }
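
The new custom_metric_prefix option is a plain optional string bound by Jackson. A stripped-down, runnable sketch of that binding is below; the real KafkaBufferConfig carries many more fields, so this stand-in class is an assumption for illustration only.

import java.util.Optional;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

// Minimal stand-in showing how the new option deserializes.
final class BufferConfigSketch {
    @JsonProperty("custom_metric_prefix")
    private String customMetricPrefix;

    public Optional<String> getCustomMetricPrefix() {
        return Optional.ofNullable(customMetricPrefix);
    }

    public static void main(String[] args) throws Exception {
        final ObjectMapper mapper = new ObjectMapper();
        final BufferConfigSketch config =
                mapper.readValue("{\"custom_metric_prefix\":\"my_buffer\"}", BufferConfigSketch.class);
        System.out.println(config.getCustomMetricPrefix().orElse("<plugin name>")); // my_buffer
    }
}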
KafkaCustomConsumer.java
@@ -32,7 +32,7 @@
 import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaConsumerConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaKeyMode;
-import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics;
+import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicConsumerMetrics;
 import org.opensearch.dataprepper.plugins.kafka.util.LogRateLimiter;
 import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
 import org.slf4j.Logger;
@@ -86,7 +86,7 @@ public class KafkaCustomConsumer implements Runnable, ConsumerRebalanceListener {
     private List<Map<TopicPartition, CommitOffsetRange>> acknowledgedOffsets;
     private final boolean acknowledgementsEnabled;
     private final Duration acknowledgementsTimeout;
-    private final KafkaTopicMetrics topicMetrics;
+    private final KafkaTopicConsumerMetrics topicMetrics;
     private long metricsUpdatedTime;
     private final AtomicInteger numberOfAcksPending;
     private long numRecordsCommitted = 0;
@@ -101,7 +101,7 @@ public KafkaCustomConsumer(final KafkaConsumer consumer,
                                final String schemaType,
                                final AcknowledgementSetManager acknowledgementSetManager,
                                final ByteDecoder byteDecoder,
-                               KafkaTopicMetrics topicMetrics) {
+                               final KafkaTopicConsumerMetrics topicMetrics) {
         this.topicName = topicConfig.getName();
         this.topicConfig = topicConfig;
         this.shutdownInProgress = shutdownInProgress;
@@ -127,7 +127,7 @@ public KafkaCustomConsumer(final KafkaConsumer consumer,
         this.errLogRateLimiter = new LogRateLimiter(2, System.currentTimeMillis());
     }
 
-    KafkaTopicMetrics getTopicMetrics() {
+    KafkaTopicConsumerMetrics getTopicMetrics() {
         return topicMetrics;
     }
 
KafkaCustomConsumerFactory.java
@@ -40,7 +40,7 @@
 import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
 import org.opensearch.dataprepper.plugins.kafka.util.ClientDNSLookupType;
 import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer;
-import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics;
+import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicConsumerMetrics;
 import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,10 +72,11 @@ public List<KafkaCustomConsumer> createConsumersForTopic(final KafkaConsumerConfig kafkaConsumerConfig,
                                                              final Buffer<Record<Event>> buffer, final PluginMetrics pluginMetrics,
                                                              final AcknowledgementSetManager acknowledgementSetManager,
                                                              final ByteDecoder byteDecoder,
-                                                             final AtomicBoolean shutdownInProgress) {
+                                                             final AtomicBoolean shutdownInProgress,
+                                                             final boolean topicNameInMetrics) {
         Properties authProperties = new Properties();
         KafkaSecurityConfigurer.setAuthProperties(authProperties, kafkaConsumerConfig, LOG);
-        KafkaTopicMetrics topicMetrics = new KafkaTopicMetrics(topic.getName(), pluginMetrics);
+        KafkaTopicConsumerMetrics topicMetrics = new KafkaTopicConsumerMetrics(topic.getName(), pluginMetrics, topicNameInMetrics);
         Properties consumerProperties = getConsumerProperties(kafkaConsumerConfig, topic, authProperties);
         MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType);
 
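
Note: the call sites suggest the new topicNameInMetrics flag controls whether the topic name appears in emitted metric names — the buffer passes false (its Write/Read-prefixed scopes already disambiguate) while the source keeps per-topic names with true. The exact naming scheme inside KafkaTopicConsumerMetrics is not shown in this diff, so the sketch below is a hedged guess at the behavior, with an invented metric name for illustration.

// Hedged sketch of what the flag plausibly controls.
final class MetricNameSketch {
    static String metricName(final String topicName, final String metric, final boolean topicNameInMetrics) {
        return topicNameInMetrics ? "topic." + topicName + "." + metric : metric;
    }

    public static void main(String[] args) {
        System.out.println(metricName("orders", "numberOfRecordsConsumed", true));  // topic.orders.numberOfRecordsConsumed
        System.out.println(metricName("orders", "numberOfRecordsConsumed", false)); // numberOfRecordsConsumed
    }
}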
KafkaCustomProducer.java
@@ -18,7 +18,7 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.lang3.ObjectUtils;
 import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.opensearch.dataprepper.expression.ExpressionEvaluator;
 import org.opensearch.dataprepper.model.event.Event;
@@ -28,6 +28,7 @@
 import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig;
 import org.opensearch.dataprepper.plugins.kafka.service.SchemaService;
 import org.opensearch.dataprepper.plugins.kafka.sink.DLQSink;
+import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicProducerMetrics;
 import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,7 +50,7 @@ public class KafkaCustomProducer<T> {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaCustomProducer.class);
 
-    private final Producer<String, T> producer;
+    private final KafkaProducer<String, T> producer;
 
     private final KafkaProducerConfig kafkaProducerConfig;
 
@@ -69,28 +70,39 @@ public class KafkaCustomProducer<T> {
 
     private final SchemaService schemaService;
 
-    public KafkaCustomProducer(final Producer producer,
+    private final KafkaTopicProducerMetrics topicMetrics;
+
+
+    public KafkaCustomProducer(final KafkaProducer producer,
                                final KafkaProducerConfig kafkaProducerConfig,
                                final DLQSink dlqSink,
                                final ExpressionEvaluator expressionEvaluator,
-                               final String tagTargetKey
+                               final String tagTargetKey,
+                               final KafkaTopicProducerMetrics topicMetrics
     ) {
         this.producer = producer;
         this.kafkaProducerConfig = kafkaProducerConfig;
         this.dlqSink = dlqSink;
-        bufferedEventHandles = new LinkedList<>();
+        this.bufferedEventHandles = new LinkedList<>();
         this.expressionEvaluator = expressionEvaluator;
         this.tagTargetKey = tagTargetKey;
         this.topicName = ObjectUtils.isEmpty(kafkaProducerConfig.getTopic()) ? null : kafkaProducerConfig.getTopic().getName();
         this.serdeFormat = ObjectUtils.isEmpty(kafkaProducerConfig.getSerdeFormat()) ? null : kafkaProducerConfig.getSerdeFormat();
-        schemaService = new SchemaService.SchemaServiceBuilder().getFetchSchemaService(topicName, kafkaProducerConfig.getSchemaConfig()).build();
+        this.schemaService = new SchemaService.SchemaServiceBuilder().getFetchSchemaService(topicName, kafkaProducerConfig.getSchemaConfig()).build();
+        this.topicMetrics = topicMetrics;
+        this.topicMetrics.register(this.producer);
     }
 
+    KafkaTopicProducerMetrics getTopicMetrics() {
+        return topicMetrics;
+    }
+
     public void produceRawData(final byte[] bytes, final String key) {
         try {
             send(topicName, key, bytes).get();
+            topicMetrics.update(producer);
         } catch (Exception e) {
+            // TODO: Metrics
             LOG.error("Error occurred while publishing {}", e.getMessage());
         }
     }
@@ -107,7 +119,9 @@ public void produceRecords(final Record<Event> record) {
             } else {
                 publishPlaintextMessage(record, key);
             }
+            topicMetrics.update(producer);
         } catch (Exception e) {
+            // TODO: Metrics
             LOG.error("Error occurred while publishing {}", e.getMessage());
             releaseEventHandles(false);
         }
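
Note: narrowing the field from Producer to KafkaProducer, plus the register(producer)/update(producer) calls, suggests KafkaTopicProducerMetrics snapshots the Kafka client's built-in metrics after each send. The sketch below shows the raw material that wrapper has to work with; only producer.metrics() and the "record-send-total" metric name are standard Kafka client API, while the surrounding class and the idea of mirroring values into Micrometer are assumptions.

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.StringSerializer;

// Sketch: reading the producer's built-in client metrics.
final class ProducerMetricsSketch {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        try (KafkaProducer<String, String> producer =
                     new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            for (final Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
                if ("record-send-total".equals(entry.getKey().name())) {
                    System.out.println(entry.getKey().name() + " = " + entry.getValue().metricValue());
                }
            }
        }
    }
}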
KafkaCustomProducerFactory.java
@@ -9,6 +9,7 @@
 import org.apache.kafka.common.serialization.Serializer;
 import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
 import org.opensearch.dataprepper.expression.ExpressionEvaluator;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.configuration.PluginSetting;
 import org.opensearch.dataprepper.model.plugin.PluginFactory;
 import org.opensearch.dataprepper.model.sink.SinkContext;
@@ -27,6 +28,7 @@
 import org.opensearch.dataprepper.plugins.kafka.service.TopicService;
 import org.opensearch.dataprepper.plugins.kafka.sink.DLQSink;
 import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer;
+import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicProducerMetrics;
 import org.opensearch.dataprepper.plugins.kafka.util.RestUtils;
 import org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer;
 import org.slf4j.Logger;
@@ -46,7 +48,8 @@ public KafkaCustomProducerFactory(final SerializationFactory serializationFactory,
     }
 
     public KafkaCustomProducer createProducer(final KafkaProducerConfig kafkaProducerConfig, final PluginFactory pluginFactory, final PluginSetting pluginSetting,
-                                              final ExpressionEvaluator expressionEvaluator, final SinkContext sinkContext) {
+                                              final ExpressionEvaluator expressionEvaluator, final SinkContext sinkContext, final PluginMetrics pluginMetrics,
+                                              final boolean topicNameInMetrics) {
         AwsContext awsContext = new AwsContext(kafkaProducerConfig, awsCredentialsSupplier);
         KeyFactory keyFactory = new KeyFactory(awsContext);
         prepareTopicAndSchema(kafkaProducerConfig);
@@ -59,9 +62,10 @@ public KafkaCustomProducer createProducer(final KafkaProducerConfig kafkaProducerConfig,
         Serializer<Object> valueSerializer = (Serializer<Object>) serializationFactory.getSerializer(dataConfig);
         final KafkaProducer<Object, Object> producer = new KafkaProducer<>(properties, keyDeserializer, valueSerializer);
         final DLQSink dlqSink = new DLQSink(pluginFactory, kafkaProducerConfig, pluginSetting);
+        final KafkaTopicProducerMetrics topicMetrics = new KafkaTopicProducerMetrics(topic.getName(), pluginMetrics, topicNameInMetrics);
         return new KafkaCustomProducer(producer,
                 kafkaProducerConfig, dlqSink,
-                expressionEvaluator, Objects.nonNull(sinkContext) ? sinkContext.getTagsTargetKey() : null);
+                expressionEvaluator, Objects.nonNull(sinkContext) ? sinkContext.getTagsTargetKey() : null, topicMetrics);
     }
     private void prepareTopicAndSchema(final KafkaProducerConfig kafkaProducerConfig) {
         checkTopicCreationCriteriaAndCreateTopic(kafkaProducerConfig);
KafkaSink.java
@@ -7,6 +7,7 @@
 
 import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
 import org.opensearch.dataprepper.expression.ExpressionEvaluator;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
 import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
 import org.opensearch.dataprepper.model.configuration.PluginSetting;
@@ -60,6 +61,8 @@ public class KafkaSink extends AbstractSink<Record<Event>> {
 
     private final PluginSetting pluginSetting;
 
+    private final PluginMetrics pluginMetrics;
+
     private final ExpressionEvaluator expressionEvaluator;
 
     private final Lock reentrantLock;
@@ -69,10 +72,11 @@ public class KafkaSink extends AbstractSink<Record<Event>> {
 
     @DataPrepperPluginConstructor
     public KafkaSink(final PluginSetting pluginSetting, final KafkaSinkConfig kafkaSinkConfig, final PluginFactory pluginFactory,
-                     final ExpressionEvaluator expressionEvaluator, final SinkContext sinkContext,
+                     final PluginMetrics pluginMetrics, final ExpressionEvaluator expressionEvaluator, final SinkContext sinkContext,
                      AwsCredentialsSupplier awsCredentialsSupplier) {
         super(pluginSetting);
         this.pluginSetting = pluginSetting;
+        this.pluginMetrics = pluginMetrics;
         this.kafkaSinkConfig = kafkaSinkConfig;
         this.pluginFactory = pluginFactory;
         this.expressionEvaluator = expressionEvaluator;
@@ -156,7 +160,7 @@ private void checkTopicCreationCriteriaAndCreateTopic() {
 
     public KafkaCustomProducer createProducer() {
         // TODO: Add the DLQSink here. new DLQSink(pluginFactory, kafkaSinkConfig, pluginSetting)
-        return kafkaCustomProducerFactory.createProducer(kafkaSinkConfig, pluginFactory, pluginSetting, expressionEvaluator, sinkContext);
+        return kafkaCustomProducerFactory.createProducer(kafkaSinkConfig, pluginFactory, pluginSetting, expressionEvaluator, sinkContext, pluginMetrics, true);
     }
 
 
KafkaSource.java
@@ -40,7 +40,7 @@
 import org.opensearch.dataprepper.plugins.kafka.extension.KafkaClusterConfigSupplier;
 import org.opensearch.dataprepper.plugins.kafka.util.ClientDNSLookupType;
 import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer;
-import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics;
+import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicConsumerMetrics;
 import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -109,7 +109,7 @@ public void start(Buffer<Record<Event>> buffer) {
         KafkaSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG);
         sourceConfig.getTopics().forEach(topic -> {
             consumerGroupID = topic.getGroupId();
-            KafkaTopicMetrics topicMetrics = new KafkaTopicMetrics(topic.getName(), pluginMetrics);
+            KafkaTopicConsumerMetrics topicMetrics = new KafkaTopicConsumerMetrics(topic.getName(), pluginMetrics, true);
             Properties consumerProperties = getConsumerProperties(topic, authProperties);
             MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType);
             try {