Skip to content

Commit

Permalink
Add 4xx aggregate metric and shard progress metric for dynamodb source (#3913) (#3921)
Browse files Browse the repository at this point in the history

Signed-off-by: Taylor Gray <[email protected]>
(cherry picked from commit e6df3eb)

Co-authored-by: Taylor Gray <[email protected]>
  • Loading branch information
1 parent d527140 commit 77c3964
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public String submitExportJob(String tableArn, String bucket, String prefix, Str
return null;
} catch (SdkException e) {
LOG.error("Failed to submit an export job with error " + e.getMessage());
dynamoAggregateMetrics.getExport4xxErrors().increment();
return null;
}

Expand All @@ -82,6 +83,7 @@ public String getExportManifest(String exportArn) {
LOG.error("Unable to get manifest file for export {}: {}", exportArn, e.getMessage());
} catch (SdkException e) {
LOG.error("Unable to get manifest file for export {}: {}", exportArn, e.getMessage());
dynamoAggregateMetrics.getExport4xxErrors().increment();
}
return manifestKey;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse;
import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
Expand Down Expand Up @@ -176,6 +177,10 @@ private List<Shard> listShards(String streamArn, String lastEvaluatedShardId) {
LOG.error("Received an internal server exception from DynamoDB while listing shards: {}", e.getMessage());
dynamoDBSourceAggregateMetrics.getStream5xxErrors().increment();
return shards;
} catch (final SdkException e) {
LOG.error("Received an exception from DynamoDB while listing shards: {}", e.getMessage());
dynamoDBSourceAggregateMetrics.getStream4xxErrors().increment();
return shards;
}

long endTime = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.source.dynamodb.stream;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
Expand Down Expand Up @@ -81,6 +82,7 @@ public class ShardConsumer implements Runnable {

static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60);
static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000;
static final String SHARD_PROGRESS = "shardProgress";


private final DynamoDbStreamsClient dynamoDbStreamsClient;
Expand All @@ -105,9 +107,12 @@ public class ShardConsumer implements Runnable {

private final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics;

private final Counter shardProgress;

private long recordsWrittenToBuffer;

private ShardConsumer(Builder builder) {
this.shardProgress = builder.pluginMetrics.counter(SHARD_PROGRESS);
this.dynamoDbStreamsClient = builder.dynamoDbStreamsClient;
this.checkpointer = builder.checkpointer;
this.shardIterator = builder.shardIterator;
Expand Down Expand Up @@ -226,6 +231,7 @@ public void run() {
LOG.debug("Shard Consumer start to run...");
// Check should skip processing or not.
if (shouldSkip()) {
shardProgress.increment();
if (acknowledgementSet != null) {
checkpointer.updateShardForAcknowledgmentWait(shardAcknowledgmentTimeout);
acknowledgementSet.complete();
Expand Down Expand Up @@ -272,12 +278,14 @@ public void run() {
.filter(record -> record.dynamodb().approximateCreationDateTime().isAfter(startTime))
.collect(Collectors.toList());
recordConverter.writeToBuffer(acknowledgementSet, records);
shardProgress.increment();
recordsWrittenToBuffer += records.size();
long delay = System.currentTimeMillis() - lastEventTime.toEpochMilli();
interval = delay > GET_RECORD_DELAY_THRESHOLD_MILLS ? MINIMUM_GET_RECORD_INTERVAL_MILLS : GET_RECORD_INTERVAL_MILLS;

} else {
interval = GET_RECORD_INTERVAL_MILLS;
shardProgress.increment();
}

try {
Expand Down Expand Up @@ -324,6 +332,7 @@ private GetRecordsResponse callGetRecords(String shardIterator) {
dynamoDBSourceAggregateMetrics.getStream5xxErrors().increment();
throw new RuntimeException(ex.getMessage());
} catch (final Exception e) {
dynamoDBSourceAggregateMetrics.getStream4xxErrors().increment();
throw new RuntimeException(e.getMessage());
}

Expand All @@ -335,6 +344,7 @@ private void waitForExport() {
while (!checkpointer.isExportDone()) {
LOG.debug("Export is in progress, wait...");
try {
shardProgress.increment();
Thread.sleep(DEFAULT_WAIT_FOR_EXPORT_INTERVAL_MILLS);
// The wait for export may take a long time
// Need to extend the timeout of the ownership in the coordination store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ public String getShardIterator(String streamArn, String shardId, String sequence
LOG.error("Received an internal server error from DynamoDB while getting a shard iterator: {}", e.getMessage());
return null;
} catch (SdkException e) {
dynamoDBSourceAggregateMetrics.getStream4xxErrors().increment();
LOG.error("Exception when trying to get the shard iterator due to {}", e.getMessage());
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,46 @@ public class DynamoDBSourceAggregateMetrics {
// Prefix passed to PluginMetrics.fromPrefix(...) in the constructor.
private static final String DYNAMO_DB = "dynamodb";

// Counter names registered under the prefix above. The stream constants use an
// _EXCEPTIONS suffix while the export constants use _ERRORS; the metric names
// themselves all end in "Errors" / "ApiInvocations".
private static final String DDB_STREAM_5XX_EXCEPTIONS = "stream5xxErrors";
private static final String DDB_STREAM_4XX_EXCEPTIONS = "stream4xxErrors";
private static final String DDB_STREAM_API_INVOCATIONS = "streamApiInvocations";
private static final String DDB_EXPORT_5XX_ERRORS = "export5xxErrors";
private static final String DDB_EXPORT_4XX_ERRORS = "export4xxErrors";
private static final String DDB_EXPORT_API_INVOCATIONS = "exportApiInvocations";



// Metrics registry handle created once in the constructor; all counters below come from it.
private final PluginMetrics pluginMetrics;

// One counter per metric name declared above, created in the constructor and
// exposed through the corresponding getter.
private final Counter stream5xxErrors;
private final Counter stream4xxErrors;
private final Counter streamApiInvocations;
private final Counter export5xxErrors;
private final Counter export4xxErrors;
private final Counter exportApiInvocations;

public DynamoDBSourceAggregateMetrics() {
this.pluginMetrics = PluginMetrics.fromPrefix(DYNAMO_DB);
this.stream5xxErrors = pluginMetrics.counter(DDB_STREAM_5XX_EXCEPTIONS);
this.stream4xxErrors = pluginMetrics.counter(DDB_STREAM_4XX_EXCEPTIONS);
this.streamApiInvocations = pluginMetrics.counter(DDB_STREAM_API_INVOCATIONS);
this.export5xxErrors = pluginMetrics.counter(DDB_EXPORT_5XX_ERRORS);
this.export4xxErrors = pluginMetrics.counter(DDB_EXPORT_4XX_ERRORS);
this.exportApiInvocations = pluginMetrics.counter(DDB_EXPORT_API_INVOCATIONS);
}

/**
 * @return the counter held in the {@code stream5xxErrors} field
 */
public Counter getStream5xxErrors() {
    return this.stream5xxErrors;
}

/**
 * @return the counter held in the {@code stream4xxErrors} field
 */
public Counter getStream4xxErrors() {
    return this.stream4xxErrors;
}

/**
 * @return the counter held in the {@code streamApiInvocations} field
 */
public Counter getStreamApiInvocations() {
    return this.streamApiInvocations;
}

/**
 * @return the counter held in the {@code export5xxErrors} field
 */
public Counter getExport5xxErrors() {
    return this.export5xxErrors;
}

/**
 * @return the counter held in the {@code export4xxErrors} field
 */
public Counter getExport4xxErrors() {
    return this.export4xxErrors;
}

/**
 * @return the counter held in the {@code exportApiInvocations} field
 */
public Counter getExportApiInvocations() {
    return this.exportApiInvocations;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
import software.amazon.awssdk.services.dynamodb.model.SequenceNumberRange;
import software.amazon.awssdk.services.dynamodb.model.Shard;
Expand Down Expand Up @@ -45,6 +46,9 @@ class ShardManagerTest {
@Mock
private Counter stream5xxErrors;

@Mock
private Counter stream4xxErrors;

@Mock
private Counter streamApiInvocations;

Expand Down Expand Up @@ -132,4 +136,17 @@ void stream5xxError_is_incremented_when_describe_stream_throws_internal_error()
verify(streamApiInvocations).increment();
}

/**
 * When {@code describeStream} throws a {@link DynamoDbException}, shard discovery
 * should return an empty (non-null) list and increment both the stream 4xx error
 * counter and the stream API invocation counter.
 */
@Test
void stream4xxErrors_is_incremented_when_describe_stream_throws_DynamoDBException() {
// Arrange: every describeStream call fails, and the aggregate metrics hand back the mocked 4xx counter.
when(dynamoDbStreamsClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(DynamoDbException.class);
when(dynamoDBSourceAggregateMetrics.getStream4xxErrors()).thenReturn(stream4xxErrors);

// Act + assert: discovery does not propagate the exception and yields no shards.
final List<Shard> shards = shardManager.runDiscovery(streamArn);
assertThat(shards, notNullValue());
assertThat(shards.isEmpty(), equalTo(true));

// Each counter is expected to be incremented exactly once (Mockito's default verify mode).
verify(stream4xxErrors).increment();
verify(streamApiInvocations).increment();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
Expand Down Expand Up @@ -155,4 +156,22 @@ void stream5xxErrors_is_incremented_when_get_shard_iterator_throws_internal_exce
verify(streamApiInvocations).increment();
}

/**
 * When {@code getShardIterator} throws a {@link DynamoDbException}, the factory
 * should return {@code null} instead of a consumer and increment both the stream
 * 4xx error counter and the stream API invocation counter.
 */
@Test
void stream4xxErrors_is_incremented_when_get_shard_iterator_throws_dynamodb_exception() {
// Arrange: a stream partition whose progress state does not wait for export and starts "now".
StreamProgressState state = new StreamProgressState();
state.setWaitForExport(false);
state.setStartTime(Instant.now().toEpochMilli());
streamPartition = new StreamPartition(streamArn, shardId, Optional.of(state));

// Every getShardIterator call fails; the aggregate metrics return a locally mocked 4xx counter.
when(dynamoDbStreamsClient.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(DynamoDbException.class);
final Counter stream4xxErrors = mock(Counter.class);
when(dynamoDBSourceAggregateMetrics.getStream4xxErrors()).thenReturn(stream4xxErrors);

// Act: the failure is expected to surface as a null consumer rather than an exception.
ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer);
Runnable consumer = consumerFactory.createConsumer(streamPartition, null, null);
assertThat(consumer, nullValue());
// Each counter is expected to be incremented exactly once (Mockito's default verify mode).
verify(stream4xxErrors).increment();
verify(streamApiInvocations).increment();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
Expand Down Expand Up @@ -57,6 +58,7 @@
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumer.BUFFER_TIMEOUT;
import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumer.DEFAULT_BUFFER_BATCH_SIZE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumer.SHARD_PROGRESS;
import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.StreamCheckpointer.CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE;

@ExtendWith(MockitoExtension.class)
Expand All @@ -77,9 +79,15 @@ class ShardConsumerTest {
@Mock
private Counter stream5xxErrors;

@Mock
private Counter stream4xxErrors;

@Mock
private Counter streamApiInvocations;

@Mock
private Counter shardProgress;

@Mock
private Buffer<org.opensearch.dataprepper.model.record.Record<Event>> buffer;

Expand Down Expand Up @@ -157,7 +165,9 @@ void setup() throws Exception {
.build();
lenient().when(dynamoDbStreamsClient.getRecords(any(GetRecordsRequest.class))).thenReturn(response);

given(pluginMetrics.counter(anyString())).willReturn(testCounter);
given(pluginMetrics.counter(SHARD_PROGRESS)).willReturn(shardProgress);
given(pluginMetrics.counter("changeEventsProcessed")).willReturn(testCounter);
given(pluginMetrics.counter("changeEventsProcessingErrors")).willReturn(testCounter);
given(pluginMetrics.summary(anyString())).willReturn(testSummary);

when(aggregateMetrics.getStreamApiInvocations()).thenReturn(streamApiInvocations);
Expand Down Expand Up @@ -192,6 +202,7 @@ void test_run_shardConsumer_correctly() throws Exception {
verify(coordinator).saveProgressStateForPartition(any(StreamPartition.class), eq(CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE));

verify(streamApiInvocations).increment();
verify(shardProgress).increment();
}

@Test
Expand Down Expand Up @@ -230,6 +241,7 @@ void test_run_shardConsumer_with_acknowledgments_correctly() throws Exception {
verify(acknowledgementSet).complete();

verify(streamApiInvocations).increment();
verify(shardProgress).increment();
}

@Test
Expand All @@ -256,6 +268,30 @@ void test_run_shardConsumer_catches_5xx_exception_and_increments_metric() {
verify(streamApiInvocations).increment();
}

/**
 * When {@code getRecords} throws a {@link DynamoDbException}, {@code ShardConsumer.run()}
 * should fail with a {@link RuntimeException} and increment both the stream 4xx error
 * counter and the stream API invocation counter.
 */
@Test
void test_run_shardConsumer_catches_4xx_exception_and_increments_metric() {
ShardConsumer shardConsumer;
when(aggregateMetrics.getStream4xxErrors()).thenReturn(stream4xxErrors);
// BufferAccumulator.create is statically mocked only while the consumer is being built.
try (
final MockedStatic<BufferAccumulator> bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) {
bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator);
shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer)
.shardIterator(shardIterator)
.checkpointer(checkpointer)
.tableInfo(tableInfo)
.startTime(null)
.waitForExport(false)
.build();
}

// Replace the getRecords stubbing with one that always fails.
when(dynamoDbStreamsClient.getRecords(any(GetRecordsRequest.class))).thenThrow(DynamoDbException.class);

// The consumer is expected to rethrow the failure as a RuntimeException.
assertThrows(RuntimeException.class, shardConsumer::run);

// Each counter is expected to be incremented exactly once (Mockito's default verify mode).
verify(stream4xxErrors).increment();
verify(streamApiInvocations).increment();
}

/**
* Helper function to generate some data.
*/
Expand Down

0 comments on commit 77c3964

Please sign in to comment.