Skip to content

ExecuterService pool for MultiNodePipelineBase #4092

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 79 additions & 32 deletions src/main/java/redis/clients/jedis/MultiNodePipelineBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,16 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.util.IOUtils;

public abstract class MultiNodePipelineBase extends PipelineBase {
Expand All @@ -27,6 +33,41 @@ public abstract class MultiNodePipelineBase extends PipelineBase {
*/
public static volatile int MULTI_NODE_PIPELINE_SYNC_WORKERS = 3;

// Create the object pool
private static GenericObjectPool<ExecutorService> servicePool = new GenericObjectPool<ExecutorService>(new ExecutorServiceFactory(), new GenericObjectPoolConfig<>());

// Create a factory for creating and managing ExecutorService objects
private static class ExecutorServiceFactory implements PooledObjectFactory<ExecutorService>
{

@Override
public PooledObject<ExecutorService> makeObject() throws Exception {
return new DefaultPooledObject<>(
Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS));
}

@Override
public void destroyObject(PooledObject<ExecutorService> p) throws Exception {
p.getObject().shutdown();
}

@Override
public boolean validateObject(PooledObject<ExecutorService> p) {
return true; // Always valid
}

@Override
public void activateObject(PooledObject<ExecutorService> p) throws Exception {
// No activation needed
}

@Override
public void passivateObject(PooledObject<ExecutorService> p) throws Exception {
// No passivation needed
}
}


private final Map<HostAndPort, Queue<Response<?>>> pipelinedResponses;
private final Map<HostAndPort, Connection> connections;
private volatile boolean syncing = false;
Expand Down Expand Up @@ -85,42 +126,48 @@ public final void sync() {
}
syncing = true;

ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);

CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size());
Iterator<Map.Entry<HostAndPort, Queue<Response<?>>>> pipelinedResponsesIterator
= pipelinedResponses.entrySet().iterator();
while (pipelinedResponsesIterator.hasNext()) {
Map.Entry<HostAndPort, Queue<Response<?>>> entry = pipelinedResponsesIterator.next();
HostAndPort nodeKey = entry.getKey();
Queue<Response<?>> queue = entry.getValue();
Connection connection = connections.get(nodeKey);
executorService.submit(() -> {
try {
List<Object> unformatted = connection.getMany(queue.size());
for (Object o : unformatted) {
queue.poll().set(o);
}
} catch (JedisConnectionException jce) {
log.error("Error with connection to " + nodeKey, jce);
// cleanup the connection
pipelinedResponsesIterator.remove();
connections.remove(nodeKey);
IOUtils.closeQuietly(connection);
} finally {
countDownLatch.countDown();
}
});
}

ExecutorService executorService = null;
try {
countDownLatch.await();
} catch (InterruptedException e) {
log.error("Thread is interrupted during sync.", e);
executorService = servicePool.borrowObject();
} catch (Exception e) {
throw new JedisException("Error with borrowing executor service from pool", e);
}

executorService.shutdownNow();
try {
CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size());
Iterator<Map.Entry<HostAndPort, Queue<Response<?>>>> pipelinedResponsesIterator
= pipelinedResponses.entrySet().iterator();
while (pipelinedResponsesIterator.hasNext()) {
Map.Entry<HostAndPort, Queue<Response<?>>> entry = pipelinedResponsesIterator.next();
HostAndPort nodeKey = entry.getKey();
Queue<Response<?>> queue = entry.getValue();
Connection connection = connections.get(nodeKey);
executorService.submit(() -> {
try {
List<Object> unformatted = connection.getMany(queue.size());
for (Object o : unformatted) {
queue.poll().set(o);
}
} catch (JedisConnectionException jce) {
log.error("Error with connection to " + nodeKey, jce);
// cleanup the connection
pipelinedResponsesIterator.remove();
connections.remove(nodeKey);
IOUtils.closeQuietly(connection);
} finally {
countDownLatch.countDown();
}
});
}

try {
countDownLatch.await();
} catch (InterruptedException e) {
log.error("Thread is interrupted during sync.", e);
}
} finally {
servicePool.returnObject(executorService);
}
syncing = false;
}

Expand Down
Loading