From 5da233db9380e3faa2663517b0be245c35b2a5d6 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Thu, 16 Nov 2023 13:35:07 -0600 Subject: [PATCH 1/2] Improve logging message for no shards found to indicate that export may still be ongoing Signed-off-by: Taylor Gray --- .../source/dynamodb/DynamoDBService.java | 2 +- .../dynamodb/utils/BackoffCalculator.java | 17 +++++++++++++++-- .../dynamodb/utils/BackoffCalculatorTest.java | 2 +- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java index 45b51ac4f2..99ea8fa310 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java @@ -95,7 +95,7 @@ public void start(Buffer> buffer) { Runnable fileLoaderScheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig); ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, buffer); - Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig, new BackoffCalculator()); + Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig, new BackoffCalculator(dynamoDBSourceConfig.getTableConfigs().get(0).getExportConfig() != null)); // leader scheduler will handle the initialization Runnable leaderScheduler = new LeaderScheduler(coordinator, dynamoDbClient, shardManager, tableConfigs); diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculator.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculator.java index 2065310665..a3b7f17823 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculator.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculator.java @@ -30,13 +30,26 @@ public class BackoffCalculator { static final Duration MAX_JITTER = Duration.ofSeconds(2); static final Duration MIN_JITTER = Duration.ofSeconds(-2); + private final boolean isExportConfigured; + + + + public BackoffCalculator(final boolean isExportConfigured) { + this.isExportConfigured = isExportConfigured; + } + public long calculateBackoffToAcquireNextShard(final int noAvailableShardCount, final AtomicInteger shardsAcquired) { // When no shards are available to process we backoff exponentially based on how many consecutive attempts have been made without getting a shard // This limits calls to the coordination store if (noAvailableShardCount > 0) { - if (noAvailableShardCount % 10 == 0) { - LOG.info("No shards acquired after {} attempts", noAvailableShardCount); + if (noAvailableShardCount % 50 == 0 && shardsAcquired.get() == 0) { + String errorMessage = String.format("No new shards acquired after %s attempts.", noAvailableShardCount); + + if (isExportConfigured) { + errorMessage += " It is possible that the export is still in progress. New shards will not be consumed until the export is fully processed."; + } + LOG.info(errorMessage); } final long jitterMillis = MIN_JITTER.toMillis() + RANDOM.nextInt((int) (MAX_JITTER.toMillis() - MIN_JITTER.toMillis() + 1)); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculatorTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculatorTest.java index cfbbeaedca..eaa1475d7b 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculatorTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculatorTest.java @@ -27,7 +27,7 @@ void calculateBackoffToAcquireNextShardReturnsExpectedBackoffValues( final long maxExpectedBackoff ) { - final BackoffCalculator objectUnderTest = new BackoffCalculator(); + final BackoffCalculator objectUnderTest = new BackoffCalculator(false); final long backOffForShardCounts = objectUnderTest.calculateBackoffToAcquireNextShard(noAvailableShardCount, new AtomicInteger(shardsAcquiredCount)); From 6b7b209b9bbae30e66580de857bcc5dced3e1061 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Thu, 16 Nov 2023 15:00:38 -0600 Subject: [PATCH 2/2] Fix overflow when there are no available shards for many retries Signed-off-by: Taylor Gray --- .../plugins/source/dynamodb/utils/BackoffCalculator.java | 6 +++--- .../source/dynamodb/utils/BackoffCalculatorTest.java | 3 ++- .../plugins/source/opensearch/worker/WorkerCommonUtils.java | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculator.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculator.java index a3b7f17823..41d9f43fb6 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculator.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculator.java @@ -44,16 +44,16 @@ public long calculateBackoffToAcquireNextShard(final int noAvailableShardCount, // This limits calls to the coordination store if (noAvailableShardCount > 0) { if (noAvailableShardCount % 50 == 0 && shardsAcquired.get() == 0) { - String errorMessage = String.format("No new shards acquired after %s attempts.", noAvailableShardCount); + String errorMessage = String.format("No new shards acquired after %s attempts. This means that all shards are currently being consumed", noAvailableShardCount); if (isExportConfigured) { - errorMessage += " It is possible that the export is still in progress. New shards will not be consumed until the export is fully processed."; + errorMessage += ", or that the export is still in progress. New shards will not be consumed until the export is fully processed."; } LOG.info(errorMessage); } final long jitterMillis = MIN_JITTER.toMillis() + RANDOM.nextInt((int) (MAX_JITTER.toMillis() - MIN_JITTER.toMillis() + 1)); - return max(1, min(STARTING_BACKOFF.toMillis() * pow(BACKOFF_RATE, noAvailableShardCount - 1) + jitterMillis, MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis())); + return max(1, min(STARTING_BACKOFF.toMillis() * pow(BACKOFF_RATE, (int) min(noAvailableShardCount - 1, 8)) + jitterMillis, MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis())); } // When shards are being acquired we backoff linearly based on how many shards this node is actively processing, to encourage a fast start but still a balance of shards between nodes diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculatorTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculatorTest.java index eaa1475d7b..6ab787708c 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculatorTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculatorTest.java @@ -47,7 +47,8 @@ private static Stream countsToExpectedBackoffRange() { Arguments.of(4, 2, 2_000, 6_000), Arguments.of(5, 6, 6_000, 10_000), Arguments.of(6, 6, 14_000, 15_000), - Arguments.of(8, 2, MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis(), MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis()) + Arguments.of(8, 2, MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis(), MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis()), + Arguments.of(70, 2, MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis(), MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis()) ); } } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtils.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtils.java index 5e490f1ff1..2e0f98ee42 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtils.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtils.java @@ -76,6 +76,6 @@ static void completeIndexPartition(final OpenSearchSourceConfiguration openSearc static long calculateExponentialBackoffAndJitter(final int retryCount) { final long jitterMillis = MIN_JITTER.toMillis() + RANDOM.nextInt((int) (MAX_JITTER.toMillis() - MIN_JITTER.toMillis() + 1)); - return max(1, min(STARTING_BACKOFF.toMillis() * pow(BACKOFF_RATE, retryCount - 1) + jitterMillis, MAX_BACKOFF.toMillis())); + return max(1, min(STARTING_BACKOFF.toMillis() * pow(BACKOFF_RATE, (int) min(retryCount - 1, 10)) + jitterMillis, MAX_BACKOFF.toMillis())); } }