Skip to content

Commit e4f5c43

Browse files
committed
Correctly release connection after switching to Pub/Sub mode.
LettuceConnection.switchToPubSub now correctly releases its underlying connection when switching to Pub/Sub. Also, we improved safeguards to avoid using closed connections. Closes #2331
1 parent 08ed1c4 commit e4f5c43

File tree

5 files changed

+39
-21
lines changed

5 files changed

+39
-21
lines changed

src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -299,11 +299,18 @@ public void close() throws DataAccessException {
299299

300300
super.close();
301301

302+
JedisSubscription subscription = this.subscription;
303+
if (subscription != null) {
304+
subscription.close();
305+
this.subscription = null;
306+
}
307+
302308
// return the connection to the pool
303309
if (pool != null) {
304310
jedis.close();
305311
return;
306312
}
313+
307314
// else close the connection normally (doing the try/catch dance)
308315
Exception exc = null;
309316
try {
@@ -316,8 +323,10 @@ public void close() throws DataAccessException {
316323
} catch (Exception ex) {
317324
exc = ex;
318325
}
319-
if (exc != null)
326+
327+
if (exc != null) {
320328
throw convertJedisAccessException(exc);
329+
}
321330
}
322331

323332
@Override

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java

+19-5
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.springframework.dao.QueryTimeoutException;
6363
import org.springframework.data.redis.ExceptionTranslationStrategy;
6464
import org.springframework.data.redis.FallbackExceptionTranslationStrategy;
65+
import org.springframework.data.redis.RedisSystemException;
6566
import org.springframework.data.redis.connection.*;
6667
import org.springframework.data.redis.connection.convert.TransactionResultConverter;
6768
import org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider.TargetAware;
@@ -351,22 +352,29 @@ public void close() throws DataAccessException {
351352

352353
isClosed = true;
353354

355+
reset();
356+
}
357+
358+
private void reset() {
359+
354360
if (asyncDedicatedConn != null) {
355361
try {
356362
if (customizedDatabaseIndex()) {
357363
potentiallySelectDatabase(defaultDbIndex);
358364
}
359365
connectionProvider.release(asyncDedicatedConn);
366+
asyncDedicatedConn = null;
360367
} catch (RuntimeException ex) {
361368
throw convertLettuceAccessException(ex);
362369
}
363370
}
364371

372+
LettuceSubscription subscription = this.subscription;
365373
if (subscription != null) {
366374
if (subscription.isAlive()) {
367375
subscription.doClose();
368376
}
369-
subscription = null;
377+
this.subscription = null;
370378
}
371379

372380
this.dbIndex = defaultDbIndex;
@@ -381,7 +389,8 @@ public boolean isClosed() {
381389
public RedisClusterAsyncCommands<byte[], byte[]> getNativeConnection() {
382390

383391
LettuceSubscription subscription = this.subscription;
384-
return (subscription != null ? subscription.getNativeConnection().async() : getAsyncConnection());
392+
return (subscription != null && subscription.isAlive() ? subscription.getNativeConnection().async()
393+
: getAsyncConnection());
385394
}
386395

387396
@Override
@@ -509,8 +518,8 @@ public List<Object> exec() {
509518
LettuceTransactionResultConverter resultConverter = new LettuceTransactionResultConverter(
510519
new LinkedList<>(txResults), exceptionConverter);
511520

512-
pipeline(newLettuceResult(exec, source -> resultConverter
513-
.convert(LettuceConverters.transactionResultUnwrapper().convert(source))));
521+
pipeline(newLettuceResult(exec,
522+
source -> resultConverter.convert(LettuceConverters.transactionResultUnwrapper().convert(source))));
514523
return null;
515524
}
516525

@@ -695,7 +704,8 @@ public void setPipeliningFlushPolicy(PipeliningFlushPolicy pipeliningFlushPolicy
695704
@SuppressWarnings("unchecked")
696705
protected StatefulRedisPubSubConnection<byte[], byte[]> switchToPubSub() {
697706

698-
close();
707+
checkSubscription();
708+
reset();
699709
return connectionProvider.getConnection(StatefulRedisPubSubConnection.class);
700710
}
701711

@@ -870,6 +880,10 @@ RedisClusterCommands<byte[], byte[]> getDedicatedConnection() {
870880

871881
protected RedisClusterAsyncCommands<byte[], byte[]> getAsyncDedicatedConnection() {
872882

883+
if (isClosed()) {
884+
throw new RedisSystemException("Connection is closed", null);
885+
}
886+
873887
StatefulConnection<byte[], byte[]> connection = getOrCreateDedicatedConnection();
874888

875889
if (connection instanceof StatefulRedisConnection) {

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceSubscription.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ protected LettuceSubscription(MessageListener listener,
6060

6161
this.connection = pubsubConnection;
6262
this.listener = new LettuceMessageListener(listener,
63-
listener instanceof SubscriptionListener ? (SubscriptionListener) listener : SubscriptionListener.NO_OP_SUBSCRIPTION_LISTENER);
63+
listener instanceof SubscriptionListener ? (SubscriptionListener) listener
64+
: SubscriptionListener.NO_OP_SUBSCRIPTION_LISTENER);
6465
this.connectionProvider = connectionProvider;
6566
this.pubsub = connection.sync();
6667
this.pubSubAsync = connection.async();
@@ -75,6 +76,10 @@ protected StatefulRedisPubSubConnection<byte[], byte[]> getNativeConnection() {
7576
@Override
7677
protected void doClose() {
7778

79+
if (!isAlive()) {
80+
return;
81+
}
82+
7883
List<CompletableFuture<?>> futures = new ArrayList<>();
7984

8085
if (!getChannels().isEmpty()) {

src/main/java/org/springframework/data/redis/connection/util/AbstractSubscription.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ protected AbstractSubscription(MessageListener listener, @Nullable byte[][] chan
100100
@Override
101101
public void close() {
102102
doClose();
103+
alive.set(false);
103104
}
104105

105106
/**
@@ -231,8 +232,7 @@ private void checkPulse() {
231232

232233
private void closeIfUnsubscribed() {
233234
if (channels.isEmpty() && patterns.isEmpty()) {
234-
alive.set(false);
235-
doClose();
235+
close();
236236
}
237237
}
238238

src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java

+2-12
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,12 @@
2222
import io.lettuce.core.KqueueProvider;
2323
import io.lettuce.core.ReadFrom;
2424
import io.lettuce.core.RedisException;
25-
import io.lettuce.core.RedisFuture;
2625
import io.lettuce.core.api.async.RedisAsyncCommands;
2726
import io.lettuce.core.api.reactive.BaseRedisReactiveCommands;
2827
import reactor.test.StepVerifier;
2928

3029
import java.io.File;
3130
import java.time.Duration;
32-
import java.util.concurrent.ExecutionException;
3331
import java.util.function.Consumer;
3432

3533
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
@@ -225,19 +223,11 @@ void testDisableSharedConnection() throws Exception {
225223
factory.setShareNativeConnection(false);
226224
RedisConnection conn2 = factory.getConnection();
227225
assertThat(conn2.getNativeConnection()).isNotSameAs(connection.getNativeConnection());
228-
// Give some time for native connection to asynchronously initialize, else close doesn't work
229226
Thread.sleep(100);
230227
conn2.close();
231228
assertThat(conn2.isClosed()).isTrue();
232-
// Give some time for native connection to asynchronously close
233-
Thread.sleep(100);
234-
RedisFuture<String> future = ((RedisAsyncCommands<byte[], byte[]>) conn2.getNativeConnection()).ping();
235-
try {
236-
future.get();
237-
fail("The native connection should be closed");
238-
} catch (ExecutionException e) {
239-
// expected, Lettuce async failures are signalled on the Future
240-
}
229+
230+
assertThatExceptionOfType(RedisSystemException.class).isThrownBy(conn2::getNativeConnection);
241231
}
242232

243233
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)