diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/metrics/PluginMetrics.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/metrics/PluginMetrics.java index b20eec607d..e04eef230f 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/metrics/PluginMetrics.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/metrics/PluginMetrics.java @@ -43,6 +43,16 @@ public static PluginMetrics fromNames(final String componentId, final String com .add(componentId).toString()); } + /** + * Provides reference to APIs that register timer, counter, gauge into global registry. + * + * @param metricsPrefix the prefix to provide to metrics + * @return The {@link PluginMetrics} + */ + public static PluginMetrics fromPrefix(final String metricsPrefix) { + return new PluginMetrics(metricsPrefix); + } + private PluginMetrics(final String metricsPrefix) { this.metricsPrefix = metricsPrefix; } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/metrics/PluginMetricsTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/metrics/PluginMetricsTest.java index db6b88d7a0..873be3bc71 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/metrics/PluginMetricsTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/metrics/PluginMetricsTest.java @@ -15,6 +15,7 @@ import java.util.Collections; import java.util.StringJoiner; +import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.emptyList; @@ -41,6 +42,18 @@ void setUp() { objectUnderTest = PluginMetrics.fromPluginSetting(pluginSetting); } + @Test + public void testCounterWithMetricsPrefix() { + + final String prefix = UUID.randomUUID().toString(); + + objectUnderTest = PluginMetrics.fromPrefix(prefix); + final Counter counter = objectUnderTest.counter("counter"); + assertEquals( + prefix + MetricNames.DELIMITER + "counter", + counter.getId().getName()); + } + @Test public void testCounter() { final Counter counter = objectUnderTest.counter("counter"); diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java index 99ea8fa310..96376590bf 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java @@ -22,6 +22,7 @@ import org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumerFactory; import org.opensearch.dataprepper.plugins.source.dynamodb.stream.StreamScheduler; import org.opensearch.dataprepper.plugins.source.dynamodb.utils.BackoffCalculator; +import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; @@ -54,6 +55,8 @@ public class DynamoDBService { private final PluginMetrics pluginMetrics; + private final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics; + private final AcknowledgementSetManager acknowledgementSetManager; @@ -66,6 +69,7 @@ public DynamoDBService(final EnhancedSourceCoordinator coordinator, this.pluginMetrics = pluginMetrics; this.acknowledgementSetManager = acknowledgementSetManager; this.dynamoDBSourceConfig = sourceConfig; + this.dynamoDBSourceAggregateMetrics = new DynamoDBSourceAggregateMetrics(); // Initialize AWS clients dynamoDbClient = clientFactory.buildDynamoDBClient(); @@ -73,7 +77,7 @@ public DynamoDBService(final EnhancedSourceCoordinator coordinator, s3Client = clientFactory.buildS3Client(); // A shard manager is responsible to retrieve the shard information from streams. - shardManager = new ShardManager(dynamoDbStreamsClient); + shardManager = new ShardManager(dynamoDbStreamsClient, dynamoDBSourceAggregateMetrics); tableConfigs = sourceConfig.getTableConfigs(); executor = Executors.newFixedThreadPool(4); } @@ -89,12 +93,12 @@ public void start(Buffer<Record<Event>> buffer) { LOG.info("Start running DynamoDB service"); ManifestFileReader manifestFileReader = new ManifestFileReader(new S3ObjectReader(s3Client)); - Runnable exportScheduler = new ExportScheduler(coordinator, dynamoDbClient, manifestFileReader, pluginMetrics); + Runnable exportScheduler = new ExportScheduler(coordinator, dynamoDbClient, manifestFileReader, pluginMetrics, dynamoDBSourceAggregateMetrics); DataFileLoaderFactory loaderFactory = new DataFileLoaderFactory(coordinator, s3Client, pluginMetrics, buffer); Runnable fileLoaderScheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig); - ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, buffer); + ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer); Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig, new BackoffCalculator(dynamoDBSourceConfig.getTableConfigs().get(0).getExportConfig() != null)); // leader scheduler will handle the initialization Runnable leaderScheduler = new LeaderScheduler(coordinator, dynamoDbClient, shardManager, tableConfigs); diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java index 23b0888377..73e279523c 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java @@ -112,6 +112,7 @@ private void processDataFilePartition(DataFilePartition dataFilePartition) { } else { runLoader.whenComplete((v, ex) -> { if (ex != null) { + LOG.error("There was an exception while processing an S3 data file: {}", ex); coordinator.giveUpPartition(dataFilePartition); } numOfWorkers.decrementAndGet(); diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java index a8a75e1447..d8d6d9c2e1 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java @@ -16,6 +16,7 @@ import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.ExportProgressState; import org.opensearch.dataprepper.plugins.source.dynamodb.model.ExportSummary; import org.opensearch.dataprepper.plugins.source.dynamodb.model.LoadStatus; +import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; @@ -74,11 +75,15 @@ public class ExportScheduler implements Runnable { private final Counter exportS3ObjectsTotalCounter; private final Counter exportRecordsTotalCounter; - public ExportScheduler(EnhancedSourceCoordinator enhancedSourceCoordinator, DynamoDbClient dynamoDBClient, ManifestFileReader manifestFileReader, PluginMetrics pluginMetrics) { + public ExportScheduler(final EnhancedSourceCoordinator enhancedSourceCoordinator, + final DynamoDbClient dynamoDBClient, + final ManifestFileReader manifestFileReader, + final PluginMetrics pluginMetrics, + final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics) { this.enhancedSourceCoordinator = enhancedSourceCoordinator; this.dynamoDBClient = dynamoDBClient; this.pluginMetrics = pluginMetrics; - this.exportTaskManager = new ExportTaskManager(dynamoDBClient); + this.exportTaskManager = new ExportTaskManager(dynamoDBClient, dynamoDBSourceAggregateMetrics); this.manifestFileReader = manifestFileReader; executor = Executors.newCachedThreadPool(); @@ -213,7 +218,7 @@ private void createDataFilePartitions(final String exportArn, private void closeExportPartitionWithError(ExportPartition exportPartition) { LOG.error("The export from DynamoDb to S3 failed, it will be retried"); - exportJobFailureCounter.increment(1); + exportJobFailureCounter.increment(); ExportProgressState exportProgressState = exportPartition.getProgressState().get(); // Clear current Arn, so that a new export can be submitted. exportProgressState.setExportArn(null); diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportTaskManager.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportTaskManager.java index 95d6bce986..2ecd2119c9 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportTaskManager.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportTaskManager.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.export; +import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.exception.SdkException; @@ -14,6 +15,7 @@ import software.amazon.awssdk.services.dynamodb.model.ExportFormat; import software.amazon.awssdk.services.dynamodb.model.ExportTableToPointInTimeRequest; import software.amazon.awssdk.services.dynamodb.model.ExportTableToPointInTimeResponse; +import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException; import software.amazon.awssdk.services.dynamodb.model.S3SseAlgorithm; import java.time.Instant; @@ -25,10 +27,12 @@ public class ExportTaskManager { private static final ExportFormat DEFAULT_EXPORT_FORMAT = ExportFormat.ION; private final DynamoDbClient dynamoDBClient; + private final DynamoDBSourceAggregateMetrics dynamoAggregateMetrics; - - public ExportTaskManager(DynamoDbClient dynamoDBClient) { + public ExportTaskManager(final DynamoDbClient dynamoDBClient, + final DynamoDBSourceAggregateMetrics dynamoAggregateMetrics) { this.dynamoDBClient = dynamoDBClient; + this.dynamoAggregateMetrics = dynamoAggregateMetrics; } public String submitExportJob(String tableArn, String bucket, String prefix, String kmsKeyId, Instant exportTime) { @@ -46,12 +50,17 @@ public String submitExportJob(String tableArn, String bucket, String prefix, Str try { + dynamoAggregateMetrics.getExportApiInvocations().increment(); ExportTableToPointInTimeResponse response = dynamoDBClient.exportTableToPointInTime(req); String exportArn = response.exportDescription().exportArn(); String status = response.exportDescription().exportStatusAsString(); LOG.debug("Export Job submitted with ARN {} and status {}", exportArn, status); return exportArn; + } catch (final InternalServerErrorException e) { + dynamoAggregateMetrics.getExport5xxErrors().increment(); + LOG.error("Failed to submit an export job with error: {}", e.getMessage()); + return null; } catch (SdkException e) { LOG.error("Failed to submit an export job with error " + e.getMessage()); return null; @@ -64,11 +73,15 @@ public String getExportManifest(String exportArn) { String manifestKey = null; try { + dynamoAggregateMetrics.getExportApiInvocations().increment(); DescribeExportResponse resp = dynamoDBClient.describeExport(request); manifestKey = resp.exportDescription().exportManifest(); + } catch (final InternalServerErrorException e) { + dynamoAggregateMetrics.getExport5xxErrors().increment(); + LOG.error("Unable to get manifest file for export {}: {}", exportArn, e.getMessage()); } catch (SdkException e) { - LOG.error("Unable to get manifest file for export " + exportArn); + LOG.error("Unable to get manifest file for export {}: {}", exportArn, e.getMessage()); } return manifestKey; } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/ShardManager.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/ShardManager.java index 36fcec8f81..0b29a7b1a0 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/ShardManager.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/ShardManager.java @@ -1,9 +1,11 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.leader; +import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse; +import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException; import software.amazon.awssdk.services.dynamodb.model.Shard; import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; @@ -43,10 +45,13 @@ public class ShardManager { private final DynamoDbStreamsClient streamsClient; + private final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics; - public ShardManager(final DynamoDbStreamsClient streamsClient) { + public ShardManager(final DynamoDbStreamsClient streamsClient, + final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics) { this.streamsClient = streamsClient; + this.dynamoDBSourceAggregateMetrics = dynamoDBSourceAggregateMetrics; streamMap = new HashMap<>(); endingSequenceNumberMap = new HashMap<>(); } @@ -148,22 +153,30 @@ private List<Shard> listShards(String streamArn, String lastEvaluatedShardId) { long startTime = System.currentTimeMillis(); // Get all the shard IDs from the stream. List<Shard> shards = new ArrayList<>(); - do { - DescribeStreamRequest req = DescribeStreamRequest.builder() - .streamArn(streamArn) - .limit(MAX_SHARD_COUNT) - .exclusiveStartShardId(lastEvaluatedShardId) - .build(); - DescribeStreamResponse describeStreamResult = streamsClient.describeStream(req); - shards.addAll(describeStreamResult.streamDescription().shards()); + try { + do { + DescribeStreamRequest req = DescribeStreamRequest.builder() + .streamArn(streamArn) + .limit(MAX_SHARD_COUNT) + .exclusiveStartShardId(lastEvaluatedShardId) + .build(); - // If LastEvaluatedShardId is set, - // at least one more page of shard IDs to retrieve - lastEvaluatedShardId = describeStreamResult.streamDescription().lastEvaluatedShardId(); + dynamoDBSourceAggregateMetrics.getStreamApiInvocations().increment(); + DescribeStreamResponse describeStreamResult = streamsClient.describeStream(req); + shards.addAll(describeStreamResult.streamDescription().shards()); + // If LastEvaluatedShardId is set, + // at least one more page of shard IDs to retrieve + lastEvaluatedShardId = describeStreamResult.streamDescription().lastEvaluatedShardId(); - } while (lastEvaluatedShardId != null); + + } while (lastEvaluatedShardId != null); + } catch(final InternalServerErrorException e) { + LOG.error("Received an internal server exception from DynamoDB while listing shards: {}", e.getMessage()); + dynamoDBSourceAggregateMetrics.getStream5xxErrors().increment(); + return shards; + } long endTime = System.currentTimeMillis(); LOG.info("Listing shards (DescribeStream call) took {} milliseconds with {} shards found", endTime - startTime, shards.size()); diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java index 554496b99d..0dda2a4f02 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java @@ -13,10 +13,12 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; +import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest; import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse; +import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException; import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; import java.time.Duration; @@ -101,6 +103,8 @@ public class ShardConsumer implements Runnable { private final String shardId; + private final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics; + private long recordsWrittenToBuffer; private ShardConsumer(Builder builder) { @@ -117,10 +121,14 @@ private ShardConsumer(Builder builder) { this.shardAcknowledgmentTimeout = builder.dataFileAcknowledgmentTimeout; this.shardId = builder.shardId; this.recordsWrittenToBuffer = 0; + this.dynamoDBSourceAggregateMetrics = builder.dynamoDBSourceAggregateMetrics; } - public static Builder builder(final DynamoDbStreamsClient dynamoDbStreamsClient, final PluginMetrics pluginMetrics, final Buffer<Record<Event>> buffer) { - return new Builder(dynamoDbStreamsClient, pluginMetrics, buffer); + public static Builder builder(final DynamoDbStreamsClient dynamoDbStreamsClient, + final PluginMetrics pluginMetrics, + final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics, + final Buffer<Record<Event>> buffer) { + return new Builder(dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer); } @@ -130,6 +138,8 @@ static class Builder { private final PluginMetrics pluginMetrics; + private final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics; + private final Buffer<Record<Event>> buffer; private TableInfo tableInfo; @@ -149,9 +159,13 @@ static class Builder { private AcknowledgementSet acknowledgementSet; private Duration dataFileAcknowledgmentTimeout; - public Builder(final DynamoDbStreamsClient dynamoDbStreamsClient, final PluginMetrics pluginMetrics, final Buffer<Record<Event>> buffer) { + public Builder(final DynamoDbStreamsClient dynamoDbStreamsClient, + final PluginMetrics pluginMetrics, + final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics, + final Buffer<Record<Event>> buffer) { this.dynamoDbStreamsClient = dynamoDbStreamsClient; this.pluginMetrics = pluginMetrics; + this.dynamoDBSourceAggregateMetrics = dynamoDBSourceAggregateMetrics; this.buffer = buffer; } @@ -303,9 +317,13 @@ private GetRecordsResponse callGetRecords(String shardIterator) { .build(); try { + dynamoDBSourceAggregateMetrics.getStreamApiInvocations().increment(); GetRecordsResponse response = dynamoDbStreamsClient.getRecords(req); return response; - } catch (Exception e) { + } catch(final InternalServerErrorException ex) { + dynamoDBSourceAggregateMetrics.getStream5xxErrors().increment(); + throw new RuntimeException(ex.getMessage()); + } catch (final Exception e) { throw new RuntimeException(e.getMessage()); } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java index aea98740c5..c26e5143a0 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java @@ -16,12 +16,14 @@ import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableMetadata; +import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics; import org.opensearch.dataprepper.plugins.source.dynamodb.utils.TableUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest; import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse; +import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException; import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType; import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; @@ -40,16 +42,19 @@ public class ShardConsumerFactory { private final EnhancedSourceCoordinator enhancedSourceCoordinator; private final PluginMetrics pluginMetrics; + private final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics; private final Buffer<Record<Event>> buffer; public ShardConsumerFactory(final EnhancedSourceCoordinator enhancedSourceCoordinator, final DynamoDbStreamsClient streamsClient, final PluginMetrics pluginMetrics, + final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics, final Buffer<Record<Event>> buffer) { this.streamsClient = streamsClient; this.enhancedSourceCoordinator = enhancedSourceCoordinator; this.pluginMetrics = pluginMetrics; + this.dynamoDBSourceAggregateMetrics = dynamoDBSourceAggregateMetrics; this.buffer = buffer; } @@ -92,7 +97,7 @@ public Runnable createConsumer(final StreamPartition streamPartition, LOG.debug("Create shard consumer for {} with shardIter {}", streamPartition.getShardId(), shardIterator); LOG.debug("Create shard consumer for {} with lastShardIter {}", streamPartition.getShardId(), lastShardIterator); - ShardConsumer shardConsumer = ShardConsumer.builder(streamsClient, pluginMetrics, buffer) + ShardConsumer shardConsumer = ShardConsumer.builder(streamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer) .tableInfo(tableInfo) .checkpointer(checkpointer) .shardIterator(shardIterator) @@ -149,9 +154,13 @@ public String getShardIterator(String streamArn, String shardId, String sequence } try { + dynamoDBSourceAggregateMetrics.getStreamApiInvocations().increment(); GetShardIteratorResponse getShardIteratorResult = streamsClient.getShardIterator(getShardIteratorRequest); - String currentShardIter = getShardIteratorResult.shardIterator(); - return currentShardIter; + return getShardIteratorResult.shardIterator(); + } catch (final InternalServerErrorException e) { + dynamoDBSourceAggregateMetrics.getStream5xxErrors().increment(); + LOG.error("Received an internal server error from DynamoDB while getting a shard iterator: {}", e.getMessage()); + return null; } catch (SdkException e) { LOG.error("Exception when trying to get the shard iterator due to {}", e.getMessage()); return null; diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/DynamoDBSourceAggregateMetrics.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/DynamoDBSourceAggregateMetrics.java new file mode 100644 index 0000000000..b63b1950bf --- /dev/null +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/DynamoDBSourceAggregateMetrics.java @@ -0,0 +1,47 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.dynamodb.utils; + +import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.metrics.PluginMetrics; + +public class DynamoDBSourceAggregateMetrics { + + private static final String DYNAMO_DB = "dynamodb"; + + private static final String DDB_STREAM_5XX_EXCEPTIONS = "stream5xxErrors"; + private static final String DDB_STREAM_API_INVOCATIONS = "streamApiInvocations"; + private static final String DDB_EXPORT_5XX_ERRORS = "export5xxErrors"; + private static final String DDB_EXPORT_API_INVOCATIONS = "exportApiInvocations"; + + + private final PluginMetrics pluginMetrics; + + private final Counter stream5xxErrors; + private final Counter streamApiInvocations; + private final Counter export5xxErrors; + private final Counter exportApiInvocations; + + public DynamoDBSourceAggregateMetrics() { + this.pluginMetrics = PluginMetrics.fromPrefix(DYNAMO_DB); + this.stream5xxErrors = pluginMetrics.counter(DDB_STREAM_5XX_EXCEPTIONS); + this.streamApiInvocations = pluginMetrics.counter(DDB_STREAM_API_INVOCATIONS); + this.export5xxErrors = pluginMetrics.counter(DDB_EXPORT_5XX_ERRORS); + this.exportApiInvocations = pluginMetrics.counter(DDB_EXPORT_API_INVOCATIONS); + } + + public Counter getStream5xxErrors() { + return stream5xxErrors; + } + + public Counter getStreamApiInvocations() { return streamApiInvocations; } + + public Counter getExport5xxErrors() { + return export5xxErrors; + } + + public Counter getExportApiInvocations() { return exportApiInvocations; } +} diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportSchedulerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportSchedulerTest.java index b033feac5f..e64c65a075 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportSchedulerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportSchedulerTest.java @@ -18,6 +18,7 @@ import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.ExportPartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.ExportProgressState; import org.opensearch.dataprepper.plugins.source.dynamodb.model.ExportSummary; +import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.model.DescribeExportRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeExportResponse; @@ -26,6 +27,7 @@ import software.amazon.awssdk.services.dynamodb.model.ExportStatus; import software.amazon.awssdk.services.dynamodb.model.ExportTableToPointInTimeRequest; import software.amazon.awssdk.services.dynamodb.model.ExportTableToPointInTimeResponse; +import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException; import java.time.Instant; import java.util.Map; @@ -69,6 +71,12 @@ class ExportSchedulerTest { @Mock private PluginMetrics pluginMetrics; + @Mock + private DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics; + + @Mock + private Counter exportApiInvocations; + private ExportScheduler scheduler; @Mock @@ -119,6 +127,7 @@ void setup() { @Test public void test_run_exportJob_correctly() throws InterruptedException { + when(dynamoDBSourceAggregateMetrics.getExportApiInvocations()).thenReturn(exportApiInvocations); when(exportPartition.getTableArn()).thenReturn(tableArn); when(exportPartition.getExportTime()).thenReturn(exportTime); @@ -151,7 +160,7 @@ public void test_run_exportJob_correctly() throws InterruptedException { DescribeExportResponse describeExportResponse = DescribeExportResponse.builder().exportDescription(desc).build(); when(dynamoDBClient.describeExport(any(DescribeExportRequest.class))).thenReturn(describeExportResponse); - scheduler = new ExportScheduler(coordinator, dynamoDBClient, manifestFileReader, pluginMetrics); + scheduler = new ExportScheduler(coordinator, dynamoDBClient, manifestFileReader, pluginMetrics, dynamoDBSourceAggregateMetrics); ExecutorService executor = Executors.newSingleThreadExecutor(); executor.submit(scheduler); @@ -168,6 +177,7 @@ public void test_run_exportJob_correctly() throws InterruptedException { verify(exportJobSuccess).increment(); verify(exportFilesTotal).increment(2); verify(exportRecordsTotal).increment(300); + verify(exportApiInvocations, times(2)).increment(); verifyNoInteractions(exportJobErrors); executor.shutdownNow(); @@ -178,7 +188,7 @@ public void test_run_exportJob_correctly() throws InterruptedException { void run_catches_exception_and_retries_when_exception_is_thrown_during_processing() throws InterruptedException { given(coordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE)).willThrow(RuntimeException.class); - scheduler = new ExportScheduler(coordinator, dynamoDBClient, manifestFileReader, pluginMetrics); + scheduler = new ExportScheduler(coordinator, dynamoDBClient, manifestFileReader, pluginMetrics, dynamoDBSourceAggregateMetrics); ExecutorService executorService = Executors.newSingleThreadExecutor(); final Future<?> future = executorService.submit(() -> scheduler.run()); @@ -189,4 +199,41 @@ void run_catches_exception_and_retries_when_exception_is_thrown_during_processin assertThat(executorService.awaitTermination(1000, TimeUnit.MILLISECONDS), equalTo(true)); } + @Test + void export5xxErrors_is_incremented_when_export_apis_throw_internal_errors() throws InterruptedException { + final Counter export5xxErrors = mock(Counter.class); + when(dynamoDBSourceAggregateMetrics.getExport5xxErrors()).thenReturn(export5xxErrors); + + when(dynamoDBSourceAggregateMetrics.getExportApiInvocations()).thenReturn(exportApiInvocations); + when(exportPartition.getTableArn()).thenReturn(tableArn); + when(exportPartition.getExportTime()).thenReturn(exportTime); + + ExportProgressState state = new ExportProgressState(); + state.setBucket(bucketName); + state.setPrefix(prefix); + when(exportPartition.getProgressState()).thenReturn(Optional.of(state)); + + given(pluginMetrics.counter(EXPORT_JOB_SUCCESS_COUNT)).willReturn(exportJobSuccess); + given(pluginMetrics.counter(EXPORT_JOB_FAILURE_COUNT)).willReturn(exportJobErrors); + given(pluginMetrics.counter(EXPORT_S3_OBJECTS_TOTAL_COUNT)).willReturn(exportFilesTotal); + given(pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT)).willReturn(exportRecordsTotal); + + given(coordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE)).willReturn(Optional.of(exportPartition)).willReturn(Optional.empty()); + + when(dynamoDBClient.exportTableToPointInTime(any(ExportTableToPointInTimeRequest.class))).thenThrow(InternalServerErrorException.class); + //when(dynamoDBClient.describeExport(any(DescribeExportRequest.class))).thenThrow(InternalServerErrorException.class); + + scheduler = new ExportScheduler(coordinator, dynamoDBClient, manifestFileReader, pluginMetrics, dynamoDBSourceAggregateMetrics); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.submit(scheduler); + + Thread.sleep(500); + + verify(exportApiInvocations).increment(); + verify(export5xxErrors).increment(); + verify(exportJobErrors).increment(); + + executor.shutdownNow(); + } } \ No newline at end of file diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/ShardManagerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/ShardManagerTest.java index d793fde4e3..24fcad623e 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/ShardManagerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/ShardManagerTest.java @@ -5,13 +5,16 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.leader; +import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics; import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse; +import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException; import software.amazon.awssdk.services.dynamodb.model.SequenceNumberRange; import software.amazon.awssdk.services.dynamodb.model.Shard; import software.amazon.awssdk.services.dynamodb.model.StreamDescription; @@ -27,6 +30,8 @@ import static org.hamcrest.Matchers.nullValue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -34,6 +39,15 @@ class ShardManagerTest { @Mock private DynamoDbStreamsClient dynamoDbStreamsClient; + @Mock + private DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics; + + @Mock + private Counter stream5xxErrors; + + @Mock + private Counter streamApiInvocations; + private ShardManager shardManager; @@ -80,8 +94,9 @@ void setup() { .build(); lenient().when(dynamoDbStreamsClient.describeStream(any(DescribeStreamRequest.class))).thenReturn(response); - shardManager = new ShardManager(dynamoDbStreamsClient); + shardManager = new ShardManager(dynamoDbStreamsClient, dynamoDBSourceAggregateMetrics); + when(dynamoDBSourceAggregateMetrics.getStreamApiInvocations()).thenReturn(streamApiInvocations); } @Test @@ -100,6 +115,21 @@ void test_getChildShardIds_should_return_child_shards() { List<String> childShardIds3 = shardManager.findChildShardIds(streamArn, "shardId-005"); assertThat(childShardIds3, nullValue()); + + verify(streamApiInvocations).increment(); + } + + @Test + void stream5xxError_is_incremented_when_describe_stream_throws_internal_error() { + when(dynamoDbStreamsClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(InternalServerErrorException.class); + when(dynamoDBSourceAggregateMetrics.getStream5xxErrors()).thenReturn(stream5xxErrors); + + final List<Shard> shards = shardManager.runDiscovery(streamArn); + assertThat(shards, notNullValue()); + assertThat(shards.isEmpty(), equalTo(true)); + + verify(stream5xxErrors).increment(); + verify(streamApiInvocations).increment(); } } \ No newline at end of file diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java index 7c7c11337d..00331799f0 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.stream; +import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -19,8 +20,10 @@ import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableMetadata; +import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics; import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest; import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse; +import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException; import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; import java.time.Instant; @@ -29,10 +32,13 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) class ShardConsumerFactoryTest { @@ -46,6 +52,11 @@ class ShardConsumerFactoryTest { @Mock private PluginMetrics pluginMetrics; + @Mock + private DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics; + + @Mock + private Counter streamApiInvocations; private StreamPartition streamPartition; @@ -88,6 +99,7 @@ void setup() { .build(); lenient().when(tableInfoGlobalState.getProgressState()).thenReturn(Optional.of(metadata.toMap())); + when(dynamoDBSourceAggregateMetrics.getStreamApiInvocations()).thenReturn(streamApiInvocations); } @Test @@ -98,11 +110,12 @@ public void test_create_shardConsumer_correctly() { state.setStartTime(Instant.now().toEpochMilli()); streamPartition = new StreamPartition(streamArn, shardId, Optional.of(state)); - ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, buffer); + ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer); Runnable consumer = consumerFactory.createConsumer(streamPartition, null, null); assertThat(consumer, notNullValue()); verify(dynamoDbStreamsClient).getShardIterator(any(GetShardIteratorRequest.class)); + verify(streamApiInvocations).increment(); } @Test @@ -114,12 +127,32 @@ public void test_create_shardConsumer_for_closedShards() { state.setEndingSequenceNumber(UUID.randomUUID().toString()); streamPartition = new StreamPartition(streamArn, shardId, Optional.of(state)); - ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, buffer); + ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer); Runnable consumer = consumerFactory.createConsumer(streamPartition, null, null); assertThat(consumer, notNullValue()); // Should get iterators twice verify(dynamoDbStreamsClient, times(2)).getShardIterator(any(GetShardIteratorRequest.class)); + verify(streamApiInvocations, times(2)).increment(); + + } + + @Test + void stream5xxErrors_is_incremented_when_get_shard_iterator_throws_internal_exception() { + StreamProgressState state = new StreamProgressState(); + state.setWaitForExport(false); + state.setStartTime(Instant.now().toEpochMilli()); + streamPartition = new StreamPartition(streamArn, shardId, Optional.of(state)); + + when(dynamoDbStreamsClient.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(InternalServerErrorException.class); + final Counter stream5xxErrors = mock(Counter.class); + when(dynamoDBSourceAggregateMetrics.getStream5xxErrors()).thenReturn(stream5xxErrors); + + ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer); + Runnable consumer = consumerFactory.createConsumer(streamPartition, null, null); + assertThat(consumer, nullValue()); + verify(stream5xxErrors).increment(); + verify(streamApiInvocations).increment(); } } \ No newline at end of file diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java index ff08d95f70..34af1cf978 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java @@ -25,9 +25,11 @@ import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableMetadata; +import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest; import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse; +import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException; import software.amazon.awssdk.services.dynamodb.model.OperationType; import software.amazon.awssdk.services.dynamodb.model.Record; import software.amazon.awssdk.services.dynamodb.model.StreamRecord; @@ -42,11 +44,11 @@ import java.util.Random; import java.util.UUID; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; @@ -69,6 +71,15 @@ class ShardConsumerTest { @Mock private PluginMetrics pluginMetrics; + @Mock + private DynamoDBSourceAggregateMetrics aggregateMetrics; + + @Mock + private Counter stream5xxErrors; + + @Mock + private Counter streamApiInvocations; + @Mock private Buffer<org.opensearch.dataprepper.model.record.Record<Event>> buffer; @@ -134,8 +145,8 @@ void setup() throws Exception { lenient().doNothing().when(coordinator).saveProgressStateForPartition(any(EnhancedSourcePartition.class), eq(null)); lenient().doNothing().when(coordinator).giveUpPartition(any(EnhancedSourcePartition.class)); - doNothing().when(bufferAccumulator).add(any(org.opensearch.dataprepper.model.record.Record.class)); - doNothing().when(bufferAccumulator).flush(); + lenient().doNothing().when(bufferAccumulator).add(any(org.opensearch.dataprepper.model.record.Record.class)); + lenient().doNothing().when(bufferAccumulator).flush(); checkpointer = new StreamCheckpointer(coordinator, streamPartition); @@ -144,10 +155,12 @@ void setup() throws Exception { .records(records) .nextShardIterator(null) .build(); - when(dynamoDbStreamsClient.getRecords(any(GetRecordsRequest.class))).thenReturn(response); + lenient().when(dynamoDbStreamsClient.getRecords(any(GetRecordsRequest.class))).thenReturn(response); given(pluginMetrics.counter(anyString())).willReturn(testCounter); given(pluginMetrics.summary(anyString())).willReturn(testSummary); + + when(aggregateMetrics.getStreamApiInvocations()).thenReturn(streamApiInvocations); } @@ -158,7 +171,7 @@ void test_run_shardConsumer_correctly() throws Exception { final MockedStatic<BufferAccumulator> bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class) ) { bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); - shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, buffer) + shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer) .shardIterator(shardIterator) .checkpointer(checkpointer) .tableInfo(tableInfo) @@ -177,6 +190,8 @@ void test_run_shardConsumer_correctly() throws Exception { verify(bufferAccumulator).flush(); // Should complete the consumer as reach to end of shard verify(coordinator).saveProgressStateForPartition(any(StreamPartition.class), eq(CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE)); + + verify(streamApiInvocations).increment(); } @Test @@ -189,7 +204,7 @@ void test_run_shardConsumer_with_acknowledgments_correctly() throws Exception { final MockedStatic<BufferAccumulator> bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class) ) { bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); - shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, buffer) + shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer) .shardIterator(shardIterator) .checkpointer(checkpointer) .tableInfo(tableInfo) @@ -213,6 +228,32 @@ void test_run_shardConsumer_with_acknowledgments_correctly() throws Exception { verify(coordinator).saveProgressStateForPartition(any(StreamPartition.class), eq(CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE)); verify(acknowledgementSet).complete(); + + verify(streamApiInvocations).increment(); + } + + @Test + void test_run_shardConsumer_catches_5xx_exception_and_increments_metric() { + ShardConsumer shardConsumer; + when(aggregateMetrics.getStream5xxErrors()).thenReturn(stream5xxErrors); + try ( + final MockedStatic<BufferAccumulator> bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { + bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); + shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer) + .shardIterator(shardIterator) + .checkpointer(checkpointer) + .tableInfo(tableInfo) + .startTime(null) + .waitForExport(false) + .build(); + } + + when(dynamoDbStreamsClient.getRecords(any(GetRecordsRequest.class))).thenThrow(InternalServerErrorException.class); + + assertThrows(RuntimeException.class, shardConsumer::run); + + verify(stream5xxErrors).increment(); + verify(streamApiInvocations).increment(); } /**