diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/AsyncExecutePicked.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/AsyncExecutePicked.java new file mode 100644 index 00000000..5b8eb535 --- /dev/null +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/AsyncExecutePicked.java @@ -0,0 +1,128 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.scheduler; + +import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger; +import com.github.kagkarlsson.scheduler.stats.StatsRegistry; +import com.github.kagkarlsson.scheduler.task.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +@SuppressWarnings("rawtypes") +class AsyncExecutePicked { + private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutePicked.class); + private final Executor executor; + private final TaskRepository taskRepository; + private SchedulerClientEventListener earlyExecutionListener; + private final SchedulerClient schedulerClient; + private final StatsRegistry statsRegistry; + private final TaskResolver taskResolver; + private final SchedulerState schedulerState; + private final ConfigurableLogger failureLogger; + private final Clock clock; + private final Execution pickedExecution; + + public AsyncExecutePicked(Executor executor, TaskRepository taskRepository, SchedulerClientEventListener earlyExecutionListener, SchedulerClient schedulerClient, StatsRegistry statsRegistry, + TaskResolver taskResolver, SchedulerState schedulerState, ConfigurableLogger failureLogger, + Clock clock, Execution pickedExecution) { + this.executor = executor; + this.taskRepository = taskRepository; + this.earlyExecutionListener = earlyExecutionListener; + this.schedulerClient = schedulerClient; + this.statsRegistry = statsRegistry; + this.taskResolver = taskResolver; + this.schedulerState = schedulerState; + this.failureLogger = failureLogger; + this.clock = clock; + this.pickedExecution = pickedExecution; + } + + public CompletableFuture toCompletableFuture() { + // FIXLATER: need to cleanup all the references back to scheduler fields + final UUID executionId = executor.addCurrentlyProcessing(new CurrentlyExecuting(pickedExecution, clock)); + statsRegistry.register(StatsRegistry.CandidateStatsEvent.EXECUTED); + return executePickedExecution(pickedExecution).whenComplete((c, ex) -> executor.removeCurrentlyProcessing(executionId)); + } + + private CompletableFuture executePickedExecution(Execution execution) { + final Optional task = taskResolver.resolve(execution.taskInstance.getTaskName()); + if (!task.isPresent()) { + LOG.error("Failed to find implementation for task with name '{}'. Should have been excluded in JdbcRepository.", execution.taskInstance.getTaskName()); + statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR); + return new CompletableFuture<>(); + } + if (!(task.get() instanceof AsyncExecutionHandler)) { + throw new IllegalStateException("Should only ever try to execute async when task has an AsyncExecutionHandler"); + } + + AsyncExecutionHandler asyncHandler = (AsyncExecutionHandler) task.get(); + Instant executionStarted = clock.now(); + LOG.debug("Executing " + execution); + CompletableFuture completableFuture = asyncHandler.executeAsync(execution.taskInstance, new AsyncExecutionContext(schedulerState, execution, schedulerClient, executor.getExecutorService())); + + return completableFuture.handle((completion, ex) -> { + if (ex != null) { + if (ex instanceof RuntimeException) { + failure(task.get(), execution, ex, executionStarted, "Unhandled exception"); + statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED); + } else { + failure(task.get(), execution, ex, executionStarted, "Error"); + statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED); + } + return null; + } + LOG.debug("Execution done"); + complete(completion, execution, executionStarted); + statsRegistry.register(StatsRegistry.ExecutionStatsEvent.COMPLETED); + return null; + + }); + } + + private void complete(CompletionHandler completion, Execution execution, Instant executionStarted) { + ExecutionComplete completeEvent = ExecutionComplete.success(execution, executionStarted, clock.now()); + try { + completion.complete(completeEvent, new ExecutionOperations(taskRepository, earlyExecutionListener, execution)); + statsRegistry.registerSingleCompletedExecution(completeEvent); + } catch (Throwable e) { + statsRegistry.register(StatsRegistry.SchedulerStatsEvent.COMPLETIONHANDLER_ERROR); + statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR); + LOG.error("Failed while completing execution {}. Execution will likely remain scheduled and locked/picked. " + + "The execution should be detected as dead after a while, and handled according to the tasks DeadExecutionHandler.", execution, e); + } + } + + private void failure(Task task, Execution execution, Throwable cause, Instant executionStarted, String errorMessagePrefix) { + String logMessage = errorMessagePrefix + " during execution of task with name '{}'. Treating as failure."; + failureLogger.log(logMessage, cause, task.getName()); + + ExecutionComplete completeEvent = ExecutionComplete.failure(execution, executionStarted, clock.now(), cause); + try { + task.getFailureHandler().onFailure(completeEvent, new ExecutionOperations(taskRepository, earlyExecutionListener, execution)); + statsRegistry.registerSingleCompletedExecution(completeEvent); + } catch (Throwable e) { + statsRegistry.register(StatsRegistry.SchedulerStatsEvent.FAILUREHANDLER_ERROR); + statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR); + LOG.error("Failed while completing execution {}. Execution will likely remain scheduled and locked/picked. " + + "The execution should be detected as dead after a while, and handled according to the tasks DeadExecutionHandler.", execution, e); + } + } +} diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Executor.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Executor.java index 38283c87..780afe33 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Executor.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Executor.java @@ -21,16 +21,15 @@ import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import java.util.stream.Collectors; +// TODO: make super-class of the polling-strategies instead public class Executor { private static final Logger LOG = LoggerFactory.getLogger(Executor.class); @@ -38,33 +37,57 @@ public class Executor { private AtomicInteger currentlyInQueueOrProcessing = new AtomicInteger(0); private final ExecutorService executorService; private final Clock clock; + private Map, CompletableFuture> ongoingWork = new ConcurrentHashMap<>(); public Executor(ExecutorService executorService, Clock clock) { this.executorService = executorService; this.clock = clock; } - public void addToQueue(Runnable r, Runnable afterDone) { + // TODO: remove + public void addToQueue(Supplier> toRun, Runnable afterDone) { currentlyInQueueOrProcessing.incrementAndGet(); // if we always had a ThreadPoolExecutor we could check queue-size using getQueue() - executorService.execute(() -> { - // Execute - try { - r.run(); - } finally { - currentlyInQueueOrProcessing.decrementAndGet(); - // Run callbacks after decrementing currentlyInQueueOrProcessing - afterDone.run(); - } + + toRun.get().thenAccept(v -> { + // For async, these callbacks are run before complete, + // thus allowing queue of ongoing executions to grow without bound + currentlyInQueueOrProcessing.decrementAndGet(); + // Run callbacks after decrementing currentlyInQueueOrProcessing + afterDone.run(); }); + + } + + public void incrementInQueue() { + currentlyInQueueOrProcessing.incrementAndGet(); // if we always had a ThreadPoolExecutor we could check queue-size using getQueue() + } + public void decrementInQueue() { + currentlyInQueueOrProcessing.decrementAndGet(); + } + + public void addOngoingWork(CompletableFuture work) { + ongoingWork.put(work, work); } public List getCurrentlyExecuting() { return new ArrayList<>(currentlyProcessing.values()); } + @SuppressWarnings("rawtypes") + public void awaitCurrentlyExecuting() { + CompletableFuture[] ongoingWork = this.ongoingWork.keySet().toArray(new CompletableFuture[0]); + CompletableFuture.allOf(ongoingWork).join(); + } + public void stop(Duration shutdownMaxWait) { + LOG.info("Letting running executions finish. Will wait up to 2x{}.", shutdownMaxWait); + // TODO: upper timelimit for completable futures as well + // Wait for futures explicitly, as we can no longer rely on executorService.shutdown() + awaitCurrentlyExecuting(); + final Instant startShutdown = clock.now(); + if (ExecutorUtils.shutdownAndAwaitTermination(executorService, shutdownMaxWait, shutdownMaxWait)) { LOG.info("Scheduler stopped."); } else { @@ -99,4 +122,8 @@ public void removeCurrentlyProcessing(UUID executionId) { LOG.warn("Released execution was not found in collection of executions currently being processed. Should never happen. Execution-id: " + executionId); } } + + public java.util.concurrent.Executor getExecutorService() { + return executorService; + } } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java index 4b909bd5..c151cae8 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java @@ -17,7 +17,9 @@ import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger; import com.github.kagkarlsson.scheduler.stats.StatsRegistry; +import com.github.kagkarlsson.scheduler.task.AsyncExecutionHandler; import com.github.kagkarlsson.scheduler.task.Execution; +import com.github.kagkarlsson.scheduler.task.Task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,6 +27,8 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; public class FetchCandidates implements PollStrategy { @@ -64,6 +68,7 @@ public FetchCandidates(Executor executor, TaskRepository taskRepository, Schedul upperLimit = pollingStrategyConfig.getUpperLimit(threadpoolSize); } + @SuppressWarnings("rawtypes") @Override public void run() { Instant now = clock.now(); @@ -81,16 +86,40 @@ public void run() { (Integer leftInBatch) -> leftInBatch <= lowerLimit); for (Execution e : fetchedDueExecutions) { - executor.addToQueue( - () -> { - final Optional candidate = new PickDue(e, newDueBatch).call(); - candidate.ifPresent(picked -> new ExecutePicked(executor, taskRepository, earlyExecutionListener, schedulerClient, statsRegistry, - taskResolver, schedulerState, failureLogger, - clock, picked).run()); - }, - () -> { + + CompletableFuture future = CompletableFuture + .runAsync(executor::incrementInQueue, executor.getExecutorService()) + .thenComposeAsync((result) -> CompletableFuture.supplyAsync(() -> { + Optional candidate = new PickDue(e, newDueBatch).call(); + return candidate.orElse(null); // TODO: remove optional before merge + }, executor.getExecutorService())) + .thenComposeAsync(picked -> { + if (picked == null) { + // Skip this step if we were not able to pick the execution (someone else got the lock) + return CompletableFuture.completedFuture(null); + } + // Experimental support for async execution. Peek at Task to see if support async + // Unresolved tasks will be handled further in + final Optional task = taskResolver.resolve(picked.taskInstance.getTaskName()); + if (task.isPresent() && task.get() instanceof AsyncExecutionHandler) { + + return new AsyncExecutePicked(executor, taskRepository, earlyExecutionListener, + schedulerClient, statsRegistry, taskResolver, schedulerState, failureLogger, + clock, picked).toCompletableFuture(); + } else { + + return CompletableFuture.runAsync(new ExecutePicked(executor, taskRepository, earlyExecutionListener, + schedulerClient, statsRegistry, taskResolver, schedulerState, failureLogger, + clock, picked), executor.getExecutorService()); + } + + }, executor.getExecutorService()) + .thenAccept(x -> { + executor.decrementInQueue(); newDueBatch.oneExecutionDone(triggerCheckForNewExecutions::run); }); + executor.addOngoingWork(future); + } statsRegistry.register(StatsRegistry.SchedulerStatsEvent.RAN_EXECUTE_DUE); } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java index 44ce7a22..039f6864 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java @@ -17,12 +17,16 @@ import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger; import com.github.kagkarlsson.scheduler.stats.StatsRegistry; +import com.github.kagkarlsson.scheduler.task.AsyncExecutionHandler; import com.github.kagkarlsson.scheduler.task.Execution; +import com.github.kagkarlsson.scheduler.task.Task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Instant; import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; public class LockAndFetchCandidates implements PollStrategy { @@ -88,16 +92,34 @@ public void run() { } for (Execution picked : pickedExecutions) { - executor.addToQueue( - new ExecutePicked(executor, taskRepository, earlyExecutionListener, schedulerClient, statsRegistry, - taskResolver, schedulerState, failureLogger, - clock, picked), - () -> { + CompletableFuture future = CompletableFuture + .runAsync(executor::incrementInQueue, executor.getExecutorService()) + .thenComposeAsync((_ignored) -> { + // Experimental support for async execution. Peek at Task to see if support async + // Unresolved tasks will be handled further in + final Optional task = taskResolver.resolve(picked.taskInstance.getTaskName()); + if (task.isPresent() && task.get() instanceof AsyncExecutionHandler) { + + return new AsyncExecutePicked(executor, taskRepository, earlyExecutionListener, + schedulerClient, statsRegistry, taskResolver, schedulerState, failureLogger, + clock, picked).toCompletableFuture(); + } else { + + return CompletableFuture.runAsync(new ExecutePicked(executor, taskRepository, earlyExecutionListener, + schedulerClient, statsRegistry, taskResolver, schedulerState, failureLogger, + clock, picked), executor.getExecutorService()); + } + + }, executor.getExecutorService()) + .thenAccept(x -> { + executor.decrementInQueue(); if (moreExecutionsInDatabase.get() && executor.getNumberInQueueOrProcessing() <= lowerLimit) { triggerCheckForNewExecutions.run(); } }); + + executor.addOngoingWork(future); } statsRegistry.register(StatsRegistry.SchedulerStatsEvent.RAN_EXECUTE_DUE); } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java index d8eeb2c7..7ab63423 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java @@ -324,4 +324,7 @@ public static SchedulerBuilder create(DataSource dataSource, List> known return new SchedulerBuilder(dataSource, knownTasks); } + protected void awaitCurrentlyExecuting() { + executor.awaitCurrentlyExecuting(); + } } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/AsyncExecutionContext.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/AsyncExecutionContext.java new file mode 100644 index 00000000..46515c8c --- /dev/null +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/AsyncExecutionContext.java @@ -0,0 +1,35 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.scheduler.task; + +import com.github.kagkarlsson.scheduler.SchedulerClient; +import com.github.kagkarlsson.scheduler.SchedulerState; + +import java.util.concurrent.Executor; + +public class AsyncExecutionContext extends ExecutionContext { + private final Executor executor; + + public AsyncExecutionContext(SchedulerState schedulerState, Execution execution, SchedulerClient schedulerClient, + Executor executor) { + super(schedulerState, execution, schedulerClient); + this.executor = executor; + } + + public Executor getAsyncExecutor() { + return executor; + } +} diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/AsyncExecutionHandler.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/AsyncExecutionHandler.java new file mode 100644 index 00000000..a3838a57 --- /dev/null +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/AsyncExecutionHandler.java @@ -0,0 +1,31 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.scheduler.task; + +import java.util.concurrent.CompletableFuture; + +/** + * Experimental + */ +public interface AsyncExecutionHandler extends ExecutionHandler { + + CompletableFuture> executeAsync(TaskInstance taskInstance, AsyncExecutionContext executionContext); + + @Override + default CompletionHandler execute(TaskInstance taskInstance, ExecutionContext executionContext) { + throw new UnsupportedOperationException("Standard blocking execute note supported in this handler."); + } +} diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/ExecutionHandler.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/ExecutionHandler.java index 9fa113f9..c1b8cf51 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/ExecutionHandler.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/ExecutionHandler.java @@ -16,5 +16,7 @@ package com.github.kagkarlsson.scheduler.task; public interface ExecutionHandler { + CompletionHandler execute(TaskInstance taskInstance, ExecutionContext executionContext); + } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java index d662622c..7007f408 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java @@ -54,6 +54,7 @@ public void setTime(Instant newtime) { public void runAnyDueExecutions() { super.executeDueStrategy.run(); + super.awaitCurrentlyExecuting(); } public void runDeadExecutionDetection() { diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java index 1abad063..168725c4 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java @@ -76,6 +76,7 @@ public ManualScheduler build() { final JdbcTaskRepository schedulerTaskRepository = new JdbcTaskRepository(dataSource, true, new DefaultJdbcCustomization(), tableName, taskResolver, new SchedulerName.Fixed("manual"), serializer, clock); final JdbcTaskRepository clientTaskRepository = new JdbcTaskRepository(dataSource, commitWhenAutocommitDisabled, new DefaultJdbcCustomization(), tableName, taskResolver, new SchedulerName.Fixed("manual"), serializer, clock); + // TODO: new DirectExecutorService() will not work after starting to execute work async return new ManualScheduler(clock, schedulerTaskRepository, clientTaskRepository, taskResolver, executorThreads, new DirectExecutorService(), schedulerName, waiter, heartbeatInterval, enableImmediateExecution, statsRegistry, Optional.ofNullable(pollingStrategyConfig).orElse(PollingStrategyConfig.DEFAULT_FETCH), diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java index b0d7c2d8..e0c24150 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java @@ -5,6 +5,7 @@ import com.github.kagkarlsson.scheduler.stats.StatsRegistry; import com.github.kagkarlsson.scheduler.task.*; import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask; +import com.github.kagkarlsson.scheduler.testhelper.ManualScheduler; import com.github.kagkarlsson.scheduler.testhelper.SettableClock; import com.google.common.util.concurrent.MoreExecutors; import org.hamcrest.Matchers; @@ -97,33 +98,6 @@ public void scheduler_should_handle_dead_executions() { assertThat(jdbcTaskRepository.getDue(Instant.now(), POLLING_LIMIT), hasSize(1)); } - @Test - public void scheduler_should_detect_dead_execution_that_never_updated_heartbeat() { - final Instant now = Instant.now(); - settableClock.set(now.minus(Duration.ofHours(1))); - final Instant oneHourAgo = settableClock.now(); - - final TaskInstance taskInstance = nonCompleting.instance("id1"); - final SchedulableTaskInstance execution1 = new SchedulableTaskInstance<>(taskInstance, oneHourAgo); - jdbcTaskRepository.createIfNotExists(execution1); - - scheduler.executeDue(); - assertThat(nonCompletingExecutionHandler.timesExecuted.get(), is(1)); - - scheduler.executeDue(); - assertThat(nonCompletingExecutionHandler.timesExecuted.get(), is(1)); - - settableClock.set(Instant.now()); - - scheduler.detectDeadExecutions(); - assertThat(deadExecutionHandler.timesCalled, is(1)); - - settableClock.set(Instant.now()); - - scheduler.executeDue(); - assertThat(nonCompletingExecutionHandler.timesExecuted.get(), is(2)); - } - public static class NonCompletingTask extends OneTimeTask { private final VoidExecutionHandler handler; diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerTest.java index 88c57aa7..9f1b1130 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerTest.java @@ -125,14 +125,14 @@ public void scheduler_should_execute_rescheduled_task_when_exactly_due() { @Test public void scheduler_should_not_execute_canceled_tasks() { OneTimeTask oneTimeTask = TestTasks.oneTime("OneTime", Void.class, handler); - Scheduler scheduler = schedulerFor(oneTimeTask); + ManualScheduler scheduler = schedulerFor(oneTimeTask); Instant executionTime = clock.now().plus(ofMinutes(1)); String instanceId = "1"; TaskInstance oneTimeTaskInstance = oneTimeTask.instance(instanceId); scheduler.schedule(oneTimeTaskInstance, executionTime); scheduler.cancel(oneTimeTaskInstance); - scheduler.executeDue(); + scheduler.runAnyDueExecutions(); assertThat(handler.timesExecuted.get(), is(0)); clock.set(executionTime); @@ -143,16 +143,16 @@ public void scheduler_should_not_execute_canceled_tasks() { @Test public void scheduler_should_execute_recurring_task_and_reschedule() { RecurringTask recurringTask = TestTasks.recurring("Recurring", FixedDelay.of(ofHours(1)), handler); - Scheduler scheduler = schedulerFor(recurringTask); + ManualScheduler scheduler = schedulerFor(recurringTask); scheduler.schedule(recurringTask.instance("single"), clock.now()); - scheduler.executeDue(); + scheduler.runAnyDueExecutions(); assertThat(handler.timesExecuted.get(), is(1)); Instant nextExecutionTime = clock.now().plus(ofHours(1)); clock.set(nextExecutionTime); - scheduler.executeDue(); + scheduler.runAnyDueExecutions(); assertThat(handler.timesExecuted.get(), is(2)); } @@ -183,10 +183,10 @@ public void should_expose_cause_of_failure_to_completion_handler() throws Interr Task oneTimeTask = ComposableTask.customTask("cause-testing-task", Void.class, TestTasks.REMOVE_ON_COMPLETE, failureHandler, (inst, ctx) -> { throw new RuntimeException("Failed!");}); - Scheduler scheduler = schedulerFor(oneTimeTask); + ManualScheduler scheduler = schedulerFor(oneTimeTask); scheduler.schedule(oneTimeTask.instance("1"), clock.now()); - scheduler.executeDue(); + scheduler.runAnyDueExecutions(); // failureHandler.waitForNotify.await(); assertThat(failureHandler.result, is(ExecutionComplete.Result.FAILED)); @@ -206,15 +206,15 @@ public void should_only_attempt_task_when_max_retries_handler_used() throws Inte throw new RuntimeException("Failed!"); }); - Scheduler scheduler = schedulerFor(oneTimeTask); + ManualScheduler scheduler = schedulerFor(oneTimeTask); scheduler.schedule(oneTimeTask.instance("1"), clock.now()); - scheduler.executeDue(); + scheduler.runAnyDueExecutions(); //Simulate 15 minutes worth of time to validate we did not process more than we should for( int minuteWorthOfTime = 1; minuteWorthOfTime <= 15; minuteWorthOfTime ++) { clock.set(clock.now().plus(ofMinutes(1))); - scheduler.executeDue(); + scheduler.runAnyDueExecutions(); } //will always be maxRetries + 1 due to the first call always being required. @@ -235,16 +235,16 @@ public void should_reschedule_failure_on_exponential_backoff_with_default_rate() } }); - Scheduler scheduler = schedulerFor(oneTimeTask); + ManualScheduler scheduler = schedulerFor(oneTimeTask); Instant firstExecution = clock.now(); scheduler.schedule(oneTimeTask.instance("1"), firstExecution); - scheduler.executeDue(); + scheduler.runAnyDueExecutions(); //Simulate 30 minutes worth of time to validate we did not process more than we should for( int minuteWorthOfTime = 1; minuteWorthOfTime <= 30; minuteWorthOfTime ++) { clock.set(clock.now().plus(ofMinutes(1))); - scheduler.executeDue(); + scheduler.runAnyDueExecutions(); } assertThat(executionTimes.size(), is(10)); @@ -275,16 +275,16 @@ public void should_reschedule_failure_on_exponential_backoff_with_defined_rate() } }); - Scheduler scheduler = schedulerFor(oneTimeTask); + ManualScheduler scheduler = schedulerFor(oneTimeTask); Instant firstExecution = clock.now(); scheduler.schedule(oneTimeTask.instance("1"), firstExecution); - scheduler.executeDue(); + scheduler.runAnyDueExecutions(); //Simulate 30 minutes worth of time to validate we did not process more than we should for( int minuteWorthOfTime = 1; minuteWorthOfTime <= 30; minuteWorthOfTime ++) { clock.set(clock.now().plus(ofMinutes(1))); - scheduler.executeDue(); + scheduler.runAnyDueExecutions(); } assertThat(executionTimes.size(), is(10)); diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/ExecutorPoolTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/ExecutorPoolTest.java index 0c7de39c..c991011a 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/ExecutorPoolTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/ExecutorPoolTest.java @@ -72,7 +72,6 @@ public void test_execute_until_none_left_high_volume() { testExecuteUntilNoneLeft(12, 4, 200); } - private void testExecuteUntilNoneLeft(int pollingLimit, int threads, int executionsToRun) { Instant now = Instant.now(); OneTimeTask task = TestTasks.oneTime("onetime-a", Void.class, TestTasks.DO_NOTHING); diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/AsyncOneTimeTaskMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/AsyncOneTimeTaskMain.java new file mode 100644 index 00000000..ed8f0358 --- /dev/null +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/AsyncOneTimeTaskMain.java @@ -0,0 +1,106 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.examples; + +import com.github.kagkarlsson.examples.helpers.Example; +import com.github.kagkarlsson.scheduler.Scheduler; +import com.github.kagkarlsson.scheduler.task.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.sql.DataSource; +import java.time.Instant; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.function.Supplier; + +import static com.github.kagkarlsson.scheduler.task.helper.Tasks.DEFAULT_RETRY_INTERVAL; + +public class AsyncOneTimeTaskMain extends Example { + private static final Logger LOG = LoggerFactory.getLogger(AsyncOneTimeTaskMain.class); + + public static void main(String[] args) { + new AsyncOneTimeTaskMain().runWithDatasource(); + } + + @Override + public void run(DataSource dataSource) { + int iterations = 100; + CountDownLatch countdown = new CountDownLatch(iterations); + + AsyncTask task = new AsyncTask<>("async-test", Void.class, + (taskInstance, executionContext) -> CompletableFuture.supplyAsync(() -> { + LOG.info("Executing " + taskInstance.getId()); + return new CompletionHandler() { + @Override + public void complete(ExecutionComplete executionComplete, ExecutionOperations executionOperations) { + executionOperations.remove(); + countdown.countDown(); + LOG.info("Completed " + executionComplete.getExecution().taskInstance.getId()); + } + }; + }, executionContext.getAsyncExecutor())); + + final Scheduler scheduler = Scheduler + .create(dataSource, task) + .threads(2) + .build(); + + // Schedule the task for execution a certain time in the future and optionally provide custom data for the execution + for (int i = 0; i < iterations; i++) { + scheduler.schedule(task.instance(String.valueOf(i)), Instant.now()); + } + + scheduler.start(); + + try { + countdown.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + LOG.info("Done!"); + scheduler.stop(); + } + + public static class AsyncTask extends AbstractTask implements AsyncExecutionHandler { + private final AsyncExecutionHandler handler; + + public AsyncTask(String name, Class dataClass, AsyncExecutionHandler handler) { + super(name, + dataClass, + new FailureHandler.OnFailureRetryLater<>(DEFAULT_RETRY_INTERVAL), + new DeadExecutionHandler.ReviveDeadExecution()); + this.handler = handler; + } + + @Override + public CompletableFuture> executeAsync(TaskInstance taskInstance, AsyncExecutionContext executionContext) { + return handler.executeAsync(taskInstance, executionContext); + } + + @Override + public SchedulableInstance schedulableInstance(String id) { + return new SchedulableTaskInstance<>(new TaskInstance<>(getName(), id), (currentTime) -> currentTime); + } + + @Override + public SchedulableInstance schedulableInstance(String id, T data) { + return new SchedulableTaskInstance<>(new TaskInstance<>(getName(), id, data), (currentTime) -> currentTime); + } + + } +} diff --git a/examples/features/src/main/resources/logback.xml b/examples/features/src/main/resources/logback.xml index 92320971..1d909f19 100644 --- a/examples/features/src/main/resources/logback.xml +++ b/examples/features/src/main/resources/logback.xml @@ -1,25 +1,8 @@ - - %d{HH:mm:ss.SSS} %highlight(%-5level) %cyan(%logger{50}) - %msg%n%rootException + %d{HH:mm:ss.SSS} %highlight(%-5level) [%-30.30thread] %cyan(%logger{50}) - %msg%n%rootException diff --git a/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/ExampleContext.java b/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/ExampleContext.java index c108c158..0f421f82 100644 --- a/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/ExampleContext.java +++ b/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/ExampleContext.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.github.kagkarlsson.examples.boot; import com.github.kagkarlsson.scheduler.SchedulerClient; diff --git a/pom.xml b/pom.xml index ae5d58e8..71579fc5 100644 --- a/pom.xml +++ b/pom.xml @@ -163,6 +163,7 @@ src/test/** **/*.properties **/*.sql + **/*.xml .github/** test/** **/RuntimeTypeAdapterFactory.java