Skip to content

Commit

Permalink
Improve logging message for no shards found to indicate that export m…
Browse files Browse the repository at this point in the history
…ay still be ongoing

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 committed Nov 16, 2023
1 parent 22647dc commit 5da233d
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void start(Buffer<Record<Event>> buffer) {
Runnable fileLoaderScheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig);

ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, buffer);
Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig, new BackoffCalculator());
Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig, new BackoffCalculator(dynamoDBSourceConfig.getTableConfigs().get(0).getExportConfig() != null));
// leader scheduler will handle the initialization
Runnable leaderScheduler = new LeaderScheduler(coordinator, dynamoDbClient, shardManager, tableConfigs);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,26 @@ public class BackoffCalculator {
static final Duration MAX_JITTER = Duration.ofSeconds(2);
static final Duration MIN_JITTER = Duration.ofSeconds(-2);

private final boolean isExportConfigured;



public BackoffCalculator(final boolean isExportConfigured) {
this.isExportConfigured = isExportConfigured;
}

public long calculateBackoffToAcquireNextShard(final int noAvailableShardCount, final AtomicInteger shardsAcquired) {

// When no shards are available to process we backoff exponentially based on how many consecutive attempts have been made without getting a shard
// This limits calls to the coordination store
if (noAvailableShardCount > 0) {
if (noAvailableShardCount % 10 == 0) {
LOG.info("No shards acquired after {} attempts", noAvailableShardCount);
if (noAvailableShardCount % 50 == 0 && shardsAcquired.get() == 0) {
String errorMessage = String.format("No new shards acquired after %s attempts.", noAvailableShardCount);

if (isExportConfigured) {
errorMessage += " It is possible that the export is still in progress. New shards will not be consumed until the export is fully processed.";
}
LOG.info(errorMessage);
}

final long jitterMillis = MIN_JITTER.toMillis() + RANDOM.nextInt((int) (MAX_JITTER.toMillis() - MIN_JITTER.toMillis() + 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ void calculateBackoffToAcquireNextShardReturnsExpectedBackoffValues(
final long maxExpectedBackoff
) {

final BackoffCalculator objectUnderTest = new BackoffCalculator();
final BackoffCalculator objectUnderTest = new BackoffCalculator(false);

final long backOffForShardCounts = objectUnderTest.calculateBackoffToAcquireNextShard(noAvailableShardCount, new AtomicInteger(shardsAcquiredCount));

Expand Down

0 comments on commit 5da233d

Please sign in to comment.