diff --git a/.github/wordlist.txt b/.github/wordlist.txt
index b4dbd628a8..b76feed3e1 100644
--- a/.github/wordlist.txt
+++ b/.github/wordlist.txt
@@ -277,4 +277,5 @@ ACR
AMR
Entra
authx
-entraid
\ No newline at end of file
+entraid
+autoReconnect
diff --git a/docs/advanced-usage.md b/docs/advanced-usage.md
index 74f154dee4..c5ea1557c3 100644
--- a/docs/advanced-usage.md
+++ b/docs/advanced-usage.md
@@ -327,7 +327,7 @@ client.setOptions(ClientOptions.builder()
PING before activating connection |
-pingBefor eActivateConnection |
+pingBeforeActivateConnection |
true |
@@ -362,8 +362,21 @@ queued commands.
refuse commands and cancel these with an exception.
+Replay filter |
+replayFilter |
+(cmd) -> false |
+
+
+Since: 6.6
+Controls which commands are to be filtered out in case the driver
+attempts to reconnect to the server. Returning false means
+that the command would not be filtered out.
+This flag has no effect in case the autoReconnect feature is not
+enabled. |
+
+
Cancel commands on reconnect failure |
-cancelCommand sOnReconnectFailure |
+cancelCommandsOnReconnectFailure |
false |
@@ -486,7 +499,7 @@ store/trust store.
Timeout Options |
timeoutOptions |
-Do n ot timeout commands. |
+Do not timeout commands. |
Since: 5.1
@@ -550,7 +563,7 @@ client.setOptions(ClusterClientOptions.builder()
|
Periodic cluster topology refresh |
-en ablePeriodicRefresh |
+enablePeriodicRefresh |
false |
@@ -2399,14 +2412,14 @@ independent connections to Redis.
Lettuce provides two levels of consistency; these are the rules for
Redis command sends:
-Depending on the chosen consistency level:
+#### Depending on the chosen consistency level
-- **at-most-once execution**, i. e. no guaranteed execution
+- **at-most-once execution**, i.e. no guaranteed execution
-- **at-least-once execution**, i. e. guaranteed execution (with [some
+- **at-least-once execution**, i.e. guaranteed execution (with [some
exceptions](#exceptions-to-at-least-once))
-Always:
+#### Always
- command ordering in the order of invocations
@@ -2602,9 +2615,44 @@ re-established, queued commands are re-sent for execution. While a
connection failure persists, issued commands are buffered.
To change into *at-most-once* consistency level, disable auto-reconnect
-mode. Connections cannot be longer reconnected and thus no retries are
-issued. Not successfully commands are canceled. New commands are
-rejected.
+mode. Connections can no longer be reconnected and thus no retries are
+issued. Unsuccessful commands are canceled. New commands are rejected.
+
+#### Controlling replay of commands in *at-lease-once* mode
+
+!!! NOTE
+ This feature is only available since Lettuce 6.6
+
+One can achieve a more fine-grained control over the commands that are
+replayed after a reconnection by using the option to specify a filter
+predicate. This option is part of the ClientOptions configuration. See
+[Client Options](advanced-usage.md#client-options) for further reference.
+
+``` java
+Predicate > filter = cmd ->
+ cmd.getType().toString().equalsIgnoreCase("DECR");
+
+client.setOptions(ClientOptions.builder()
+ .autoReconnect(true)
+ .replayFilter(filter)
+ .build());
+```
+
+The code above would filter out all `DECR` commands from being replayed
+after a reconnection. Another, perhaps more popular example, would be:
+
+``` java
+Predicate > filter = cmd -> true;
+
+client.setOptions(ClientOptions.builder()
+ .autoReconnect(true)
+ .replayFilter(filter)
+ .build());
+```
+
+... which disables any command replay, but still allows the driver to
+re-connect, basically providing a way to have auto-reconnect without
+auto-replay of commands.
### Clustered operations
diff --git a/src/main/java/io/lettuce/core/ClientOptions.java b/src/main/java/io/lettuce/core/ClientOptions.java
index 5c02c3b18c..a16845b00c 100644
--- a/src/main/java/io/lettuce/core/ClientOptions.java
+++ b/src/main/java/io/lettuce/core/ClientOptions.java
@@ -25,6 +25,7 @@
import java.util.Iterator;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
+import java.util.function.Predicate;
import java.util.function.Supplier;
import io.lettuce.core.api.StatefulConnection;
@@ -35,6 +36,7 @@
import io.lettuce.core.protocol.DecodeBufferPolicy;
import io.lettuce.core.protocol.ProtocolVersion;
import io.lettuce.core.protocol.ReadOnlyCommands;
+import io.lettuce.core.protocol.RedisCommand;
import io.lettuce.core.resource.ClientResources;
import reactor.core.publisher.Mono;
@@ -50,6 +52,8 @@ public class ClientOptions implements Serializable {
public static final boolean DEFAULT_AUTO_RECONNECT = true;
+ public static final Predicate> DEFAULT_REPLAY_FILTER = (cmd) -> false;
+
public static final int DEFAULT_BUFFER_USAGE_RATIO = 3;
public static final boolean DEFAULT_CANCEL_CMD_RECONNECT_FAIL = false;
@@ -92,6 +96,8 @@ public class ClientOptions implements Serializable {
private final boolean autoReconnect;
+ private final Predicate> replayFilter;
+
private final boolean cancelCommandsOnReconnectFailure;
private final DecodeBufferPolicy decodeBufferPolicy;
@@ -126,6 +132,7 @@ public class ClientOptions implements Serializable {
protected ClientOptions(Builder builder) {
this.autoReconnect = builder.autoReconnect;
+ this.replayFilter = builder.replayFilter;
this.cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure;
this.decodeBufferPolicy = builder.decodeBufferPolicy;
this.disconnectedBehavior = builder.disconnectedBehavior;
@@ -146,6 +153,7 @@ protected ClientOptions(Builder builder) {
protected ClientOptions(ClientOptions original) {
this.autoReconnect = original.isAutoReconnect();
+ this.replayFilter = original.getReplayFilter();
this.cancelCommandsOnReconnectFailure = original.isCancelCommandsOnReconnectFailure();
this.decodeBufferPolicy = original.getDecodeBufferPolicy();
this.disconnectedBehavior = original.getDisconnectedBehavior();
@@ -199,6 +207,8 @@ public static class Builder {
private boolean autoReconnect = DEFAULT_AUTO_RECONNECT;
+ private Predicate> replayFilter = DEFAULT_REPLAY_FILTER;
+
private boolean cancelCommandsOnReconnectFailure = DEFAULT_CANCEL_CMD_RECONNECT_FAIL;
private DecodeBufferPolicy decodeBufferPolicy = DecodeBufferPolicies.ratio(DEFAULT_BUFFER_USAGE_RATIO);
@@ -246,6 +256,21 @@ public Builder autoReconnect(boolean autoReconnect) {
return this;
}
+ /**
+ * When {@link #autoReconnect(boolean)} is set to true, this {@link Predicate} is used to filter commands to replay when
+ * the connection is reestablished after a disconnect. Returning false
means the command will not be
+ * filtered out and will be replayed. Defaults to replaying all queued commands.
+ *
+ * @param replayFilter a {@link Predicate} to filter commands to replay. Must not be {@code null}.
+ * @see #DEFAULT_REPLAY_FILTER
+ * @return {@code this}
+ * @since 6.6
+ */
+ public Builder replayFilter(Predicate> replayFilter) {
+ this.replayFilter = replayFilter;
+ return this;
+ }
+
/**
* Allows cancelling queued commands in case a reconnect fails.Defaults to {@code false}. See
* {@link #DEFAULT_CANCEL_CMD_RECONNECT_FAIL}. This flag is deprecated and should not be used as it can lead to race
@@ -527,13 +552,13 @@ public ClientOptions.Builder mutate() {
Builder builder = new Builder();
builder.autoReconnect(isAutoReconnect()).cancelCommandsOnReconnectFailure(isCancelCommandsOnReconnectFailure())
- .decodeBufferPolicy(getDecodeBufferPolicy()).disconnectedBehavior(getDisconnectedBehavior())
- .reauthenticateBehavior(getReauthenticateBehaviour()).readOnlyCommands(getReadOnlyCommands())
- .publishOnScheduler(isPublishOnScheduler()).pingBeforeActivateConnection(isPingBeforeActivateConnection())
- .protocolVersion(getConfiguredProtocolVersion()).requestQueueSize(getRequestQueueSize())
- .scriptCharset(getScriptCharset()).jsonParser(getJsonParser()).socketOptions(getSocketOptions())
- .sslOptions(getSslOptions()).suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure())
- .timeoutOptions(getTimeoutOptions());
+ .replayFilter(getReplayFilter()).decodeBufferPolicy(getDecodeBufferPolicy())
+ .disconnectedBehavior(getDisconnectedBehavior()).reauthenticateBehavior(getReauthenticateBehaviour())
+ .readOnlyCommands(getReadOnlyCommands()).publishOnScheduler(isPublishOnScheduler())
+ .pingBeforeActivateConnection(isPingBeforeActivateConnection()).protocolVersion(getConfiguredProtocolVersion())
+ .requestQueueSize(getRequestQueueSize()).scriptCharset(getScriptCharset()).jsonParser(getJsonParser())
+ .socketOptions(getSocketOptions()).sslOptions(getSslOptions())
+ .suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions());
return builder;
}
@@ -551,6 +576,16 @@ public boolean isAutoReconnect() {
return autoReconnect;
}
+ /**
+ * Controls which {@link RedisCommand} will be replayed after a re-connect. The {@link Predicate} returns true
+ * if command should be filtered out and not replayed. Defaults to {@link #DEFAULT_REPLAY_FILTER}.
+ *
+ * @return the currently set {@link Predicate} used to filter out commands to replay
+ */
+ public Predicate> getReplayFilter() {
+ return replayFilter;
+ }
+
/**
* If this flag is {@code true} any queued commands will be canceled when a reconnect fails within the activation sequence.
* Default is {@code false}.
diff --git a/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java b/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java
index 502ce4a767..79f2f05f16 100644
--- a/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java
+++ b/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java
@@ -33,6 +33,7 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
+import java.util.function.Predicate;
import java.util.function.Supplier;
import io.lettuce.core.ClientOptions;
@@ -81,6 +82,8 @@ public class DefaultEndpoint implements RedisChannelWriter, Endpoint, PushHandle
private final Reliability reliability;
+ private final Predicate> replayFilter;
+
private final ClientOptions clientOptions;
private final ClientResources clientResources;
@@ -139,6 +142,7 @@ public DefaultEndpoint(ClientOptions clientOptions, ClientResources clientResour
this.clientOptions = clientOptions;
this.clientResources = clientResources;
this.reliability = clientOptions.isAutoReconnect() ? Reliability.AT_LEAST_ONCE : Reliability.AT_MOST_ONCE;
+ this.replayFilter = clientOptions.getReplayFilter();
this.disconnectedBuffer = LettuceFactories.newConcurrentQueue(clientOptions.getRequestQueueSize());
this.commandBuffer = LettuceFactories.newConcurrentQueue(clientOptions.getRequestQueueSize());
this.boundedQueues = clientOptions.getRequestQueueSize() != Integer.MAX_VALUE;
@@ -343,6 +347,13 @@ private void writeToDisconnectedBuffer(RedisCommand, ?, ?> command) {
return;
}
+ if (replayFilter.test(command)) {
+ if (debugEnabled) {
+ logger.debug("{} writeToDisconnectedBuffer() Filtering out command {}", logPrefix(), command);
+ }
+ return;
+ }
+
if (debugEnabled) {
logger.debug("{} writeToDisconnectedBuffer() buffering (disconnected) command {}", logPrefix(), command);
}
@@ -1033,10 +1044,16 @@ private void doComplete(Future future) {
private void potentiallyRequeueCommands(Channel channel, RedisCommand, ?, ?> sentCommand,
Collection extends RedisCommand, ?, ?>> sentCommands) {
+ // do not requeue commands that are done
if (sentCommand != null && sentCommand.isDone()) {
return;
}
+ // do not requeue commands that are to be filtered out
+ if (this.endpoint.replayFilter.test(sentCommand)) {
+ return;
+ }
+
if (sentCommands != null) {
boolean foundToSend = false;
diff --git a/src/test/java/io/lettuce/core/cluster/ClusterNodeEndpointUnitTests.java b/src/test/java/io/lettuce/core/cluster/ClusterNodeEndpointUnitTests.java
index 12b2cd66c5..c11c28c30a 100644
--- a/src/test/java/io/lettuce/core/cluster/ClusterNodeEndpointUnitTests.java
+++ b/src/test/java/io/lettuce/core/cluster/ClusterNodeEndpointUnitTests.java
@@ -56,6 +56,7 @@ class ClusterNodeEndpointUnitTests {
@BeforeEach
void before() {
+ when(clientOptions.getReplayFilter()).thenReturn((cmd) -> false);
when(clientOptions.getRequestQueueSize()).thenReturn(1000);
when(clientOptions.getDisconnectedBehavior()).thenReturn(ClientOptions.DisconnectedBehavior.DEFAULT);
diff --git a/src/test/java/io/lettuce/core/reliability/AtLeastOnceIntegrationTests.java b/src/test/java/io/lettuce/core/reliability/AtLeastOnceIntegrationTests.java
index 7dbac3bf99..df420ce7d1 100644
--- a/src/test/java/io/lettuce/core/reliability/AtLeastOnceIntegrationTests.java
+++ b/src/test/java/io/lettuce/core/reliability/AtLeastOnceIntegrationTests.java
@@ -8,8 +8,10 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
import io.lettuce.core.TimeoutOptions;
+import io.lettuce.core.protocol.RedisCommand;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -372,6 +374,54 @@ void retryAfterConnectionIsDisconnected() throws Exception {
verificationConnection.getStatefulConnection().close();
}
+ @Test
+ void retryAfterConnectionIsDisconnectedButFiltered() throws Exception {
+ // Do not replay DECR commands after reconnect for some reason
+ Predicate> filter = cmd -> cmd.getType().toString().equalsIgnoreCase("DECR");
+
+ client.setOptions(ClientOptions.builder().autoReconnect(true).replayFilter(filter)
+ .timeoutOptions(TimeoutOptions.builder().timeoutCommands(false).build()).build());
+
+ // needs to be increased on slow systems...perhaps...
+ client.setDefaultTimeout(3, TimeUnit.SECONDS);
+
+ StatefulRedisConnection connection = client.connect();
+ RedisCommands verificationConnection = client.connect().sync();
+
+ connection.sync().set(key, "1");
+
+ ConnectionWatchdog connectionWatchdog = ConnectionTestUtil.getConnectionWatchdog(connection);
+ connectionWatchdog.setListenOnChannelInactive(false);
+
+ connection.async().quit();
+ while (connection.isOpen()) {
+ Delay.delay(Duration.ofMillis(100));
+ }
+
+ assertThat(connection.async().incr(key).await(1, TimeUnit.SECONDS)).isFalse();
+ assertThat(connection.async().decr(key).await(1, TimeUnit.SECONDS)).isFalse();
+ assertThat(connection.async().decr(key).await(1, TimeUnit.SECONDS)).isFalse();
+
+ assertThat(verificationConnection.get("key")).isEqualTo("1");
+
+ assertThat(ConnectionTestUtil.getDisconnectedBuffer(connection).size()).isGreaterThan(0);
+ assertThat(ConnectionTestUtil.getCommandBuffer(connection)).isEmpty();
+
+ connectionWatchdog.setListenOnChannelInactive(true);
+ connectionWatchdog.scheduleReconnect();
+
+ while (!ConnectionTestUtil.getCommandBuffer(connection).isEmpty()
+ || !ConnectionTestUtil.getDisconnectedBuffer(connection).isEmpty()) {
+ Delay.delay(Duration.ofMillis(10));
+ }
+
+ assertThat(connection.sync().get(key)).isEqualTo("2");
+ assertThat(verificationConnection.get(key)).isEqualTo("2");
+
+ connection.close();
+ verificationConnection.getStatefulConnection().close();
+ }
+
private Throwable getException(RedisFuture> command) {
try {
command.get();