From 1125a15d9b1ee9cbfd1dda9e4dace33b3ee882d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9rique=20Mittelstaedt?= Date: Thu, 31 Aug 2023 13:09:19 +0100 Subject: [PATCH] enhance exponential backoff --- .../BoundedExponentialBackoff.java | 41 +++++++++++++++++-- .../flink/core/httpfn/RetryingCallback.java | 39 +++++++++++++----- .../BoundedExponentialBackoffTest.java | 2 +- 3 files changed, 67 insertions(+), 15 deletions(-) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/BoundedExponentialBackoff.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/BoundedExponentialBackoff.java index 52622eb2c..3ab770c0d 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/BoundedExponentialBackoff.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/BoundedExponentialBackoff.java @@ -18,28 +18,54 @@ package org.apache.flink.statefun.flink.core.backpressure; +import static java.lang.Math.random; +import static java.lang.Math.round; + import java.time.Duration; import java.util.Objects; import org.apache.flink.annotation.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public final class BoundedExponentialBackoff { + private static final Logger LOG = LoggerFactory.getLogger(BoundedExponentialBackoff.class); + private final Timer timer; private final long requestStartTimeInNanos; private final long maxRequestDurationInNanos; + private final double backOffIncreaseFactor; + private final double jitter; private long nextSleepTimeNanos; - public BoundedExponentialBackoff(Duration initialBackoffDuration, Duration maxRequestDuration) { - this(SystemNanoTimer.instance(), initialBackoffDuration, maxRequestDuration); + public BoundedExponentialBackoff( + Duration initialBackoffDuration, + double backOffIncreaseFactor, + double jitter, + Duration maxRequestDuration) { + this( + SystemNanoTimer.instance(), + initialBackoffDuration, + backOffIncreaseFactor, + jitter, + maxRequestDuration); } @VisibleForTesting BoundedExponentialBackoff( - Timer timer, Duration initialBackoffDuration, Duration maxRequestDuration) { + Timer timer, + Duration initialBackoffDuration, + double backOffIncreaseFactor, + double jitter, + Duration maxRequestDuration) { this.timer = Objects.requireNonNull(timer); this.requestStartTimeInNanos = timer.now(); + this.backOffIncreaseFactor = backOffIncreaseFactor; + this.jitter = jitter; this.maxRequestDurationInNanos = maxRequestDuration.toNanos(); this.nextSleepTimeNanos = initialBackoffDuration.toNanos(); + this.nextSleepTimeNanos = + round(initialBackoffDuration.toNanos() * (1.0 + jitter * (-1.0 + 2.0 * random()))); } public boolean applyNow() { @@ -49,6 +75,12 @@ public boolean applyNow() { if (actualSleep <= 0) { return false; } + + LOG.info( + String.format( + "Applying exponential backoff. Sleeping for %f seconds.", + actualSleep * Math.pow(10, -9))); + timer.sleep(actualSleep); return true; } @@ -60,7 +92,8 @@ private long remainingNanosUntilDeadLine() { private long nextAmountOfNanosToSleep() { final long current = nextSleepTimeNanos; - nextSleepTimeNanos *= 2; + nextSleepTimeNanos = + round(nextSleepTimeNanos * backOffIncreaseFactor * (1 + jitter * (-1 + 2 * random()))); return current; } } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java index ffb7f7bb7..a6bbb9ca1 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java @@ -40,10 +40,11 @@ @SuppressWarnings("NullableProblems") final class RetryingCallback implements Callback { - private static final Duration INITIAL_BACKOFF_DURATION = Duration.ofMillis(10); + private static final Duration INITIAL_BACKOFF_DURATION = Duration.ofMillis(1000); + private static final double BACKOFF_JITTER = 0.1; + private static final double BACKOFF_INCREASE_FACTOR = 2; - private static final Set RETRYABLE_HTTP_CODES = - new HashSet<>(Arrays.asList(409, 420, 408, 429, 499, 500)); + private static final Set RETRYABLE_HTTP_CODES = new HashSet<>(Arrays.asList(409, 420, 408, 429, 499, 500)); private static final Logger LOG = LoggerFactory.getLogger(RetryingCallback.class); @@ -61,7 +62,8 @@ final class RetryingCallback implements Callback { Timeout timeout, BooleanSupplier isShutdown) { this.resultFuture = new CompletableFuture<>(); - this.backoff = new BoundedExponentialBackoff(INITIAL_BACKOFF_DURATION, duration(timeout)); + this.backoff = new BoundedExponentialBackoff( + INITIAL_BACKOFF_DURATION, BACKOFF_INCREASE_FACTOR, BACKOFF_JITTER, duration(timeout)); this.requestSummary = requestSummary; this.metrics = metrics; this.isShutdown = Objects.requireNonNull(isShutdown); @@ -90,13 +92,22 @@ private void onFailureUnsafe(Call call, IOException cause) { if (isShutdown.getAsBoolean()) { throw new IllegalStateException("An exception caught during shutdown.", cause); } + + final double callDurationSeconds = timeSinceRequestStartedInNanoseconds() / Math.pow(10, 9); LOG.warn( - "Retriable exception caught while trying to deliver a message: " + requestSummary, cause); + "Retriable exception caught after " + + callDurationSeconds + + " seconds while trying to deliver a message: " + + requestSummary, + cause); metrics.remoteInvocationFailures(); if (!retryAfterApplyingBackoff(call)) { throw new IllegalStateException( - "Maximal request time has elapsed. Last cause is attached", cause); + "Maximal request time has elapsed after " + + callDurationSeconds + + " seconds. Last cause is attached", + cause); } } @@ -109,9 +120,12 @@ private void onResponseUnsafe(Call call, Response response) { throw new IllegalStateException("Non successful HTTP response code " + response.code()); } if (!retryAfterApplyingBackoff(call)) { + final double callDurationSeconds = timeSinceRequestStartedInNanoseconds() / Math.pow(10, 9); + throw new IllegalStateException( - "Maximal request time has elapsed. Last known error is: invalid HTTP response code " - + response.code()); + String.format( + "Maximal request time has elapsed after %f seconds. Last known error is: invalid HTTP response code %d", + callDurationSeconds, response.code())); } } @@ -130,7 +144,8 @@ private boolean retryAfterApplyingBackoff(Call call) { } /** - * Executes the runnable, and completes {@link #resultFuture} with any exceptions thrown, during + * Executes the runnable, and completes {@link #resultFuture} with any + * exceptions thrown, during * its execution. */ private void tryWithFuture(RunnableWithException runnable) { @@ -147,8 +162,12 @@ private static Duration duration(Timeout timeout) { } private void endTimingRequest() { - final long nanosecondsElapsed = System.nanoTime() - requestStarted; + final long nanosecondsElapsed = this.timeSinceRequestStartedInNanoseconds(); final long millisecondsElapsed = TimeUnit.NANOSECONDS.toMillis(nanosecondsElapsed); metrics.remoteInvocationLatency(millisecondsElapsed); } + + private long timeSinceRequestStartedInNanoseconds() { + return System.nanoTime() - requestStarted; + } } diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/backpressure/BoundedExponentialBackoffTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/backpressure/BoundedExponentialBackoffTest.java index 6b0eed582..16d63fdaa 100644 --- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/backpressure/BoundedExponentialBackoffTest.java +++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/backpressure/BoundedExponentialBackoffTest.java @@ -27,7 +27,7 @@ public class BoundedExponentialBackoffTest { private final FakeNanoClock fakeTime = new FakeNanoClock(); private final BoundedExponentialBackoff backoffUnderTest = - new BoundedExponentialBackoff(fakeTime, Duration.ofSeconds(1), Duration.ofMinutes(1)); + new BoundedExponentialBackoff(fakeTime, Duration.ofSeconds(1), 2, 0.1, Duration.ofMinutes(1)); @Test public void simpleUsage() {