From 95dd099b3762699c8f1e75106c235b4d24ac8516 Mon Sep 17 00:00:00 2001
From: Taylor Gray
Date: Fri, 10 Nov 2023 12:56:00 -0600
Subject: [PATCH] =?UTF-8?q?Add=20metric=20for=20shards=20actively=20being?=
 =?UTF-8?q?=20processed,=20lower=20ownership=20timeo=E2=80=A6=20(#3629)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Add metric for shards actively being processed, lower ownership timeout
from 10 minutes to 5 minutes for ddb source

Signed-off-by: Taylor Gray
---
 .../source/dynamodb/export/DataFileCheckpointer.java |  4 +++-
 .../source/dynamodb/stream/StreamCheckpointer.java   |  4 +++-
 .../source/dynamodb/stream/StreamScheduler.java      | 10 +++++++++-
 .../source/dynamodb/stream/ShardConsumerTest.java    |  5 +++--
 .../source/dynamodb/stream/StreamSchedulerTest.java  | 11 +++++++++++
 5 files changed, 29 insertions(+), 5 deletions(-)

diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileCheckpointer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileCheckpointer.java
index f5fdc882f3..42f5463d52 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileCheckpointer.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileCheckpointer.java
@@ -21,6 +21,8 @@ public class DataFileCheckpointer {
 
     private static final Logger LOG = LoggerFactory.getLogger(DataFileCheckpointer.class);
 
+    static final Duration CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE = Duration.ofMinutes(5);
+
     private final EnhancedSourceCoordinator enhancedSourceCoordinator;
 
 
@@ -48,7 +50,7 @@ private void setProgressState(int lineNumber) {
     public void checkpoint(int lineNumber) {
         LOG.debug("Checkpoint data file " + dataFilePartition.getKey() + " with line number " + lineNumber);
         setProgressState(lineNumber);
-        enhancedSourceCoordinator.saveProgressStateForPartition(dataFilePartition, null);
+        enhancedSourceCoordinator.saveProgressStateForPartition(dataFilePartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE);
     }
 
     /**
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamCheckpointer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamCheckpointer.java
index 9e3113732a..85c7c9c69c 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamCheckpointer.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamCheckpointer.java
@@ -22,6 +22,8 @@ public class StreamCheckpointer {
 
     private static final Logger LOG = LoggerFactory.getLogger(StreamCheckpointer.class);
 
+    static final Duration CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE = Duration.ofMinutes(5);
+
     private final EnhancedSourceCoordinator coordinator;
 
     private final StreamPartition streamPartition;
@@ -52,7 +54,7 @@ private void setSequenceNumber(String sequenceNumber) {
     public void checkpoint(String sequenceNumber) {
         LOG.debug("Checkpoint shard " + streamPartition.getShardId() + " with sequenceNumber " + sequenceNumber);
         setSequenceNumber(sequenceNumber);
-        coordinator.saveProgressStateForPartition(streamPartition, null);
+        coordinator.saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE);
     }
 
     /**
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java
index ea799bfc24..660d1b82bf 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java
@@ -47,6 +47,7 @@ public class StreamScheduler implements Runnable {
     private static final int DELAY_TO_GET_CHILD_SHARDS_MILLIS = 1_500;
 
     static final String ACTIVE_CHANGE_EVENT_CONSUMERS = "activeChangeEventConsumers";
+    static final String SHARDS_IN_PROCESSING = "activeShardsInProcessing";
 
     private final AtomicInteger numOfWorkers = new AtomicInteger(0);
     private final EnhancedSourceCoordinator coordinator;
@@ -55,6 +56,7 @@ public class StreamScheduler implements Runnable {
     private final ShardManager shardManager;
     private final PluginMetrics pluginMetrics;
     private final AtomicLong activeChangeEventConsumers;
+    private final AtomicLong shardsInProcessing;
    private final AcknowledgementSetManager acknowledgementSetManager;
     private final DynamoDBSourceConfig dynamoDBSourceConfig;
 
@@ -74,6 +76,7 @@ public StreamScheduler(final EnhancedSourceCoordinator coordinator,
 
         executor = Executors.newFixedThreadPool(MAX_JOB_COUNT);
         activeChangeEventConsumers = pluginMetrics.gauge(ACTIVE_CHANGE_EVENT_CONSUMERS, new AtomicLong());
+        shardsInProcessing = pluginMetrics.gauge(SHARDS_IN_PROCESSING, new AtomicLong());
     }
 
     private void processStreamPartition(StreamPartition streamPartition) {
@@ -98,11 +101,15 @@ private void processStreamPartition(StreamPartition streamPartition) {
             CompletableFuture runConsumer = CompletableFuture.runAsync(shardConsumer, executor);
 
             if (acknowledgmentsEnabled) {
-                runConsumer.whenComplete((v, ex) -> numOfWorkers.decrementAndGet());
+                runConsumer.whenComplete((v, ex) -> {
+                    numOfWorkers.decrementAndGet();
+                    shardsInProcessing.decrementAndGet();
+                });
             } else {
                 runConsumer.whenComplete(completeConsumer(streamPartition));
             }
             numOfWorkers.incrementAndGet();
+            shardsInProcessing.incrementAndGet();
         } else {
             // If failed to create a new consumer.
             coordinator.completePartition(streamPartition);
@@ -153,6 +160,7 @@ private BiConsumer completeConsumer(StreamPartition streamPartition) {
         return (v, ex) -> {
             if (!dynamoDBSourceConfig.isAcknowledgmentsEnabled()) {
                 numOfWorkers.decrementAndGet();
+                shardsInProcessing.decrementAndGet();
             }
             if (ex == null) {
                 LOG.info("Shard consumer for {} is completed", streamPartition.getShardId());
diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java
index ec9df68189..4da0969933 100644
--- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java
+++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java
@@ -54,6 +54,7 @@
 import static org.mockito.Mockito.when;
 import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumer.BUFFER_TIMEOUT;
 import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumer.DEFAULT_BUFFER_BATCH_SIZE;
+import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.StreamCheckpointer.CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE;
 
 @ExtendWith(MockitoExtension.class)
 class ShardConsumerTest {
@@ -169,7 +170,7 @@ void test_run_shardConsumer_correctly() throws Exception {
         verify(bufferAccumulator, times(total)).add(any(org.opensearch.dataprepper.model.record.Record.class));
         verify(bufferAccumulator).flush();
         // Should complete the consumer as reach to end of shard
-        verify(coordinator).saveProgressStateForPartition(any(StreamPartition.class), eq(null));
+        verify(coordinator).saveProgressStateForPartition(any(StreamPartition.class), eq(CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE));
     }
 
     @Test
@@ -203,7 +204,7 @@ void test_run_shardConsumer_with_acknowledgments_correctly() throws Exception {
         verify(bufferAccumulator).flush();
 
         // Should complete the consumer as reach to end of shard
-        verify(coordinator).saveProgressStateForPartition(any(StreamPartition.class), eq(null));
+        verify(coordinator).saveProgressStateForPartition(any(StreamPartition.class), eq(CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE));
 
         verify(acknowledgementSet).complete();
     }
diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java
index 4c6dfdcd0d..149ca2eca0 100644
--- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java
+++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java
@@ -44,6 +44,7 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.StreamScheduler.ACTIVE_CHANGE_EVENT_CONSUMERS;
+import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.StreamScheduler.SHARDS_IN_PROCESSING;
 
 @ExtendWith(MockitoExtension.class)
 class StreamSchedulerTest {
@@ -79,6 +80,9 @@ class StreamSchedulerTest {
     @Mock
     private AtomicLong activeShardConsumers;
 
+    @Mock
+    private AtomicLong activeShardsInProcessing;
+
     private final String tableName = UUID.randomUUID().toString();
     private final String tableArn = "arn:aws:dynamodb:us-west-2:123456789012:table/" + tableName;
 
@@ -108,6 +112,7 @@ void setup() {
         lenient().when(shardManager.getChildShardIds(anyString(), anyString())).thenReturn(List.of(shardId));
 
         when(pluginMetrics.gauge(eq(ACTIVE_CHANGE_EVENT_CONSUMERS), any(AtomicLong.class))).thenReturn(activeShardConsumers);
+        when(pluginMetrics.gauge(eq(SHARDS_IN_PROCESSING), any(AtomicLong.class))).thenReturn(activeShardsInProcessing);
     }
 
 
@@ -135,6 +140,9 @@ public void test_normal_run() throws InterruptedException {
 
         // Should mask the stream partition as completed.
         verify(coordinator).completePartition(any(StreamPartition.class));
 
+        verify(activeShardsInProcessing).incrementAndGet();
+        verify(activeShardsInProcessing).decrementAndGet();
+
         executorService.shutdownNow();
     }
@@ -174,6 +182,9 @@ public void test_normal_run_with_acknowledgments() throws InterruptedException {
 
         // Should mask the stream partition as completed.
         verify(coordinator).completePartition(any(StreamPartition.class));
 
+        verify(activeShardsInProcessing).incrementAndGet();
+        verify(activeShardsInProcessing).decrementAndGet();
+
         executorService.shutdownNow();
     }
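
For illustration, the gauge added above follows a simple lifecycle: StreamScheduler registers an AtomicLong once via pluginMetrics.gauge(SHARDS_IN_PROCESSING, new AtomicLong()), increments it when a shard consumer is submitted, and decrements it in whenComplete so both the success and failure paths release the count. The checkpointers likewise now pass a five-minute Duration instead of null to saveProgressStateForPartition, which, per the commit title, renews partition ownership for that amount on each checkpoint rather than the previous 10-minute default. Below is a minimal, self-contained sketch of the gauge pattern only, not Data Prepper code: the class ShardGaugeSketch and the methods trackShard and simulateShardConsumer are hypothetical stand-ins for the scheduler and ShardConsumer.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ShardGaugeSketch {

    // Stand-in for the AtomicLong that StreamScheduler registers via
    // pluginMetrics.gauge(SHARDS_IN_PROCESSING, new AtomicLong()).
    private static final AtomicLong shardsInProcessing = new AtomicLong();

    // Hypothetical stand-in for a ShardConsumer's work.
    private static void simulateShardConsumer(final String shardId) {
        try {
            TimeUnit.MILLISECONDS.sleep(200);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Same lifecycle as the patch: count the shard when its consumer is
    // submitted, decrement in whenComplete so normal and exceptional
    // completion both release the count. (This sketch increments before
    // submitting, a slight reordering of the patch, so the gauge cannot
    // read transiently negative if the task finishes immediately.)
    static CompletableFuture<Void> trackShard(final String shardId, final ExecutorService executor) {
        shardsInProcessing.incrementAndGet();
        return CompletableFuture
                .runAsync(() -> simulateShardConsumer(shardId), executor)
                .whenComplete((v, ex) -> shardsInProcessing.decrementAndGet());
    }

    public static void main(final String[] args) {
        final ExecutorService executor = Executors.newFixedThreadPool(2);
        final CompletableFuture<Void> first = trackShard("shardId-000000000001", executor);
        final CompletableFuture<Void> second = trackShard("shardId-000000000002", executor);
        System.out.println("active shards in processing: " + shardsInProcessing.get()); // expected 2
        CompletableFuture.allOf(first, second).join();
        System.out.println("active shards in processing: " + shardsInProcessing.get()); // 0
        executor.shutdown();
    }
}

Using whenComplete rather than thenRun is what makes the decrement unconditional: whenComplete fires whether the consumer completed normally or exceptionally, so a crashing shard consumer cannot leak a permanently elevated gauge.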