diff --git a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java index 247069410a..439049bc34 100644 --- a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java +++ b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java @@ -10,10 +10,16 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.PooledObjectFactory; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.jedis.exceptions.JedisException; import redis.clients.jedis.util.IOUtils; public abstract class MultiNodePipelineBase extends PipelineBase { @@ -27,6 +33,41 @@ public abstract class MultiNodePipelineBase extends PipelineBase { */ public static volatile int MULTI_NODE_PIPELINE_SYNC_WORKERS = 3; + // Create the object pool + private static GenericObjectPool servicePool = new GenericObjectPool(new ExecutorServiceFactory(), new GenericObjectPoolConfig<>()); + + // Create a factory for creating and managing ExecutorService objects + private static class ExecutorServiceFactory implements PooledObjectFactory + { + + @Override + public PooledObject makeObject() throws Exception { + return new DefaultPooledObject<>( + Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS)); + } + + @Override + public void destroyObject(PooledObject p) throws Exception { + p.getObject().shutdown(); + } + + @Override + public boolean validateObject(PooledObject p) { + return true; // Always valid + } + + @Override + public void activateObject(PooledObject p) throws Exception { + // No activation needed + } + + @Override + public void passivateObject(PooledObject p) throws Exception { + // No passivation needed + } + } + + private final Map>> pipelinedResponses; private final Map connections; private volatile boolean syncing = false; @@ -85,42 +126,48 @@ public final void sync() { } syncing = true; - ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS); - - CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size()); - Iterator>>> pipelinedResponsesIterator - = pipelinedResponses.entrySet().iterator(); - while (pipelinedResponsesIterator.hasNext()) { - Map.Entry>> entry = pipelinedResponsesIterator.next(); - HostAndPort nodeKey = entry.getKey(); - Queue> queue = entry.getValue(); - Connection connection = connections.get(nodeKey); - executorService.submit(() -> { - try { - List unformatted = connection.getMany(queue.size()); - for (Object o : unformatted) { - queue.poll().set(o); - } - } catch (JedisConnectionException jce) { - log.error("Error with connection to " + nodeKey, jce); - // cleanup the connection - pipelinedResponsesIterator.remove(); - connections.remove(nodeKey); - IOUtils.closeQuietly(connection); - } finally { - countDownLatch.countDown(); - } - }); - } - + ExecutorService executorService = null; try { - countDownLatch.await(); - } catch (InterruptedException e) { - log.error("Thread is interrupted during sync.", e); + executorService = servicePool.borrowObject(); + } catch (Exception e) { + throw new JedisException("Error with borrowing executor service from pool", e); } - executorService.shutdownNow(); + try { + CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size()); + Iterator>>> pipelinedResponsesIterator + = pipelinedResponses.entrySet().iterator(); + while (pipelinedResponsesIterator.hasNext()) { + Map.Entry>> entry = pipelinedResponsesIterator.next(); + HostAndPort nodeKey = entry.getKey(); + Queue> queue = entry.getValue(); + Connection connection = connections.get(nodeKey); + executorService.submit(() -> { + try { + List unformatted = connection.getMany(queue.size()); + for (Object o : unformatted) { + queue.poll().set(o); + } + } catch (JedisConnectionException jce) { + log.error("Error with connection to " + nodeKey, jce); + // cleanup the connection + pipelinedResponsesIterator.remove(); + connections.remove(nodeKey); + IOUtils.closeQuietly(connection); + } finally { + countDownLatch.countDown(); + } + }); + } + try { + countDownLatch.await(); + } catch (InterruptedException e) { + log.error("Thread is interrupted during sync.", e); + } + } finally { + servicePool.returnObject(executorService); + } syncing = false; }