From 5da233db9380e3faa2663517b0be245c35b2a5d6 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Thu, 16 Nov 2023 13:35:07 -0600 Subject: [PATCH] Improve logging message for no shards found to indicate that export may still be ongoing Signed-off-by: Taylor Gray --- .../source/dynamodb/DynamoDBService.java | 2 +- .../dynamodb/utils/BackoffCalculator.java | 17 +++++++++++++++-- .../dynamodb/utils/BackoffCalculatorTest.java | 2 +- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java index 45b51ac4f2..99ea8fa310 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java @@ -95,7 +95,7 @@ public void start(Buffer> buffer) { Runnable fileLoaderScheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig); ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, buffer); - Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig, new BackoffCalculator()); + Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig, new BackoffCalculator(dynamoDBSourceConfig.getTableConfigs().get(0).getExportConfig() != null)); // leader scheduler will handle the initialization Runnable leaderScheduler = new LeaderScheduler(coordinator, dynamoDbClient, shardManager, tableConfigs); diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculator.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculator.java index 2065310665..a3b7f17823 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculator.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculator.java @@ -30,13 +30,26 @@ public class BackoffCalculator { static final Duration MAX_JITTER = Duration.ofSeconds(2); static final Duration MIN_JITTER = Duration.ofSeconds(-2); + private final boolean isExportConfigured; + + + + public BackoffCalculator(final boolean isExportConfigured) { + this.isExportConfigured = isExportConfigured; + } + public long calculateBackoffToAcquireNextShard(final int noAvailableShardCount, final AtomicInteger shardsAcquired) { // When no shards are available to process we backoff exponentially based on how many consecutive attempts have been made without getting a shard // This limits calls to the coordination store if (noAvailableShardCount > 0) { - if (noAvailableShardCount % 10 == 0) { - LOG.info("No shards acquired after {} attempts", noAvailableShardCount); + if (noAvailableShardCount % 50 == 0 && shardsAcquired.get() == 0) { + String errorMessage = String.format("No new shards acquired after %s attempts.", noAvailableShardCount); + + if (isExportConfigured) { + errorMessage += " It is possible that the export is still in progress. New shards will not be consumed until the export is fully processed."; + } + LOG.info(errorMessage); } final long jitterMillis = MIN_JITTER.toMillis() + RANDOM.nextInt((int) (MAX_JITTER.toMillis() - MIN_JITTER.toMillis() + 1)); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculatorTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculatorTest.java index cfbbeaedca..eaa1475d7b 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculatorTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculatorTest.java @@ -27,7 +27,7 @@ void calculateBackoffToAcquireNextShardReturnsExpectedBackoffValues( final long maxExpectedBackoff ) { - final BackoffCalculator objectUnderTest = new BackoffCalculator(); + final BackoffCalculator objectUnderTest = new BackoffCalculator(false); final long backOffForShardCounts = objectUnderTest.calculateBackoffToAcquireNextShard(noAvailableShardCount, new AtomicInteger(shardsAcquiredCount));