Skip to content

Commit

Permalink
Add new metrics for tracking bytes received and processed by KDS (ope…
Browse files Browse the repository at this point in the history
…nsearch-project#5237)

* Add new metrics for tracking bytes received and processed by KDS

Signed-off-by: Souvik Bose <[email protected]>

* Address review comments.

Signed-off-by: Souvik Bose <[email protected]>

* Modify the code as per comments

Signed-off-by: Souvik Bose <[email protected]>

---------

Signed-off-by: Souvik Bose <[email protected]>
Co-authored-by: Souvik Bose <[email protected]>
  • Loading branch information
sb2k16 and sbose2k21 authored Jan 6, 2025
1 parent 708843c commit 795401f
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package org.opensearch.dataprepper.plugins.kinesis.source;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -26,6 +27,7 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.plugins.codec.json.NdjsonInputCodec;
import org.opensearch.dataprepper.plugins.codec.json.NdjsonInputConfig;
import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfig;
Expand Down Expand Up @@ -70,9 +72,11 @@
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME;
import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME;
import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_RECORD_PROCESSED;
import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_RECORD_PROCESSING_ERRORS;
import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_RECORD_BYTES_PROCESSED_METRIC_NAME;
import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_RECORD_BYTES_RECEIVED_METRIC_NAME;
import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_RECORD_PROCESSED_METRIC_NAME;
import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_CHECKPOINT_FAILURES;
import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_RECORD_PROCESSING_ERRORS_METRIC_NAME;
import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_STREAM_TAG_KEY;

public class KinesisSourceIT {
Expand All @@ -83,6 +87,8 @@ public class KinesisSourceIT {
private static final String codec_plugin_name = "ndjson";
private static final String LEASE_TABLE_PREFIX = "kinesis-lease-table";
private static final int NUMBER_OF_RECORDS_TO_ACCUMULATE = 10;
private static final int MAX_INITIALIZATION_ATTEMPTS = 3;
private static final Duration BUFFER_TIMEOUT = Duration.ofMillis(1);

@Mock
private AcknowledgementSetManager acknowledgementSetManager;
Expand Down Expand Up @@ -141,6 +147,12 @@ public class KinesisSourceIT {
@Mock
private Counter checkpointFailures;

@Mock
private DistributionSummary bytesReceivedSummary;

@Mock
private DistributionSummary bytesProcessedSummary;

private KinesisClient kinesisClient;

private DynamoDbClient dynamoDbClient;
Expand All @@ -165,6 +177,7 @@ void setup() {
when(kinesisStreamConfig.getName()).thenReturn(streamName);
when(kinesisStreamConfig.getCheckPointInterval()).thenReturn(CHECKPOINT_INTERVAL);
when(kinesisStreamConfig.getInitialPosition()).thenReturn(InitialPositionInStream.TRIM_HORIZON);
when(kinesisStreamConfig.getCompression()).thenReturn(CompressionOption.NONE);
when(kinesisSourceConfig.getConsumerStrategy()).thenReturn(ConsumerStrategy.ENHANCED_FAN_OUT);
when(kinesisSourceConfig.getStreams()).thenReturn(List.of(kinesisStreamConfig));
when(kinesisLeaseConfig.getLeaseCoordinationTable()).thenReturn(kinesisLeaseCoordinationTableConfig);
Expand All @@ -179,7 +192,8 @@ void setup() {
when(kinesisSourceConfig.getCodec()).thenReturn(pluginModel);
when(kinesisSourceConfig.isAcknowledgments()).thenReturn(false);
when(kinesisSourceConfig.getNumberOfRecordsToAccumulate()).thenReturn(NUMBER_OF_RECORDS_TO_ACCUMULATE);
when(kinesisSourceConfig.getBufferTimeout()).thenReturn(Duration.ofMillis(1));
when(kinesisSourceConfig.getBufferTimeout()).thenReturn(BUFFER_TIMEOUT);
when(kinesisSourceConfig.getMaxInitializationAttempts()).thenReturn(MAX_INITIALIZATION_ATTEMPTS);

kinesisClientFactory = mock(KinesisClientFactory.class);
when(kinesisClientFactory.buildDynamoDBClient(kinesisLeaseCoordinationTableConfig.getAwsRegion())).thenReturn(DynamoDbAsyncClient.builder()
Expand All @@ -204,17 +218,20 @@ void setup() {
when(pluginMetrics.counterWithTags(ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME, KINESIS_STREAM_TAG_KEY, streamName))
.thenReturn(acknowledgementSetFailures);

when(pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSED, KINESIS_STREAM_TAG_KEY, streamName))
when(pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSED_METRIC_NAME, KINESIS_STREAM_TAG_KEY, streamName))
.thenReturn(recordsProcessed);

when(pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSING_ERRORS, KINESIS_STREAM_TAG_KEY, streamName))
when(pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSING_ERRORS_METRIC_NAME, KINESIS_STREAM_TAG_KEY, streamName))
.thenReturn(recordProcessingErrors);

when(pluginMetrics.counterWithTags(KINESIS_CHECKPOINT_FAILURES, KINESIS_STREAM_TAG_KEY, streamName))
.thenReturn(checkpointFailures);

kinesisClient = KinesisClient.builder().region(Region.of(System.getProperty(AWS_REGION))).build();
dynamoDbClient = DynamoDbClient.builder().region(Region.of(System.getProperty(AWS_REGION))).build();
when(pluginMetrics.summary(KINESIS_RECORD_BYTES_RECEIVED_METRIC_NAME)).thenReturn(bytesReceivedSummary);

when(pluginMetrics.summary(KINESIS_RECORD_BYTES_PROCESSED_METRIC_NAME)).thenReturn(bytesProcessedSummary);
kinesisIngester = new KinesisIngester(kinesisClient, streamName, dynamoDbClient, leaseTableName);

kinesisIngester.createStream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisInputOutputRecord;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

import java.io.ByteArrayInputStream;
Expand All @@ -32,13 +33,12 @@ public KinesisRecordConverter(final InputCodec codec) {
this.codec = codec;
}

public List<Record<Event>> convert(final DecompressionEngine decompressionEngine,
List<KinesisClientRecord> kinesisClientRecords,
final String streamName) throws IOException {
List<Record<Event>> records = new ArrayList<>();
public List<KinesisInputOutputRecord> convert(final DecompressionEngine decompressionEngine,
List<KinesisClientRecord> kinesisClientRecords,
final String streamName) throws IOException {
List<KinesisInputOutputRecord> records = new ArrayList<>();
for (KinesisClientRecord kinesisClientRecord : kinesisClientRecords) {
processRecord(decompressionEngine, kinesisClientRecord, record -> {
records.add(record);
Event event = record.getData();
EventMetadata eventMetadata = event.getMetadata();
eventMetadata.setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE,
Expand All @@ -49,6 +49,9 @@ public List<Record<Event>> convert(final DecompressionEngine decompressionEngine
final Instant externalOriginationTime = kinesisClientRecord.approximateArrivalTimestamp();
event.getEventHandle().setExternalOriginationTime(externalOriginationTime);
event.getMetadata().setExternalOriginationTime(externalOriginationTime);
records.add(KinesisInputOutputRecord.builder()
.withIncomingRecordSizeBytes(kinesisClientRecord.data().position())
.withDataPrepperRecord(record).build());
});
}
return records;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.source.processor;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

@Builder(setterPrefix = "with")
@Getter
@AllArgsConstructor
public class KinesisInputOutputRecord {
private Record<Event> dataPrepperRecord;
private long incomingRecordSizeBytes;
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
import org.opensearch.dataprepper.metrics.PluginMetrics;
Expand Down Expand Up @@ -70,10 +71,14 @@ public class KinesisRecordProcessor implements ShardRecordProcessor {
private final Counter recordsProcessed;
private final Counter recordProcessingErrors;
private final Counter checkpointFailures;
private final DistributionSummary bytesReceivedSummary;
private final DistributionSummary bytesProcessedSummary;
public static final String ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME = "acknowledgementSetSuccesses";
public static final String ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME = "acknowledgementSetFailures";
public static final String KINESIS_RECORD_PROCESSED = "recordProcessed";
public static final String KINESIS_RECORD_PROCESSING_ERRORS = "recordProcessingErrors";
public static final String KINESIS_RECORD_PROCESSED_METRIC_NAME = "recordProcessed";
public static final String KINESIS_RECORD_PROCESSING_ERRORS_METRIC_NAME = "recordProcessingErrors";
public static final String KINESIS_RECORD_BYTES_RECEIVED_METRIC_NAME = "bytesReceived";
public static final String KINESIS_RECORD_BYTES_PROCESSED_METRIC_NAME = "bytesProcessed";
public static final String KINESIS_CHECKPOINT_FAILURES = "checkpointFailures";
public static final String KINESIS_STREAM_TAG_KEY = "stream";
private AtomicBoolean isStopRequested;
Expand All @@ -93,9 +98,11 @@ public KinesisRecordProcessor(final BufferAccumulator<Record<Event>> bufferAccum
this.acknowledgementSetManager = acknowledgementSetManager;
this.acknowledgementSetSuccesses = pluginMetrics.counterWithTags(ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName());
this.acknowledgementSetFailures = pluginMetrics.counterWithTags(ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName());
this.recordsProcessed = pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSED, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName());
this.recordProcessingErrors = pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSING_ERRORS, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName());
this.recordsProcessed = pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSED_METRIC_NAME, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName());
this.recordProcessingErrors = pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSING_ERRORS_METRIC_NAME, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName());
this.checkpointFailures = pluginMetrics.counterWithTags(KINESIS_CHECKPOINT_FAILURES, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName());
this.bytesReceivedSummary = pluginMetrics.summary(KINESIS_RECORD_BYTES_RECEIVED_METRIC_NAME);
this.bytesProcessedSummary = pluginMetrics.summary(KINESIS_RECORD_BYTES_PROCESSED_METRIC_NAME);
this.checkpointInterval = kinesisStreamConfig.getCheckPointInterval();
this.bufferAccumulator = bufferAccumulator;
this.kinesisCheckpointerTracker = kinesisCheckpointerTracker;
Expand Down Expand Up @@ -161,16 +168,21 @@ public void processRecords(ProcessRecordsInput processRecordsInput) {

// Track the records for checkpoint purpose
kinesisCheckpointerTracker.addRecordForCheckpoint(extendedSequenceNumber, processRecordsInput.checkpointer());
List<Record<Event>> records = kinesisRecordConverter.convert(

List<KinesisInputOutputRecord> kinesisOutputRecords = kinesisRecordConverter.convert(
kinesisStreamConfig.getCompression().getDecompressionEngine(),
processRecordsInput.records(), streamIdentifier.streamName());

int eventCount = 0;
for (Record<Event> record: records) {
Event event = record.getData();
for (KinesisInputOutputRecord kinesisInputOutputRecord: kinesisOutputRecords) {
Record<Event> dataPrepperRecord = kinesisInputOutputRecord.getDataPrepperRecord();
long incomingRecordSizeBytes = kinesisInputOutputRecord.getIncomingRecordSizeBytes();
bytesReceivedSummary.record(incomingRecordSizeBytes);
Event event = dataPrepperRecord.getData();
acknowledgementSetOpt.ifPresent(acknowledgementSet -> acknowledgementSet.add(event));

bufferAccumulator.add(record);
bufferAccumulator.add(dataPrepperRecord);
bytesProcessedSummary.record(incomingRecordSizeBytes);
eventCount++;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.source;

import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kinesis.source.converter.MetadataKeyAttributes;
import org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisInputOutputRecord;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

import java.nio.ByteBuffer;
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

public class KinesisInputOutputRecordTest {

@Test
void builder_defaultCreatesObjectCorrectly() {

KinesisInputOutputRecord kinesisInputOutputRecord = KinesisInputOutputRecord.builder().build();

assertEquals(0L, kinesisInputOutputRecord.getIncomingRecordSizeBytes());
assertNull(kinesisInputOutputRecord.getDataPrepperRecord());
}

@Test
void builder_createsObjectCorrectly() {
Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString());
event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, UUID.randomUUID().toString());
Record<Event> record = new Record<>(event);
KinesisClientRecord kinesisClientRecord = KinesisClientRecord.builder()
.data(ByteBuffer.wrap(event.toJsonString().getBytes()))
.sequenceNumber(Integer.toString(1000)).subSequenceNumber(1).build();

KinesisInputOutputRecord kinesisInputOutputRecord = KinesisInputOutputRecord.builder()
.withIncomingRecordSizeBytes(100L)
.withDataPrepperRecord(record)
.build();

assertNotNull(kinesisInputOutputRecord.getDataPrepperRecord());
assertEquals(kinesisInputOutputRecord.getDataPrepperRecord(), record);
assertEquals(100L, kinesisInputOutputRecord.getIncomingRecordSizeBytes());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
import org.opensearch.dataprepper.event.TestEventFactory;
import org.opensearch.dataprepper.model.codec.DecompressionEngine;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.plugins.codec.json.NdjsonInputCodec;
import org.opensearch.dataprepper.plugins.codec.json.NdjsonInputConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisInputOutputRecord;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -99,13 +98,13 @@ public void testRecordConverterWithNdJsonInputCodec() throws IOException {
InputStream inputStream = new ByteArrayInputStream(writer.toString().getBytes());
when(decompressionEngine.createInputStream(any(InputStream.class))).thenReturn(inputStream);

List<Record<Event>> events = kinesisRecordConverter.convert(decompressionEngine, List.of(kinesisClientRecord), streamId);
List<KinesisInputOutputRecord> kinesisOutputRecords = kinesisRecordConverter.convert(decompressionEngine, List.of(kinesisClientRecord), streamId);

assertEquals(events.size(), numRecords);
events.forEach(eventRecord -> {
assertEquals(eventRecord.getData().getMetadata().getAttribute(MetadataKeyAttributes.KINESIS_PARTITION_KEY_METADATA_ATTRIBUTE), partitionKey);
assertEquals(eventRecord.getData().getMetadata().getAttribute(MetadataKeyAttributes.KINESIS_SEQUENCE_NUMBER_METADATA_ATTRIBUTE), sequenceNumber);
assertEquals(eventRecord.getData().getMetadata().getAttribute(MetadataKeyAttributes.KINESIS_SUB_SEQUENCE_NUMBER_METADATA_ATTRIBUTE), subsequenceNumber);
assertEquals(kinesisOutputRecords.size(), numRecords);
kinesisOutputRecords.forEach(KinesisInputOutputRecord -> {
assertEquals(KinesisInputOutputRecord.getDataPrepperRecord().getData().getMetadata().getAttribute(MetadataKeyAttributes.KINESIS_PARTITION_KEY_METADATA_ATTRIBUTE), partitionKey);
assertEquals(KinesisInputOutputRecord.getDataPrepperRecord().getData().getMetadata().getAttribute(MetadataKeyAttributes.KINESIS_SEQUENCE_NUMBER_METADATA_ATTRIBUTE), sequenceNumber);
assertEquals(KinesisInputOutputRecord.getDataPrepperRecord().getData().getMetadata().getAttribute(MetadataKeyAttributes.KINESIS_SUB_SEQUENCE_NUMBER_METADATA_ATTRIBUTE), subsequenceNumber);
});
}

Expand Down
Loading

0 comments on commit 795401f

Please sign in to comment.