diff --git a/src/main/java/redis/clients/jedis/JedisThreadFactoryBuilder.java b/src/main/java/redis/clients/jedis/JedisThreadFactoryBuilder.java new file mode 100644 index 0000000000..bb6af88383 --- /dev/null +++ b/src/main/java/redis/clients/jedis/JedisThreadFactoryBuilder.java @@ -0,0 +1,140 @@ +package redis.clients.jedis; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; + +/** + * JedisThreadFactoryBuilder is a class that builds a ThreadFactory for Jedis. + */ +public class JedisThreadFactoryBuilder { + private String namePrefix = null; + private boolean daemon = false; + private int priority = Thread.NORM_PRIORITY; + private ThreadFactory backingThreadFactory = null; + private UncaughtExceptionHandler uncaughtExceptionHandler = null; + + /** + * Sets the name prefix for the threads created by the ThreadFactory. + * + * @param namePrefix the name prefix for the threads + * @return the JedisThreadFactoryBuilder instance + * @throws NullPointerException if namePrefix is null + */ + public JedisThreadFactoryBuilder setNamePrefix(String namePrefix) { + if (namePrefix == null) { + throw new NullPointerException(); + } + this.namePrefix = namePrefix; + return this; + } + + /** + * Sets whether the threads created by the ThreadFactory are daemon threads. + * + * @param daemon true if the threads are daemon threads, false otherwise + * @return the JedisThreadFactoryBuilder instance + */ + public JedisThreadFactoryBuilder setDaemon(boolean daemon) { + this.daemon = daemon; + return this; + } + + /** + * Sets the priority for the threads created by the ThreadFactory. + * + * @param priority the priority for the threads + * @return the JedisThreadFactoryBuilder instance + * @throws IllegalArgumentException if priority is not in the range of Thread.MIN_PRIORITY to Thread.MAX_PRIORITY + */ + public JedisThreadFactoryBuilder setPriority(int priority) { + if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) { + throw new IllegalArgumentException(String.format( + "Thread priority (%s) must be in %d ~ %d", priority, + Thread.MIN_PRIORITY, Thread.MAX_PRIORITY)); + } + + this.priority = priority; + return this; + } + + /** + * Sets the UncaughtExceptionHandler for the threads created by the ThreadFactory. + * + * @param uncaughtExceptionHandler the UncaughtExceptionHandler for the threads + * @return the JedisThreadFactoryBuilder instance + * @throws NullPointerException if uncaughtExceptionHandler is null + */ + public JedisThreadFactoryBuilder setUncaughtExceptionHandler( + UncaughtExceptionHandler uncaughtExceptionHandler) { + if (uncaughtExceptionHandler == null) { + throw new NullPointerException( + "UncaughtExceptionHandler cannot be null"); + } + this.uncaughtExceptionHandler = uncaughtExceptionHandler; + return this; + } + + /** + * Sets the backing ThreadFactory for the JedisThreadFactoryBuilder. + * + * @param backingThreadFactory the backing ThreadFactory + * @return the JedisThreadFactoryBuilder instance + * @throws NullPointerException if backingThreadFactory is null + */ + public JedisThreadFactoryBuilder setThreadFactory( + ThreadFactory backingThreadFactory) { + if (uncaughtExceptionHandler == null) { + throw new NullPointerException( + "BackingThreadFactory cannot be null"); + } + this.backingThreadFactory = backingThreadFactory; + return this; + } + + /** + * Builds a ThreadFactory using the JedisThreadFactoryBuilder instance. + * + * @return the ThreadFactory + */ + public ThreadFactory build() { + return build(this); + } + + /** + * Builds a ThreadFactory by JedisThreadFactoryBuilder. + * + * @param builder JedisThreadFactoryBuilder + * @return ThreadFactory + */ + private static ThreadFactory build(JedisThreadFactoryBuilder builder) { + final String namePrefix = builder.namePrefix; + final Boolean daemon = builder.daemon; + final Integer priority = builder.priority; + final UncaughtExceptionHandler uncaughtExceptionHandler = builder.uncaughtExceptionHandler; + final ThreadFactory backingThreadFactory = (builder.backingThreadFactory != null) ? builder.backingThreadFactory + : Executors.defaultThreadFactory(); + final AtomicLong count = new AtomicLong(0); + + return new ThreadFactory() { + @Override + public Thread newThread(Runnable runnable) { + Thread thread = backingThreadFactory.newThread(runnable); + if (daemon) { + thread.setDaemon(daemon); + } + if (priority != Thread.NORM_PRIORITY) { + thread.setPriority(priority); + } + if (namePrefix != null) { + thread.setName(namePrefix + "-" + count.getAndIncrement()); + } + if (uncaughtExceptionHandler != null) { + thread.setUncaughtExceptionHandler(uncaughtExceptionHandler); + } + return thread; + } + }; + } +} diff --git a/src/main/java/redis/clients/jedis/JedisThreadPoolBuilder.java b/src/main/java/redis/clients/jedis/JedisThreadPoolBuilder.java new file mode 100644 index 0000000000..dba19d2583 --- /dev/null +++ b/src/main/java/redis/clients/jedis/JedisThreadPoolBuilder.java @@ -0,0 +1,120 @@ +package redis.clients.jedis; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor.AbortPolicy; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is used to build a thread pool for Jedis. + */ +public class JedisThreadPoolBuilder { + private static final Logger log = LoggerFactory.getLogger(JedisThreadPoolBuilder.class); + + private static final RejectedExecutionHandler defaultRejectHandler = new AbortPolicy(); + + public static PoolBuilder pool() { + return new PoolBuilder(); + } + + /** + * Custom thread factory or use default + * @param threadNamePrefix the thread name prefix + * @param daemon daemon + * @return ThreadFactory + */ + private static ThreadFactory createThreadFactory(String threadNamePrefix, boolean daemon) { + if (threadNamePrefix != null) { + return new JedisThreadFactoryBuilder().setNamePrefix(threadNamePrefix).setDaemon(daemon) + .setUncaughtExceptionHandler(new UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + log.error(String.format("Thread %s threw exception %s", t.getName(), e.getMessage())); + } + }).build(); + } + + return Executors.defaultThreadFactory(); + } + + /** + * This class is used to build a thread pool. + */ + public static class PoolBuilder { + private int coreSize = 0; + private int maxSize = Integer.MAX_VALUE; + private long keepAliveMillSecs = 10; + private ThreadFactory threadFactory; + private String threadNamePrefix; + private boolean daemon; + private RejectedExecutionHandler rejectHandler; + private BlockingQueue workQueue; + + public PoolBuilder setCoreSize(int coreSize) { + this.coreSize = coreSize; + return this; + } + + public PoolBuilder setMaxSize(int maxSize) { + this.maxSize = maxSize; + return this; + } + + public PoolBuilder setKeepAliveMillSecs(long keepAliveMillSecs) { + this.keepAliveMillSecs = keepAliveMillSecs; + return this; + } + + public PoolBuilder setThreadNamePrefix(String threadNamePrefix) { + this.threadNamePrefix = threadNamePrefix; + return this; + } + + public PoolBuilder setDaemon(boolean daemon) { + this.daemon = daemon; + return this; + } + + public PoolBuilder setThreadFactory(ThreadFactory threadFactory) { + this.threadFactory = threadFactory; + return this; + } + + public PoolBuilder setRejectHandler(RejectedExecutionHandler rejectHandler) { + this.rejectHandler = rejectHandler; + return this; + } + + public PoolBuilder setWorkQueue(BlockingQueue workQueue) { + this.workQueue = workQueue; + return this; + } + + public ExecutorService build() { + if (threadFactory == null) { + threadFactory = createThreadFactory(threadNamePrefix, daemon); + } + + if (workQueue == null) { + throw new IllegalArgumentException("workQueue can't be null"); + } + + if (rejectHandler == null) { + rejectHandler = defaultRejectHandler; + } + + ExecutorService executorService = new ThreadPoolExecutor(coreSize, maxSize, keepAliveMillSecs, + TimeUnit.MILLISECONDS, workQueue, threadFactory, rejectHandler); + + return executorService; + } + } +} diff --git a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java index eef6b2a810..0bf4d92828 100644 --- a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java +++ b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java @@ -7,9 +7,10 @@ import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,18 +26,26 @@ public abstract class MultiNodePipelineBase extends PipelineBase implements PipelineCommands, PipelineBinaryCommands, RedisModulePipelineCommands, Closeable { - private final Logger log = LoggerFactory.getLogger(getClass()); - - /** - * The number of processes for {@code sync()}. If you have enough cores for client (and you have - * more than 3 cluster nodes), you may increase this number of workers. - * Suggestion: ≤ cluster nodes. - */ - public static volatile int MULTI_NODE_PIPELINE_SYNC_WORKERS = 3; + private static final Logger log = LoggerFactory.getLogger(MultiNodePipelineBase.class); private final Map>> pipelinedResponses; private final Map connections; private volatile boolean syncing = false; + /** + * The following are the default parameters for the multi node pipeline executor + * Since Redis query is usually a slower IO operation (requires more threads), + * so we set DEFAULT_CORE_POOL_SIZE to be the same as the core + */ + private static final long DEFAULT_KEEPALIVE_TIME_MS = 60000L; + private static final int DEFAULT_BLOCKING_QUEUE_SIZE = Protocol.CLUSTER_HASHSLOTS; + private static final int DEFAULT_CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors(); + private static final int DEFAULT_MAXIMUM_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2; + private static ExecutorService executorService = JedisThreadPoolBuilder.pool() + .setCoreSize(DEFAULT_CORE_POOL_SIZE) + .setMaxSize(DEFAULT_MAXIMUM_POOL_SIZE) + .setKeepAliveMillSecs(DEFAULT_KEEPALIVE_TIME_MS) + .setThreadNamePrefix("jedis-multi-node-pipeline") + .setWorkQueue(new ArrayBlockingQueue<>(DEFAULT_BLOCKING_QUEUE_SIZE)).build(); public MultiNodePipelineBase(CommandObjects commandObjects) { super(commandObjects); @@ -44,6 +53,17 @@ public MultiNodePipelineBase(CommandObjects commandObjects) { connections = new LinkedHashMap<>(); } + /** + * Provide an interface for users to set executors themselves. + * @param executor the executor + */ + public static void setExecutorService(ExecutorService executor) { + if (executorService != executor && executorService != null) { + executorService.shutdown(); + } + executorService = executor; + } + /** * Sub-classes must call this method, if graph commands are going to be used. * @param connectionProvider connection provider @@ -102,8 +122,6 @@ public final void sync() { } syncing = true; - ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS); - CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size()); Iterator>>> pipelinedResponsesIterator = pipelinedResponses.entrySet().iterator(); @@ -112,22 +130,28 @@ public final void sync() { HostAndPort nodeKey = entry.getKey(); Queue> queue = entry.getValue(); Connection connection = connections.get(nodeKey); - executorService.submit(() -> { - try { - List unformatted = connection.getMany(queue.size()); - for (Object o : unformatted) { - queue.poll().set(o); + try { + executorService.submit(() -> { + try { + List unformatted = connection.getMany(queue.size()); + for (Object o : unformatted) { + queue.poll().set(o); + } + } catch (JedisConnectionException jce) { + log.error("Error with connection to " + nodeKey, jce); + // cleanup the connection + pipelinedResponsesIterator.remove(); + connections.remove(nodeKey); + IOUtils.closeQuietly(connection); + } finally { + countDownLatch.countDown(); } - } catch (JedisConnectionException jce) { - log.error("Error with connection to " + nodeKey, jce); - // cleanup the connection - pipelinedResponsesIterator.remove(); - connections.remove(nodeKey); - IOUtils.closeQuietly(connection); - } finally { - countDownLatch.countDown(); - } - }); + }); + } catch (RejectedExecutionException e) { + log.error("Get a reject exception when submitting, it is recommended that you use the " + + "MultiNodePipelineBase#setExecutorService method to customize the executor", e); + throw e; + } } try { @@ -135,9 +159,6 @@ public final void sync() { } catch (InterruptedException e) { log.error("Thread is interrupted during sync.", e); } - - executorService.shutdownNow(); - syncing = false; } diff --git a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java index 3ed45bae63..060667eb30 100644 --- a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java +++ b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java @@ -4,6 +4,12 @@ import static redis.clients.jedis.Protocol.CLUSTER_HASHSLOTS; import java.util.*; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; @@ -1097,4 +1103,30 @@ private static void assertThreadsCount() { .count(); MatcherAssert.assertThat(count, Matchers.lessThanOrEqualTo(20)); } + + @Test + public void clusterPipelineCustomExecutorService() { + ExecutorService myService = Executors.newFixedThreadPool(1); + MultiNodePipelineBase.setExecutorService(myService); + + try (JedisCluster cluster = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG); + ClusterPipeline pipeline = cluster.pipelined()) { + + Response r1 = pipeline.set("key1", "value1"); + Response r2 = pipeline.set("key2", "value2"); + Response r3 = pipeline.set("key3", "value3"); + Response r4 = pipeline.get("key1"); + Response r5 = pipeline.get("key2"); + Response r6 = pipeline.get("key3"); + + pipeline.sync(); + + assertEquals("OK", r1.get()); + assertEquals("OK", r2.get()); + assertEquals("OK", r3.get()); + assertEquals("value1", r4.get()); + assertEquals("value2", r5.get()); + assertEquals("value3", r6.get()); + } + } }