Skip to content

Commit

Permalink
Fix overflow when there are no available shards for many retries
Browse files Browse the repository at this point in the history
Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 committed Nov 16, 2023
1 parent 5da233d commit 6b7b209
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ public long calculateBackoffToAcquireNextShard(final int noAvailableShardCount,
// This limits calls to the coordination store
if (noAvailableShardCount > 0) {
if (noAvailableShardCount % 50 == 0 && shardsAcquired.get() == 0) {
- String errorMessage = String.format("No new shards acquired after %s attempts.", noAvailableShardCount);
+ String errorMessage = String.format("No new shards acquired after %s attempts. This means that all shards are currently being consumed", noAvailableShardCount);

if (isExportConfigured) {
- errorMessage += " It is possible that the export is still in progress. New shards will not be consumed until the export is fully processed.";
+ errorMessage += ", or that the export is still in progress. New shards will not be consumed until the export is fully processed.";
}
LOG.info(errorMessage);
}

final long jitterMillis = MIN_JITTER.toMillis() + RANDOM.nextInt((int) (MAX_JITTER.toMillis() - MIN_JITTER.toMillis() + 1));
- return max(1, min(STARTING_BACKOFF.toMillis() * pow(BACKOFF_RATE, noAvailableShardCount - 1) + jitterMillis, MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis()));
+ return max(1, min(STARTING_BACKOFF.toMillis() * pow(BACKOFF_RATE, (int) min(noAvailableShardCount - 1, 8)) + jitterMillis, MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis()));
}

// When shards are being acquired we backoff linearly based on how many shards this node is actively processing, to encourage a fast start but still a balance of shards between nodes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ private static Stream<Arguments> countsToExpectedBackoffRange() {
Arguments.of(4, 2, 2_000, 6_000),
Arguments.of(5, 6, 6_000, 10_000),
Arguments.of(6, 6, 14_000, 15_000),
- Arguments.of(8, 2, MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis(), MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis())
+ Arguments.of(8, 2, MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis(), MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis()),
+ Arguments.of(70, 2, MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis(), MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis())
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,6 @@ static void completeIndexPartition(final OpenSearchSourceConfiguration openSearc

static long calculateExponentialBackoffAndJitter(final int retryCount) {
final long jitterMillis = MIN_JITTER.toMillis() + RANDOM.nextInt((int) (MAX_JITTER.toMillis() - MIN_JITTER.toMillis() + 1));
- return max(1, min(STARTING_BACKOFF.toMillis() * pow(BACKOFF_RATE, retryCount - 1) + jitterMillis, MAX_BACKOFF.toMillis()));
+ return max(1, min(STARTING_BACKOFF.toMillis() * pow(BACKOFF_RATE, (int) min(retryCount - 1, 10)) + jitterMillis, MAX_BACKOFF.toMillis()));
}
}

0 comments on commit 6b7b209

Please sign in to comment.