From d01834560a8797c34af2fecdf09b9053dcdfd828 Mon Sep 17 00:00:00 2001 From: Francesco Capponi Date: Sat, 20 Feb 2021 20:49:30 -0500 Subject: [PATCH] Adding the ability to trigger task run manually. Motivation: in processing pipelines where tasks can be time-intensive, and multiple queues/tasks are fighting for resources, the lib users can benefit from implementing a more complex pulling strategy and trigger executions manually. Hence we start exposing a public executeDueTasks() method. Changes: - Exposing executeDueTasks as public - renaming Scheduler.start() to Scheduler.startConsumer(), keeping backward compatibility, to reflect that only the "consumer-machines" will have to start this process - add the ability to disable consuming with SchedulerBuilder.disableStartConsumingTasksOnStart(), in such a way that lifecycle threads can handle heartbeat and dead executions, but not computation heavy threads (of the executions), which can be triggered manually Additional: - feature tested - all methods changes are backward compatible. --- .../scheduler/DueExecutionsBatch.java | 44 +++++++++++--- .../kagkarlsson/scheduler/Scheduler.java | 48 +++++++++++++-- .../scheduler/SchedulerBuilder.java | 8 ++- .../scheduler/testhelper/ManualScheduler.java | 13 ++-- .../scheduler/testhelper/TestHelper.java | 4 +- .../kagkarlsson/scheduler/ClusterTest.java | 8 +-- .../scheduler/DeadExecutionsTest.java | 9 +-- .../scheduler/DueExecutionsBatchTest.java | 21 +++++-- .../kagkarlsson/scheduler/SchedulerTest.java | 60 +++++++++++++++---- .../compatibility/CompatibilityTest.java | 5 +- .../functional/DeadExecutionTest.java | 3 +- .../functional/ExecutorPoolTest.java | 2 +- .../functional/ImmediateExecutionTest.java | 4 +- .../functional/RecurringTaskTest.java | 16 ++--- 14 files changed, 179 insertions(+), 66 deletions(-) diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/DueExecutionsBatch.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/DueExecutionsBatch.java index cebea454..e3014339 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/DueExecutionsBatch.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/DueExecutionsBatch.java @@ -15,27 +15,39 @@ */ package com.github.kagkarlsson.scheduler; +import java.util.concurrent.CompletableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; + class DueExecutionsBatch { private static final Logger LOG = LoggerFactory.getLogger(DueExecutionsBatch.class); private final int generationNumber; private final AtomicInteger executionsLeftInBatch; + private final CompletableFuture onBatchComplete; private int threadpoolSize; private boolean possiblyMoreExecutionsInDb; private boolean stale = false; private boolean triggeredExecuteDue; - public DueExecutionsBatch(int threadpoolSize, int generationNumber, int executionsAdded, boolean possiblyMoreExecutionsInDb) { + public DueExecutionsBatch(int threadpoolSize, int generationNumber, int executionsAdded, + boolean possiblyMoreExecutionsInDb) { + this(threadpoolSize, generationNumber, executionsAdded, possiblyMoreExecutionsInDb, new CompletableFuture<>()); + } + + public DueExecutionsBatch(int threadpoolSize, int generationNumber, int executionsAdded, + boolean possiblyMoreExecutionsInDb, CompletableFuture onBatchComplete) { this.threadpoolSize = threadpoolSize; this.generationNumber = generationNumber; this.possiblyMoreExecutionsInDb = possiblyMoreExecutionsInDb; this.executionsLeftInBatch = new AtomicInteger(executionsAdded); + this.onBatchComplete = onBatchComplete; + if (this.executionsLeftInBatch.get() == 0) { + this.onBatchComplete.complete(new BatchCompletionResult(false)); + } } public void markBatchAsStale() { @@ -50,16 +62,20 @@ public void oneExecutionDone(Runnable triggerCheckForNewBatch) { // May be called concurrently by multiple threads executionsLeftInBatch.decrementAndGet(); - LOG.trace("Batch state: generationNumber:{}, stale:{}, triggeredExecuteDue:{}, possiblyMoreExecutionsInDb:{}, executionsLeftInBatch:{}, ratio-trigger:{}", - generationNumber, stale, triggeredExecuteDue, possiblyMoreExecutionsInDb, executionsLeftInBatch.get(), (threadpoolSize * Scheduler.TRIGGER_NEXT_BATCH_WHEN_AVAILABLE_THREADS_RATIO)); - if (!stale - && !triggeredExecuteDue - && possiblyMoreExecutionsInDb - && executionsLeftInBatch.get() <= (threadpoolSize * Scheduler.TRIGGER_NEXT_BATCH_WHEN_AVAILABLE_THREADS_RATIO)) { + LOG.trace( + "Batch state: generationNumber:{}, stale:{}, triggeredExecuteDue:{}, possiblyMoreExecutionsInDb:{}, executionsLeftInBatch:{}, ratio-trigger:{}", + generationNumber, stale, triggeredExecuteDue, possiblyMoreExecutionsInDb, executionsLeftInBatch.get(), + (threadpoolSize * Scheduler.TRIGGER_NEXT_BATCH_WHEN_AVAILABLE_THREADS_RATIO)); + if (!stale && !triggeredExecuteDue && possiblyMoreExecutionsInDb && executionsLeftInBatch.get() <= ( + threadpoolSize * Scheduler.TRIGGER_NEXT_BATCH_WHEN_AVAILABLE_THREADS_RATIO)) { LOG.trace("Triggering check for new batch."); triggerCheckForNewBatch.run(); triggeredExecuteDue = true; } + + if (executionsLeftInBatch.get() == 0) { + onBatchComplete.complete(new BatchCompletionResult(possiblyMoreExecutionsInDb)); + } } public boolean isOlderGenerationThan(int compareTo) { @@ -69,4 +85,16 @@ public boolean isOlderGenerationThan(int compareTo) { public int getGenerationNumber() { return generationNumber; } + + public static class BatchCompletionResult { + private final boolean possiblyMoreExecutionsInDb; + + BatchCompletionResult(boolean possiblyMoreExecutionsInDb) { + this.possiblyMoreExecutionsInDb = possiblyMoreExecutionsInDb; + } + + public boolean possiblyMoreExecutionsInDb() { + return possiblyMoreExecutionsInDb; + } + } } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java index ff579e8c..36402c38 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java @@ -22,6 +22,7 @@ import com.github.kagkarlsson.scheduler.stats.StatsRegistry; import com.github.kagkarlsson.scheduler.stats.StatsRegistry.SchedulerStatsEvent; import com.github.kagkarlsson.scheduler.task.*; +import java.util.concurrent.CompletableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +46,7 @@ public class Scheduler implements SchedulerClient { private final Clock clock; private final TaskRepository schedulerTaskRepository; private final TaskResolver taskResolver; + private final boolean startConsumingTasksOnStart; private int threadpoolSize; private final ExecutorService executorService; private final Waiter executeDueWaiter; @@ -64,9 +66,10 @@ public class Scheduler implements SchedulerClient { private int currentGenerationNumber = 1; private final ConfigurableLogger failureLogger; - protected Scheduler(Clock clock, TaskRepository schedulerTaskRepository, TaskRepository clientTaskRepository, TaskResolver taskResolver, int threadpoolSize, ExecutorService executorService, SchedulerName schedulerName, - Waiter executeDueWaiter, Duration heartbeatInterval, boolean enableImmediateExecution, StatsRegistry statsRegistry, int pollingLimit, Duration deleteUnresolvedAfter, Duration shutdownMaxWait, - LogLevel logLevel, boolean logStackTrace, List onStartup) { + protected Scheduler(Clock clock, TaskRepository schedulerTaskRepository, TaskRepository clientTaskRepository, + TaskResolver taskResolver, int threadpoolSize, ExecutorService executorService, SchedulerName schedulerName, + Waiter executeDueWaiter, Duration heartbeatInterval, boolean enableImmediateExecution, StatsRegistry statsRegistry, int pollingLimit, Duration deleteUnresolvedAfter, Duration shutdownMaxWait, + LogLevel logLevel, boolean logStackTrace, List onStartup, boolean startConsumingTasksOnStart) { this.clock = clock; this.schedulerTaskRepository = schedulerTaskRepository; this.taskResolver = taskResolver; @@ -87,20 +90,36 @@ protected Scheduler(Clock clock, TaskRepository schedulerTaskRepository, TaskRep SchedulerClientEventListener earlyExecutionListener = (enableImmediateExecution ? new TriggerCheckForDueExecutions(schedulerState, clock, executeDueWaiter) : SchedulerClientEventListener.NOOP); delegate = new StandardSchedulerClient(clientTaskRepository, earlyExecutionListener); this.failureLogger = ConfigurableLogger.create(LOG, logLevel, logStackTrace); + this.startConsumingTasksOnStart = startConsumingTasksOnStart; } - public void start() { + /** + * Starts consuming the tasks from the queue. + * If the scheduler is not run with {@link #startConsumer()}, can still be used in `producer-only` mode. + */ + public void startConsumer() { LOG.info("Starting scheduler."); executeOnStartup(); - dueExecutor.submit(new RunUntilShutdown(this::executeDue, executeDueWaiter, schedulerState, statsRegistry)); + if(startConsumingTasksOnStart) { + dueExecutor.submit( + new RunUntilShutdown(this::executeDueTasks, executeDueWaiter, schedulerState, statsRegistry)); + } detectDeadExecutor.submit(new RunUntilShutdown(this::detectDeadExecutions, detectDeadWaiter, schedulerState, statsRegistry)); updateHeartbeatExecutor.submit(new RunUntilShutdown(this::updateHeartbeats, heartbeatWaiter, schedulerState, statsRegistry)); schedulerState.setStarted(); } + /** + * @deprecated use {@link #startConsumer()} + */ + @Deprecated + public void start() { + this.startConsumer(); + } + protected void executeOnStartup() { onStartup.forEach(os -> { try { @@ -214,18 +233,35 @@ public List getCurrentlyExecuting() { return new ArrayList<>(currentlyProcessing.values()); } + /** + * @deprecated Use {@link #executeDueTasks()} + */ + @Deprecated protected void executeDue() { + executeDueTasks(); + } + + public CompletableFuture executeDueTasks() { Instant now = clock.now(); List dueExecutions = schedulerTaskRepository.getDue(now, pollingLimit); LOG.trace("Found {} task instances due for execution", dueExecutions.size()); + boolean possiblyMoreExecutionsInDb = pollingLimit == dueExecutions.size(); + + CompletableFuture onBatchComplete = + new CompletableFuture<>(); + this.currentGenerationNumber = this.currentGenerationNumber + 1; - DueExecutionsBatch newDueBatch = new DueExecutionsBatch(Scheduler.this.threadpoolSize, currentGenerationNumber, dueExecutions.size(), pollingLimit == dueExecutions.size()); + DueExecutionsBatch newDueBatch = + new DueExecutionsBatch(Scheduler.this.threadpoolSize, currentGenerationNumber, dueExecutions.size(), + possiblyMoreExecutionsInDb, onBatchComplete); for (Execution e : dueExecutions) { executorService.execute(new PickAndExecute(e, newDueBatch)); } statsRegistry.register(SchedulerStatsEvent.RAN_EXECUTE_DUE); + + return onBatchComplete; } @SuppressWarnings({"rawtypes","unchecked"}) diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerBuilder.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerBuilder.java index 351faab6..1d330076 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerBuilder.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerBuilder.java @@ -68,6 +68,7 @@ public class SchedulerBuilder { protected boolean commitWhenAutocommitDisabled = false; protected LogLevel logLevel = DEFAULT_FAILURE_LOG_LEVEL; protected boolean logStackTrace = LOG_STACK_TRACE_ON_FAILURE; + private boolean startConsumingTasksOnStart = true; public SchedulerBuilder(DataSource dataSource, List> knownTasks) { this.dataSource = dataSource; @@ -148,6 +149,11 @@ public SchedulerBuilder enableImmediateExecution() { return this; } + public SchedulerBuilder disableStartConsumingTasksOnStart(){ + this.startConsumingTasksOnStart = false; + return this; + } + public SchedulerBuilder deleteUnresolvedAfter(Duration deleteAfter) { this.deleteUnresolvedAfter = deleteAfter; return this; @@ -205,6 +211,6 @@ public Scheduler build() { schedulerName.getName()); return new Scheduler(clock, schedulerTaskRepository, clientTaskRepository, taskResolver, executorThreads, candidateExecutorService, schedulerName, waiter, heartbeatInterval, enableImmediateExecution, statsRegistry, pollingLimit, - deleteUnresolvedAfter, shutdownMaxWait, logLevel, logStackTrace, startTasks); + deleteUnresolvedAfter, shutdownMaxWait, logLevel, logStackTrace, startTasks, startConsumingTasksOnStart); } } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java index 3bb28d5b..2e66330c 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java @@ -31,8 +31,13 @@ public class ManualScheduler extends Scheduler { private static final Logger LOG = LoggerFactory.getLogger(ManualScheduler.class); private final SettableClock clock; - ManualScheduler(SettableClock clock, TaskRepository schedulerTaskRepository, TaskRepository clientTaskRepository, TaskResolver taskResolver, int maxThreads, ExecutorService executorService, SchedulerName schedulerName, Waiter waiter, Duration heartbeatInterval, boolean executeImmediately, StatsRegistry statsRegistry, int pollingLimit, Duration deleteUnresolvedAfter, LogLevel logLevel, boolean logStackTrace, List onStartup) { - super(clock, schedulerTaskRepository, clientTaskRepository, taskResolver, maxThreads, executorService, schedulerName, waiter, heartbeatInterval, executeImmediately, statsRegistry, pollingLimit, deleteUnresolvedAfter, Duration.ZERO, logLevel, logStackTrace, onStartup); + ManualScheduler(SettableClock clock, TaskRepository schedulerTaskRepository, TaskRepository clientTaskRepository, + TaskResolver taskResolver, int maxThreads, ExecutorService executorService, SchedulerName schedulerName, Waiter waiter, Duration heartbeatInterval, boolean executeImmediately, StatsRegistry statsRegistry, + int pollingLimit, Duration deleteUnresolvedAfter, LogLevel logLevel, boolean logStackTrace, + List onStartup, boolean startConsumingTasksOnStart) { + + super(clock, schedulerTaskRepository, clientTaskRepository, taskResolver, maxThreads, executorService, schedulerName, waiter, heartbeatInterval, executeImmediately, statsRegistry, pollingLimit, deleteUnresolvedAfter, Duration.ZERO, logLevel, logStackTrace, onStartup, + startConsumingTasksOnStart); this.clock = clock; } @@ -49,7 +54,7 @@ public void setTime(Instant newtime) { } public void runAnyDueExecutions() { - super.executeDue(); + super.executeDueTasks(); } public void runDeadExecutionDetection() { @@ -57,7 +62,7 @@ public void runDeadExecutionDetection() { } - public void start() { + public void startConsumer() { LOG.info("Starting manual scheduler. Executing on-startup tasks."); executeOnStartup(); } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java index 997e4cdc..8986fb7a 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java @@ -70,12 +70,12 @@ public ManualScheduler build() { final JdbcTaskRepository schedulerTaskRepository = new JdbcTaskRepository(dataSource, true, new DefaultJdbcCustomization(), tableName, taskResolver, new SchedulerName.Fixed("manual"), serializer); final JdbcTaskRepository clientTaskRepository = new JdbcTaskRepository(dataSource, commitWhenAutocommitDisabled, new DefaultJdbcCustomization(), tableName, taskResolver, new SchedulerName.Fixed("manual"), serializer); - return new ManualScheduler(clock, schedulerTaskRepository, clientTaskRepository, taskResolver, executorThreads, new DirectExecutorService(), schedulerName, waiter, heartbeatInterval, enableImmediateExecution, statsRegistry, pollingLimit, deleteUnresolvedAfter, LogLevel.DEBUG, true, startTasks); + return new ManualScheduler(clock, schedulerTaskRepository, clientTaskRepository, taskResolver, executorThreads, new DirectExecutorService(), schedulerName, waiter, heartbeatInterval, enableImmediateExecution, statsRegistry, pollingLimit, deleteUnresolvedAfter, LogLevel.DEBUG, true, startTasks, true); } public ManualScheduler start() { ManualScheduler scheduler = build(); - scheduler.start(); + scheduler.startConsumer(); return scheduler; } } diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ClusterTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ClusterTest.java index 0e1915c5..d5dce670 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ClusterTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ClusterTest.java @@ -52,8 +52,8 @@ public void test_concurrency() throws InterruptedException { final Scheduler scheduler2 = createScheduler("scheduler2", task, stats); stopScheduler.register(scheduler1, scheduler2); - scheduler1.start(); - scheduler2.start(); + scheduler1.startConsumer(); + scheduler2.startConsumer(); ids.forEach(id -> { scheduler1.schedule(task.instance(id), Instant.now()); @@ -88,8 +88,8 @@ public void test_concurrency_recurring() throws InterruptedException { final Scheduler scheduler2 = createSchedulerRecurring("scheduler2", task1, stats); stopScheduler.register(scheduler1, scheduler2); - scheduler1.start(); - scheduler2.start(); + scheduler1.startConsumer(); + scheduler2.startConsumer(); Thread.sleep(5_000); scheduler1.stop(); diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java index c7d78c34..d018cd20 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java @@ -75,7 +75,8 @@ public void setUp() { Duration.ZERO, LogLevel.DEBUG, true, - new ArrayList<>()); + new ArrayList<>(), + true); } @@ -113,10 +114,10 @@ public void scheduler_should_detect_dead_execution_that_never_updated_heartbeat( final Execution execution1 = new Execution(oneHourAgo, taskInstance); jdbcTaskRepository.createIfNotExists(execution1); - scheduler.executeDue(); + scheduler.executeDueTasks(); assertThat(nonCompletingExecutionHandler.timesExecuted.get(), is(1)); - scheduler.executeDue(); + scheduler.executeDueTasks(); assertThat(nonCompletingExecutionHandler.timesExecuted.get(), is(1)); settableClock.set(Instant.now()); @@ -126,7 +127,7 @@ public void scheduler_should_detect_dead_execution_that_never_updated_heartbeat( settableClock.set(Instant.now()); - scheduler.executeDue(); + scheduler.executeDueTasks(); assertThat(nonCompletingExecutionHandler.timesExecuted.get(), is(2)); } diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DueExecutionsBatchTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DueExecutionsBatchTest.java index f8baa3d1..caac8505 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DueExecutionsBatchTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DueExecutionsBatchTest.java @@ -1,5 +1,7 @@ package com.github.kagkarlsson.scheduler; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.junit.jupiter.api.Test; import java.util.concurrent.atomic.AtomicInteger; @@ -16,7 +18,7 @@ public class DueExecutionsBatchTest { @Test - public void test_trigger_check_for_more_due() { + public void test_trigger_check_for_more_due() throws ExecutionException, InterruptedException { assertTrigger(0, 0, newBatch(HAPPY_THREADPOOL_SIZE, HAPPY_NUMBER_ADDED_LAST_TIME, HAPPY_LIKELY_MORE_IN_DB)); assertTrigger(0, 10, newBatch(HAPPY_THREADPOOL_SIZE, HAPPY_NUMBER_ADDED_LAST_TIME, HAPPY_LIKELY_MORE_IN_DB)); assertTrigger(0, 14, newBatch(HAPPY_THREADPOOL_SIZE, HAPPY_NUMBER_ADDED_LAST_TIME, HAPPY_LIKELY_MORE_IN_DB)); @@ -27,7 +29,8 @@ public void test_trigger_check_for_more_due() { assertTrigger(0, HAPPY_NUMBER_ADDED_LAST_TIME, newBatch(HAPPY_THREADPOOL_SIZE, HAPPY_NUMBER_ADDED_LAST_TIME, false)); // Stale batch, Scheduler has already triggered a new executeDue - assertTrigger(0, HAPPY_NUMBER_ADDED_LAST_TIME, staleBatch(HAPPY_THREADPOOL_SIZE, HAPPY_NUMBER_ADDED_LAST_TIME, HAPPY_LIKELY_MORE_IN_DB)); + assertTrigger(0, HAPPY_NUMBER_ADDED_LAST_TIME, + staleBatch(HAPPY_THREADPOOL_SIZE, HAPPY_NUMBER_ADDED_LAST_TIME, HAPPY_LIKELY_MORE_IN_DB)); } private void assertTrigger(int timesTriggered, int afterExeutionsHandled, DueExecutionsBatch batch) { @@ -36,15 +39,21 @@ private void assertTrigger(int timesTriggered, int afterExeutionsHandled, DueExe assertEquals(timesTriggered, triggered.get()); } - private DueExecutionsBatch newBatch(int schedulerThreadpoolSize, int numberAddedFromLastDbQuery, boolean likelyMoreDueInDb) { - DueExecutionsBatch batch = new DueExecutionsBatch(schedulerThreadpoolSize, HAPPY_GENERATION_NUMBER, numberAddedFromLastDbQuery, likelyMoreDueInDb); + private DueExecutionsBatch newBatch(int schedulerThreadpoolSize, int numberAddedFromLastDbQuery, + boolean likelyMoreDueInDb) throws ExecutionException, InterruptedException { + + CompletableFuture onBatchComplete = new CompletableFuture<>(); + DueExecutionsBatch batch = + new DueExecutionsBatch(schedulerThreadpoolSize, HAPPY_GENERATION_NUMBER, numberAddedFromLastDbQuery, + likelyMoreDueInDb, onBatchComplete); + assertEquals(onBatchComplete.get().possiblyMoreExecutionsInDb(), likelyMoreDueInDb); return batch; } - private DueExecutionsBatch staleBatch(int schedulerThreadpoolSize, int numberAddedFromLastDbQuery, boolean likelyMoreDueInDb) { + private DueExecutionsBatch staleBatch(int schedulerThreadpoolSize, int numberAddedFromLastDbQuery, + boolean likelyMoreDueInDb) throws ExecutionException, InterruptedException { DueExecutionsBatch batch = newBatch(schedulerThreadpoolSize, numberAddedFromLastDbQuery, likelyMoreDueInDb); batch.markBatchAsStale(); return batch; } - } diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerTest.java index 4fc05bdd..d2e16144 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerTest.java @@ -2,6 +2,7 @@ import com.github.kagkarlsson.scheduler.logging.LogLevel; import com.github.kagkarlsson.scheduler.stats.StatsRegistry; +import com.github.kagkarlsson.scheduler.task.Execution; import com.github.kagkarlsson.scheduler.task.ExecutionComplete; import com.github.kagkarlsson.scheduler.task.Task; import com.github.kagkarlsson.scheduler.task.TaskInstance; @@ -11,6 +12,11 @@ import com.github.kagkarlsson.scheduler.task.schedule.FixedDelay; import com.github.kagkarlsson.scheduler.testhelper.SettableClock; import com.google.common.util.concurrent.MoreExecutors; +import java.time.temporal.ChronoUnit; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.IntStream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -29,6 +35,7 @@ public class SchedulerTest { + public static final int POLLING_LIMIT = 10; private TestTasks.CountingHandler handler; private SettableClock clock; @@ -48,8 +55,12 @@ private Scheduler schedulerFor(Task... tasks) { private Scheduler schedulerFor(ExecutorService executor, Task ... tasks) { final StatsRegistry statsRegistry = StatsRegistry.NOOP; TaskResolver taskResolver = new TaskResolver(statsRegistry, clock, Arrays.asList(tasks)); - JdbcTaskRepository taskRepository = new JdbcTaskRepository(postgres.getDataSource(), false, DEFAULT_TABLE_NAME, taskResolver, new SchedulerName.Fixed("scheduler1")); - return new Scheduler(clock, taskRepository, taskRepository, taskResolver, 1, executor, new SchedulerName.Fixed("name"), new Waiter(Duration.ZERO), Duration.ofSeconds(1), false, statsRegistry, 10_000, Duration.ofDays(14), Duration.ZERO, LogLevel.DEBUG, true, new ArrayList<>()); + JdbcTaskRepository taskRepository = + new JdbcTaskRepository(postgres.getDataSource(), false, DEFAULT_TABLE_NAME, taskResolver, + new SchedulerName.Fixed("scheduler1")); + return new Scheduler(clock, taskRepository, taskRepository, taskResolver, 1, executor, + new SchedulerName.Fixed("name"), new Waiter(Duration.ZERO), Duration.ofSeconds(1), false, statsRegistry, + POLLING_LIMIT, Duration.ofDays(14), Duration.ZERO, LogLevel.DEBUG, true, new ArrayList<>(), true); } @Test @@ -60,11 +71,11 @@ public void scheduler_should_execute_task_when_exactly_due() { Instant executionTime = clock.now().plus(Duration.ofMinutes(1)); scheduler.schedule(oneTimeTask.instance("1"), executionTime); - scheduler.executeDue(); + scheduler.executeDueTasks(); assertThat(handler.timesExecuted.get(), is(0)); clock.set(executionTime); - scheduler.executeDue(); + scheduler.executeDueTasks(); assertThat(handler.timesExecuted.get(), is(1)); } @@ -79,15 +90,15 @@ public void scheduler_should_execute_rescheduled_task_when_exactly_due() { scheduler.schedule(oneTimeTaskInstance, executionTime); Instant reScheduledExecutionTime = clock.now().plus(Duration.ofMinutes(2)); scheduler.reschedule(oneTimeTaskInstance, reScheduledExecutionTime); - scheduler.executeDue(); + scheduler.executeDueTasks(); assertThat(handler.timesExecuted.get(), is(0)); clock.set(executionTime); - scheduler.executeDue(); + scheduler.executeDueTasks(); assertThat(handler.timesExecuted.get(), is(0)); clock.set(reScheduledExecutionTime); - scheduler.executeDue(); + scheduler.executeDueTasks(); assertThat(handler.timesExecuted.get(), is(1)); } @@ -101,11 +112,11 @@ public void scheduler_should_not_execute_canceled_tasks() { TaskInstance oneTimeTaskInstance = oneTimeTask.instance(instanceId); scheduler.schedule(oneTimeTaskInstance, executionTime); scheduler.cancel(oneTimeTaskInstance); - scheduler.executeDue(); + scheduler.executeDueTasks(); assertThat(handler.timesExecuted.get(), is(0)); clock.set(executionTime); - scheduler.executeDue(); + scheduler.executeDueTasks(); assertThat(handler.timesExecuted.get(), is(0)); } @@ -115,13 +126,13 @@ public void scheduler_should_execute_recurring_task_and_reschedule() { Scheduler scheduler = schedulerFor(recurringTask); scheduler.schedule(recurringTask.instance("single"), clock.now()); - scheduler.executeDue(); + scheduler.executeDueTasks(); assertThat(handler.timesExecuted.get(), is(1)); Instant nextExecutionTime = clock.now().plus(Duration.ofHours(1)); clock.set(nextExecutionTime); - scheduler.executeDue(); + scheduler.executeDueTasks(); assertThat(handler.timesExecuted.get(), is(2)); } @@ -132,7 +143,7 @@ public void scheduler_should_track_duration() throws InterruptedException { Scheduler scheduler = schedulerFor(Executors.newSingleThreadExecutor(), oneTimeTask); scheduler.schedule(oneTimeTask.instance("1"), clock.now()); - scheduler.executeDue(); + scheduler.executeDueTasks(); pausingHandler.waitForExecute.await(); assertThat(scheduler.getCurrentlyExecuting(), hasSize(1)); @@ -157,7 +168,30 @@ public void should_expose_cause_of_failure_to_completion_handler() throws Interr assertThat(failureHandler.result, is(ExecutionComplete.Result.FAILED)); assertThat(failureHandler.cause.get().getMessage(), is("Failed!")); - } + @Test + public void scheduler_should_return_if_still_tasks_due() throws ExecutionException, InterruptedException { + OneTimeTask oneTimeTask = TestTasks.oneTime("OneTime", Void.class, handler); + Scheduler scheduler = schedulerFor(oneTimeTask); + + // Since no tasks have been inserted, there should be no extra executions + assertThat(scheduler.executeDueTasks().get().possiblyMoreExecutionsInDb(), is(false)); + + // Inserting one task in the future --> no tasks to execute + scheduler.schedule(oneTimeTask.instance("in future"), clock.now().plus(Duration.ofMinutes(1))); + assertThat(scheduler.executeDueTasks().get().possiblyMoreExecutionsInDb(), is(false)); + + // Inserting exactly POLLING_LIMIT -1, therefore we know that there are no more tasks + IntStream.range(0, POLLING_LIMIT - 1) + .forEach(i -> scheduler.schedule(oneTimeTask.instance("-1 of limit" + i), clock.now())); + assertThat(scheduler.executeDueTasks().get().possiblyMoreExecutionsInDb(), is(false)); + + // Inserting exactly POLLING_LIMIT, therefore we DON'T know if there are any more tasks + IntStream.range(0, POLLING_LIMIT) + .forEach(i -> scheduler.schedule(oneTimeTask.instance("exact-limit" + i), clock.now())); + assertThat(scheduler.executeDueTasks().get().possiblyMoreExecutionsInDb(), is(true)); + // once we run all of them, we there should be 0 tasks in it, and therefore we know we ran them all + assertThat(scheduler.executeDueTasks().get().possiblyMoreExecutionsInDb(), is(false)); + } } diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/compatibility/CompatibilityTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/compatibility/CompatibilityTest.java index b7cc4dd2..6c9733e4 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/compatibility/CompatibilityTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/compatibility/CompatibilityTest.java @@ -20,10 +20,8 @@ import com.google.common.collect.Lists; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.slf4j.LoggerFactory; import javax.sql.DataSource; import java.time.Duration; @@ -33,7 +31,6 @@ import java.util.Optional; import static com.github.kagkarlsson.scheduler.JdbcTaskRepository.DEFAULT_TABLE_NAME; -import static java.time.temporal.ChronoUnit.MILLIS; import static java.util.Collections.singletonList; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThan; @@ -105,7 +102,7 @@ public void test_compatibility() { scheduler.schedule(recurring.instance("id3"), Instant.now()); scheduler.schedule(recurring.instance("id4"), Instant.now()); - scheduler.start(); + scheduler.startConsumer(); completed12Condition.waitFor(); scheduler.stop(); diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/DeadExecutionTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/DeadExecutionTest.java index 608d8c0a..d2914702 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/DeadExecutionTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/DeadExecutionTest.java @@ -1,6 +1,5 @@ package com.github.kagkarlsson.scheduler.functional; -import com.github.kagkarlsson.scheduler.DbUtils; import com.github.kagkarlsson.scheduler.EmbeddedPostgresqlExtension; import com.github.kagkarlsson.scheduler.Scheduler; import com.github.kagkarlsson.scheduler.SchedulerName; @@ -52,7 +51,7 @@ public void complete(ExecutionComplete executionComplete, ExecutionOperations scheduler.schedule(task.instance(String.valueOf(i)), clock.now())); Assertions.assertTimeoutPreemptively(Duration.ofSeconds(5), () -> { - scheduler.start(); + scheduler.startConsumer(); condition.waitFor(); List completed = registry.getCompleted(); diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/ImmediateExecutionTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/ImmediateExecutionTest.java index d6c79586..611d5281 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/ImmediateExecutionTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/ImmediateExecutionTest.java @@ -1,7 +1,6 @@ package com.github.kagkarlsson.scheduler.functional; import co.unruly.matchers.TimeMatchers; -import com.github.kagkarlsson.scheduler.DbUtils; import com.github.kagkarlsson.scheduler.EmbeddedPostgresqlExtension; import com.github.kagkarlsson.scheduler.Scheduler; import com.github.kagkarlsson.scheduler.SchedulerName; @@ -14,7 +13,6 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.RepeatedTest; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import java.time.Duration; @@ -59,7 +57,7 @@ public void test_immediate_execution() { .build(); stopScheduler.register(scheduler); - scheduler.start(); + scheduler.startConsumer(); executeDueCondition.waitFor(); scheduler.schedule(task.instance("1"), clock.now()); diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/RecurringTaskTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/RecurringTaskTest.java index 50eaec33..ae574048 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/RecurringTaskTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/RecurringTaskTest.java @@ -51,7 +51,7 @@ public void should_have_starttime_according_to_schedule_by_default() { .execute(TestTasks.DO_NOTHING); ManualScheduler scheduler = manualSchedulerFor(singletonList(recurringTask)); - scheduler.start(); + scheduler.startConsumer(); assertScheduled(scheduler, RECURRING_A, LocalTime.of(23, 59)); } @@ -63,7 +63,7 @@ public void should_have_starttime_now_if_overridden_by_schedule() { .execute(TestTasks.DO_NOTHING); ManualScheduler scheduler = manualSchedulerFor(singletonList(recurringTask)); - scheduler.start(); + scheduler.startConsumer(); assertScheduled(scheduler, RECURRING_A, TIME); } @@ -75,7 +75,7 @@ public void should_update_preexisting_exeutions_with_new_schedule_if_new_next_ex ManualScheduler scheduler = manualSchedulerFor(singletonList(recurringTask)); - scheduler.start(); + scheduler.startConsumer(); assertScheduled(scheduler, RECURRING_A, LocalTime.of(23, 59)); // Add an additional execution-time to the daily schedule @@ -88,7 +88,7 @@ public void should_update_preexisting_exeutions_with_new_schedule_if_new_next_ex ManualScheduler schedulerUpdatedTask = manualSchedulerFor(singletonList(recurringTaskNewSchedule)); // Simulate restart with updated schedule for task, execution now already exists - schedulerUpdatedTask.start(); + schedulerUpdatedTask.startConsumer(); assertScheduled(schedulerUpdatedTask, RECURRING_A, LocalTime.of(12, 0)); } @@ -100,7 +100,7 @@ public void should_update_preexisting_exeutions_with_new_deterministic_schedule_ ManualScheduler scheduler = manualSchedulerFor(singletonList(recurringTask)); - scheduler.start(); + scheduler.startConsumer(); assertScheduled(scheduler, RECURRING_A, LocalTime.of(12, 0)); RecurringTask recurringTaskNewSchedule = Tasks.recurring(RECURRING_A, @@ -110,7 +110,7 @@ public void should_update_preexisting_exeutions_with_new_deterministic_schedule_ ManualScheduler schedulerUpdatedTask = manualSchedulerFor(singletonList(recurringTaskNewSchedule)); // Simulate restart with updated schedule for task, execution now already exists - schedulerUpdatedTask.start(); + schedulerUpdatedTask.startConsumer(); // Should have unchanged execution-time assertScheduled(schedulerUpdatedTask, RECURRING_A, LocalTime.of(23, 59)); @@ -124,7 +124,7 @@ public void should_not_update_data_of_preexisting_exeutions_even_if_rescheduling ManualScheduler scheduler = manualSchedulerFor(singletonList(recurringTask)); - scheduler.start(); + scheduler.startConsumer(); assertScheduled(scheduler, RECURRING_A, LocalTime.of(23, 59), 1); // Add an additional execution-time to the daily schedule @@ -138,7 +138,7 @@ public void should_not_update_data_of_preexisting_exeutions_even_if_rescheduling ManualScheduler schedulerUpdatedTask = manualSchedulerFor(singletonList(recurringTaskNewSchedule)); // Simulate restart with updated schedule for task, execution now already exists - schedulerUpdatedTask.start(); + schedulerUpdatedTask.startConsumer(); assertScheduled(schedulerUpdatedTask, RECURRING_A, LocalTime.of(12, 0), 1); // unchanged data }