Skip to content

Commit

Permalink
Replacing all instances of synchronized blocks with ReentrantLock (#3116)
Browse files Browse the repository at this point in the history
  • Loading branch information
tishun authored Jan 6, 2025
1 parent 38a30dc commit 1448b2b
Show file tree
Hide file tree
Showing 11 changed files with 168 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -63,6 +65,7 @@
* @param <K> Key type.
* @param <V> Value type.
* @author Mark Paluch
* @author Tihomir Mateev
* @since 3.0
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
Expand All @@ -72,7 +75,7 @@ class PooledClusterConnectionProvider<K, V>
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledClusterConnectionProvider.class);

// Contains NodeId-identified and HostAndPort-identified connections.
private final Object stateLock = new Object();
private final Lock stateLock = new ReentrantLock();

private final boolean debugEnabled = logger.isDebugEnabled();

Expand Down Expand Up @@ -156,8 +159,12 @@ public CompletableFuture<StatefulRedisConnection<K, V>> getConnectionAsync(Conne
private CompletableFuture<StatefulRedisConnection<K, V>> getWriteConnection(int slot) {

CompletableFuture<StatefulRedisConnection<K, V>> writer;// avoid races when reconfiguring partitions.
synchronized (stateLock) {

stateLock.lock();
try {
writer = writers[slot];
} finally {
stateLock.unlock();
}

if (writer == null) {
Expand All @@ -177,10 +184,13 @@ private CompletableFuture<StatefulRedisConnection<K, V>> getWriteConnection(int

return future.thenApply(connection -> {

synchronized (stateLock) {
stateLock.lock();
try {
if (writers[slot] == null) {
writers[slot] = CompletableFuture.completedFuture(connection);
}
} finally {
stateLock.unlock();
}

return connection;
Expand All @@ -196,8 +206,11 @@ private CompletableFuture<StatefulRedisConnection<K, V>> getReadConnection(int s

boolean cached = true;

synchronized (stateLock) {
stateLock.lock();
try {
readerCandidates = readers[slot];
} finally {
stateLock.unlock();
}

if (readerCandidates == null) {
Expand Down Expand Up @@ -293,8 +306,12 @@ public Iterator<RedisNodeDescription> iterator() {
for (int i = 0; i < toCache.length; i++) {
toCache[i] = CompletableFuture.completedFuture(statefulRedisConnections[i]);
}
synchronized (stateLock) {

stateLock.lock();
try {
readers[slot] = toCache;
} finally {
stateLock.unlock();
}

if (!orderSensitive) {
Expand Down Expand Up @@ -532,12 +549,15 @@ public void setPartitions(Partitions partitions) {

boolean reconfigurePartitions = false;

synchronized (stateLock) {
stateLock.lock();
try {
if (this.partitions != null) {
reconfigurePartitions = true;
}
this.partitions = partitions;
this.connectionFactory.setPartitions(partitions);
} finally {
stateLock.unlock();
}

if (reconfigurePartitions) {
Expand Down Expand Up @@ -601,8 +621,11 @@ private boolean isStale(ConnectionKey connectionKey) {
@Override
public void setAutoFlushCommands(boolean autoFlush) {

synchronized (stateLock) {
stateLock.lock();
try {
this.autoFlushCommands = autoFlush;
} finally {
stateLock.unlock();
}

connectionProvider.forEach(connection -> connection.setAutoFlushCommands(autoFlush));
Expand All @@ -616,9 +639,12 @@ public void flushCommands() {
@Override
public void setReadFrom(ReadFrom readFrom) {

synchronized (stateLock) {
stateLock.lock();
try {
this.readFrom = readFrom;
Arrays.fill(readers, null);
} finally {
stateLock.unlock();
}
}

Expand All @@ -643,9 +669,12 @@ long getConnectionCount() {
*/
private void resetFastConnectionCache() {

synchronized (stateLock) {
stateLock.lock();
try {
Arrays.fill(writers, null);
Arrays.fill(readers, null);
} finally {
stateLock.unlock();
}
}

Expand Down Expand Up @@ -719,9 +748,12 @@ public ConnectionFuture<StatefulRedisConnection<K, V>> apply(ConnectionKey key)

RedisClusterNode actualNode = targetNode;
connection = connection.thenApply(c -> {
synchronized (stateLock) {
stateLock.lock();
try {
c.setAutoFlushCommands(autoFlushCommands);
c.addListener(message -> onPushMessage(actualNode, message));
} finally {
stateLock.unlock();
}
return c;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.SlotHash;
Expand Down Expand Up @@ -62,6 +64,8 @@ public class Partitions implements Collection<RedisClusterNode> {

private static final RedisClusterNode[] EMPTY = new RedisClusterNode[SlotHash.SLOT_COUNT];

private final Lock lock = new ReentrantLock();

private final List<RedisClusterNode> partitions = new ArrayList<>();

private volatile RedisClusterNode[] slotCache = EMPTY;
Expand Down Expand Up @@ -166,8 +170,8 @@ private static boolean matches(RedisURI uri, String host, int port) {
*/
public void updateCache() {

synchronized (partitions) {

lock.lock();
try {
if (partitions.isEmpty()) {
invalidateCache();
return;
Expand All @@ -190,6 +194,8 @@ public void updateCache() {
this.slotCache = slotCache;
this.masterCache = masterCache;
this.nodeReadView = Collections.unmodifiableCollection(readView);
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -232,9 +238,12 @@ public void addPartition(RedisClusterNode partition) {

LettuceAssert.notNull(partition, "Partition must not be null");

synchronized (partitions) {
lock.lock();
try {
invalidateCache();
partitions.add(partition);
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -265,10 +274,13 @@ public void reload(List<RedisClusterNode> partitions) {

LettuceAssert.noNullElements(partitions, "Partitions must not contain null elements");

synchronized (this.partitions) {
lock.lock();
try {
this.partitions.clear();
this.partitions.addAll(partitions);
updateCache();
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -304,10 +316,13 @@ public boolean addAll(Collection<? extends RedisClusterNode> c) {

LettuceAssert.noNullElements(c, "Partitions must not contain null elements");

synchronized (partitions) {
lock.lock();
try {
boolean b = partitions.addAll(c);
updateCache();
return b;
} finally {
lock.unlock();
}
}

Expand All @@ -321,10 +336,13 @@ public boolean addAll(Collection<? extends RedisClusterNode> c) {
@Override
public boolean removeAll(Collection<?> c) {

synchronized (partitions) {
lock.lock();
try {
boolean b = getPartitions().removeAll(c);
updateCache();
return b;
} finally {
lock.unlock();
}
}

Expand All @@ -339,10 +357,13 @@ public boolean removeAll(Collection<?> c) {
@Override
public boolean retainAll(Collection<?> c) {

synchronized (partitions) {
lock.lock();
try {
boolean b = getPartitions().retainAll(c);
updateCache();
return b;
} finally {
lock.unlock();
}
}

Expand All @@ -352,9 +373,12 @@ public boolean retainAll(Collection<?> c) {
@Override
public void clear() {

synchronized (partitions) {
lock.lock();
try {
getPartitions().clear();
updateCache();
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -390,12 +414,15 @@ public <T> T[] toArray(T[] a) {
@Override
public boolean add(RedisClusterNode redisClusterNode) {

synchronized (partitions) {
lock.lock();
try {
LettuceAssert.notNull(redisClusterNode, "RedisClusterNode must not be null");

boolean add = getPartitions().add(redisClusterNode);
updateCache();
return add;
} finally {
lock.unlock();
}
}

Expand All @@ -408,10 +435,13 @@ public boolean add(RedisClusterNode redisClusterNode) {
@Override
public boolean remove(Object o) {

synchronized (partitions) {
lock.lock();
try {
boolean remove = getPartitions().remove(o);
updateCache();
return remove;
} finally {
lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ public String getMessage() {
}

@Override
public synchronized Throwable fillInStackTrace() {
public Throwable fillInStackTrace() {
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ public RedisCommand<Object, Object, Object> getCommand() {
* @return shared context.
*/
public Map<String, Object> getContext() {
synchronized (this) {
return context;
}
return context;
}

@Override
Expand Down
13 changes: 4 additions & 9 deletions src/main/java/io/lettuce/core/event/jfr/JfrEventRecorder.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import io.lettuce.core.event.Event;
import io.lettuce.core.internal.LettuceAssert;
Expand All @@ -23,7 +24,7 @@
*/
class JfrEventRecorder implements EventRecorder {

private final Map<Class<?>, Constructor<?>> constructorMap = new HashMap<>();
private final Map<Class<?>, Constructor<?>> constructorMap = new ConcurrentHashMap<>();

@Override
public void record(Event event) {
Expand Down Expand Up @@ -54,11 +55,7 @@ public RecordableEvent start(Event event) {

private Constructor<?> getEventConstructor(Event event) throws NoSuchMethodException {

Constructor<?> constructor;

synchronized (constructorMap) {
constructor = constructorMap.get(event.getClass());
}
Constructor<?> constructor = constructorMap.get(event.getClass());

if (constructor == null) {

Expand All @@ -73,9 +70,7 @@ private Constructor<?> getEventConstructor(Event event) throws NoSuchMethodExcep
constructor.setAccessible(true);
}

synchronized (constructorMap) {
constructorMap.put(event.getClass(), constructor);
}
constructorMap.put(event.getClass(), constructor);
}

return constructor;
Expand Down
Loading

0 comments on commit 1448b2b

Please sign in to comment.