From 96050dcee4c42c30cb02ba060989351beb358059 Mon Sep 17 00:00:00 2001 From: Gustav Karlsson Date: Fri, 14 Apr 2023 11:23:02 +0200 Subject: [PATCH 1/4] PoC for experimental async support --- .../scheduler/AsyncExecutePicked.java | 128 ++++++++++++++++++ .../kagkarlsson/scheduler/Executor.java | 6 + .../scheduler/FetchCandidates.java | 25 +++- .../scheduler/LockAndFetchCandidates.java | 24 +++- .../scheduler/task/AsyncExecutionContext.java | 35 +++++ .../scheduler/task/AsyncExecutionHandler.java | 31 +++++ .../scheduler/task/ExecutionHandler.java | 2 + .../examples/AsyncOneTimeTaskMain.java | 106 +++++++++++++++ .../features/src/main/resources/logback.xml | 19 +-- .../examples/boot/ExampleContext.java | 15 ++ pom.xml | 1 + 11 files changed, 367 insertions(+), 25 deletions(-) create mode 100644 db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/AsyncExecutePicked.java create mode 100644 db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/AsyncExecutionContext.java create mode 100644 db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/AsyncExecutionHandler.java create mode 100644 examples/features/src/main/java/com/github/kagkarlsson/examples/AsyncOneTimeTaskMain.java diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/AsyncExecutePicked.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/AsyncExecutePicked.java new file mode 100644 index 00000000..d3027928 --- /dev/null +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/AsyncExecutePicked.java @@ -0,0 +1,128 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.scheduler; + +import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger; +import com.github.kagkarlsson.scheduler.stats.StatsRegistry; +import com.github.kagkarlsson.scheduler.task.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +@SuppressWarnings("rawtypes") +class AsyncExecutePicked implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutePicked.class); + private final Executor executor; + private final TaskRepository taskRepository; + private SchedulerClientEventListener earlyExecutionListener; + private final SchedulerClient schedulerClient; + private final StatsRegistry statsRegistry; + private final TaskResolver taskResolver; + private final SchedulerState schedulerState; + private final ConfigurableLogger failureLogger; + private final Clock clock; + private final Execution pickedExecution; + + public AsyncExecutePicked(Executor executor, TaskRepository taskRepository, SchedulerClientEventListener earlyExecutionListener, SchedulerClient schedulerClient, StatsRegistry statsRegistry, + TaskResolver taskResolver, SchedulerState schedulerState, ConfigurableLogger failureLogger, + Clock clock, Execution pickedExecution) { + this.executor = executor; + this.taskRepository = taskRepository; + this.earlyExecutionListener = earlyExecutionListener; + this.schedulerClient = schedulerClient; + this.statsRegistry = statsRegistry; + this.taskResolver = taskResolver; + this.schedulerState = schedulerState; + this.failureLogger = failureLogger; + this.clock = clock; + this.pickedExecution = pickedExecution; + } + + @Override + public void run() { + // FIXLATER: need to cleanup all the references back to scheduler fields + final UUID executionId = executor.addCurrentlyProcessing(new CurrentlyExecuting(pickedExecution, clock)); + statsRegistry.register(StatsRegistry.CandidateStatsEvent.EXECUTED); + executePickedExecution(pickedExecution).whenComplete((c, ex) -> executor.removeCurrentlyProcessing(executionId)); + } + + private CompletableFuture executePickedExecution(Execution execution) { + final Optional task = taskResolver.resolve(execution.taskInstance.getTaskName()); + if (!task.isPresent()) { + LOG.error("Failed to find implementation for task with name '{}'. Should have been excluded in JdbcRepository.", execution.taskInstance.getTaskName()); + statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR); + return new CompletableFuture<>(); + } + if (!(task.get() instanceof AsyncExecutionHandler)) { + throw new IllegalStateException("Should only ever try to execute async when task has an AsyncExecutionHandler"); + } + + AsyncExecutionHandler asyncHandler = (AsyncExecutionHandler) task.get(); + Instant executionStarted = clock.now(); + LOG.debug("Executing " + execution); + CompletableFuture completableFuture = asyncHandler.executeAsync(execution.taskInstance, new AsyncExecutionContext(schedulerState, execution, schedulerClient, executor.getExecutorService())); + + return completableFuture.whenCompleteAsync((completion, ex) -> { + if (ex != null) { + if (ex instanceof RuntimeException) { + failure(task.get(), execution, ex, executionStarted, "Unhandled exception"); + statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED); + } else { + failure(task.get(), execution, ex, executionStarted, "Error"); + statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED); + } + return; + } + LOG.debug("Execution done"); + complete(completion, execution, executionStarted); + statsRegistry.register(StatsRegistry.ExecutionStatsEvent.COMPLETED); + + }, executor.getExecutorService()); + } + + private void complete(CompletionHandler completion, Execution execution, Instant executionStarted) { + ExecutionComplete completeEvent = ExecutionComplete.success(execution, executionStarted, clock.now()); + try { + completion.complete(completeEvent, new ExecutionOperations(taskRepository, earlyExecutionListener, execution)); + statsRegistry.registerSingleCompletedExecution(completeEvent); + } catch (Throwable e) { + statsRegistry.register(StatsRegistry.SchedulerStatsEvent.COMPLETIONHANDLER_ERROR); + statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR); + LOG.error("Failed while completing execution {}. Execution will likely remain scheduled and locked/picked. " + + "The execution should be detected as dead after a while, and handled according to the tasks DeadExecutionHandler.", execution, e); + } + } + + private void failure(Task task, Execution execution, Throwable cause, Instant executionStarted, String errorMessagePrefix) { + String logMessage = errorMessagePrefix + " during execution of task with name '{}'. Treating as failure."; + failureLogger.log(logMessage, cause, task.getName()); + + ExecutionComplete completeEvent = ExecutionComplete.failure(execution, executionStarted, clock.now(), cause); + try { + task.getFailureHandler().onFailure(completeEvent, new ExecutionOperations(taskRepository, earlyExecutionListener, execution)); + statsRegistry.registerSingleCompletedExecution(completeEvent); + } catch (Throwable e) { + statsRegistry.register(StatsRegistry.SchedulerStatsEvent.FAILUREHANDLER_ERROR); + statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR); + LOG.error("Failed while completing execution {}. Execution will likely remain scheduled and locked/picked. " + + "The execution should be detected as dead after a while, and handled according to the tasks DeadExecutionHandler.", execution, e); + } + } +} diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Executor.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Executor.java index 38283c87..48f25eda 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Executor.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Executor.java @@ -99,4 +99,10 @@ public void removeCurrentlyProcessing(UUID executionId) { LOG.warn("Released execution was not found in collection of executions currently being processed. Should never happen. Execution-id: " + executionId); } } + + public java.util.concurrent.Executor getExecutorService() { + return executorService; + } + + } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java index 4b909bd5..21dfa695 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java @@ -17,7 +17,9 @@ import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger; import com.github.kagkarlsson.scheduler.stats.StatsRegistry; +import com.github.kagkarlsson.scheduler.task.AsyncExecutionHandler; import com.github.kagkarlsson.scheduler.task.Execution; +import com.github.kagkarlsson.scheduler.task.Task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,6 +66,7 @@ public FetchCandidates(Executor executor, TaskRepository taskRepository, Schedul upperLimit = pollingStrategyConfig.getUpperLimit(threadpoolSize); } + @SuppressWarnings("rawtypes") @Override public void run() { Instant now = clock.now(); @@ -84,9 +87,25 @@ public void run() { executor.addToQueue( () -> { final Optional candidate = new PickDue(e, newDueBatch).call(); - candidate.ifPresent(picked -> new ExecutePicked(executor, taskRepository, earlyExecutionListener, schedulerClient, statsRegistry, - taskResolver, schedulerState, failureLogger, - clock, picked).run()); + + candidate.ifPresent(picked -> { + + // Experimental support for async execution. Peek at Task to see if support async + // Unresolved tasks will be handled further in + final Optional task = taskResolver.resolve(picked.taskInstance.getTaskName()); + if (task.isPresent() && task.get() instanceof AsyncExecutionHandler) { + // Experimental branch + new AsyncExecutePicked(executor, taskRepository, earlyExecutionListener, + schedulerClient, statsRegistry, taskResolver, schedulerState, failureLogger, + clock, picked).run(); + + } else { + // The default + new ExecutePicked(executor, taskRepository, earlyExecutionListener, + schedulerClient, statsRegistry, taskResolver, schedulerState, failureLogger, + clock, picked).run(); + } + }); }, () -> { newDueBatch.oneExecutionDone(triggerCheckForNewExecutions::run); diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java index 44ce7a22..8c3e94c5 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java @@ -17,12 +17,15 @@ import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger; import com.github.kagkarlsson.scheduler.stats.StatsRegistry; +import com.github.kagkarlsson.scheduler.task.AsyncExecutionHandler; import com.github.kagkarlsson.scheduler.task.Execution; +import com.github.kagkarlsson.scheduler.task.Task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Instant; import java.util.List; +import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; public class LockAndFetchCandidates implements PollStrategy { @@ -88,10 +91,23 @@ public void run() { } for (Execution picked : pickedExecutions) { - executor.addToQueue( - new ExecutePicked(executor, taskRepository, earlyExecutionListener, schedulerClient, statsRegistry, - taskResolver, schedulerState, failureLogger, - clock, picked), + executor.addToQueue(() -> { + // Experimental support for async execution. Peek at Task to see if support async + // Unresolved tasks will be handled further in + final Optional task = taskResolver.resolve(picked.taskInstance.getTaskName()); + if (task.isPresent() && task.get() instanceof AsyncExecutionHandler) { + // Experimental branch + new AsyncExecutePicked(executor, taskRepository, earlyExecutionListener, + schedulerClient, statsRegistry, taskResolver, schedulerState, failureLogger, + clock, picked).run(); + + } else { + // The default + new ExecutePicked(executor, taskRepository, earlyExecutionListener, + schedulerClient, statsRegistry, taskResolver, schedulerState, failureLogger, + clock, picked).run(); + } + }, () -> { if (moreExecutionsInDatabase.get() && executor.getNumberInQueueOrProcessing() <= lowerLimit) { diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/AsyncExecutionContext.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/AsyncExecutionContext.java new file mode 100644 index 00000000..46515c8c --- /dev/null +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/AsyncExecutionContext.java @@ -0,0 +1,35 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.scheduler.task; + +import com.github.kagkarlsson.scheduler.SchedulerClient; +import com.github.kagkarlsson.scheduler.SchedulerState; + +import java.util.concurrent.Executor; + +public class AsyncExecutionContext extends ExecutionContext { + private final Executor executor; + + public AsyncExecutionContext(SchedulerState schedulerState, Execution execution, SchedulerClient schedulerClient, + Executor executor) { + super(schedulerState, execution, schedulerClient); + this.executor = executor; + } + + public Executor getAsyncExecutor() { + return executor; + } +} diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/AsyncExecutionHandler.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/AsyncExecutionHandler.java new file mode 100644 index 00000000..a3838a57 --- /dev/null +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/AsyncExecutionHandler.java @@ -0,0 +1,31 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.scheduler.task; + +import java.util.concurrent.CompletableFuture; + +/** + * Experimental + */ +public interface AsyncExecutionHandler extends ExecutionHandler { + + CompletableFuture> executeAsync(TaskInstance taskInstance, AsyncExecutionContext executionContext); + + @Override + default CompletionHandler execute(TaskInstance taskInstance, ExecutionContext executionContext) { + throw new UnsupportedOperationException("Standard blocking execute note supported in this handler."); + } +} diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/ExecutionHandler.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/ExecutionHandler.java index 9fa113f9..c1b8cf51 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/ExecutionHandler.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/ExecutionHandler.java @@ -16,5 +16,7 @@ package com.github.kagkarlsson.scheduler.task; public interface ExecutionHandler { + CompletionHandler execute(TaskInstance taskInstance, ExecutionContext executionContext); + } diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/AsyncOneTimeTaskMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/AsyncOneTimeTaskMain.java new file mode 100644 index 00000000..ed8f0358 --- /dev/null +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/AsyncOneTimeTaskMain.java @@ -0,0 +1,106 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.examples; + +import com.github.kagkarlsson.examples.helpers.Example; +import com.github.kagkarlsson.scheduler.Scheduler; +import com.github.kagkarlsson.scheduler.task.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.sql.DataSource; +import java.time.Instant; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.function.Supplier; + +import static com.github.kagkarlsson.scheduler.task.helper.Tasks.DEFAULT_RETRY_INTERVAL; + +public class AsyncOneTimeTaskMain extends Example { + private static final Logger LOG = LoggerFactory.getLogger(AsyncOneTimeTaskMain.class); + + public static void main(String[] args) { + new AsyncOneTimeTaskMain().runWithDatasource(); + } + + @Override + public void run(DataSource dataSource) { + int iterations = 100; + CountDownLatch countdown = new CountDownLatch(iterations); + + AsyncTask task = new AsyncTask<>("async-test", Void.class, + (taskInstance, executionContext) -> CompletableFuture.supplyAsync(() -> { + LOG.info("Executing " + taskInstance.getId()); + return new CompletionHandler() { + @Override + public void complete(ExecutionComplete executionComplete, ExecutionOperations executionOperations) { + executionOperations.remove(); + countdown.countDown(); + LOG.info("Completed " + executionComplete.getExecution().taskInstance.getId()); + } + }; + }, executionContext.getAsyncExecutor())); + + final Scheduler scheduler = Scheduler + .create(dataSource, task) + .threads(2) + .build(); + + // Schedule the task for execution a certain time in the future and optionally provide custom data for the execution + for (int i = 0; i < iterations; i++) { + scheduler.schedule(task.instance(String.valueOf(i)), Instant.now()); + } + + scheduler.start(); + + try { + countdown.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + LOG.info("Done!"); + scheduler.stop(); + } + + public static class AsyncTask extends AbstractTask implements AsyncExecutionHandler { + private final AsyncExecutionHandler handler; + + public AsyncTask(String name, Class dataClass, AsyncExecutionHandler handler) { + super(name, + dataClass, + new FailureHandler.OnFailureRetryLater<>(DEFAULT_RETRY_INTERVAL), + new DeadExecutionHandler.ReviveDeadExecution()); + this.handler = handler; + } + + @Override + public CompletableFuture> executeAsync(TaskInstance taskInstance, AsyncExecutionContext executionContext) { + return handler.executeAsync(taskInstance, executionContext); + } + + @Override + public SchedulableInstance schedulableInstance(String id) { + return new SchedulableTaskInstance<>(new TaskInstance<>(getName(), id), (currentTime) -> currentTime); + } + + @Override + public SchedulableInstance schedulableInstance(String id, T data) { + return new SchedulableTaskInstance<>(new TaskInstance<>(getName(), id, data), (currentTime) -> currentTime); + } + + } +} diff --git a/examples/features/src/main/resources/logback.xml b/examples/features/src/main/resources/logback.xml index 92320971..1d909f19 100644 --- a/examples/features/src/main/resources/logback.xml +++ b/examples/features/src/main/resources/logback.xml @@ -1,25 +1,8 @@ - - %d{HH:mm:ss.SSS} %highlight(%-5level) %cyan(%logger{50}) - %msg%n%rootException + %d{HH:mm:ss.SSS} %highlight(%-5level) [%-30.30thread] %cyan(%logger{50}) - %msg%n%rootException diff --git a/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/ExampleContext.java b/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/ExampleContext.java index c108c158..0f421f82 100644 --- a/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/ExampleContext.java +++ b/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/ExampleContext.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.github.kagkarlsson.examples.boot; import com.github.kagkarlsson.scheduler.SchedulerClient; diff --git a/pom.xml b/pom.xml index ae5d58e8..71579fc5 100644 --- a/pom.xml +++ b/pom.xml @@ -163,6 +163,7 @@ src/test/** **/*.properties **/*.sql + **/*.xml .github/** test/** **/RuntimeTypeAdapterFactory.java From 619c7226f2f4fc4b9e1fa7dc3580350620ec15df Mon Sep 17 00:00:00 2001 From: Gustav Karlsson Date: Fri, 14 Apr 2023 11:37:58 +0200 Subject: [PATCH 2/4] Comment --- .../main/java/com/github/kagkarlsson/scheduler/Executor.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Executor.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Executor.java index 48f25eda..8d977a62 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Executor.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Executor.java @@ -51,6 +51,8 @@ public void addToQueue(Runnable r, Runnable afterDone) { try { r.run(); } finally { + // For async, these callbacks are run before complete, + // thus allowing queue of ongoing executions to grow without bound currentlyInQueueOrProcessing.decrementAndGet(); // Run callbacks after decrementing currentlyInQueueOrProcessing afterDone.run(); From 7c05948a2dbc0d9405b1cfaa8565630a99df040a Mon Sep 17 00:00:00 2001 From: Gustav Karlsson Date: Fri, 14 Apr 2023 21:14:21 +0200 Subject: [PATCH 3/4] Include all steps in completablefuture chain --- .../scheduler/AsyncExecutePicked.java | 16 +++--- .../kagkarlsson/scheduler/Executor.java | 39 ++++++------- .../scheduler/FetchCandidates.java | 56 +++++++++++-------- .../scheduler/LockAndFetchCandidates.java | 24 ++++---- 4 files changed, 74 insertions(+), 61 deletions(-) diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/AsyncExecutePicked.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/AsyncExecutePicked.java index d3027928..5b8eb535 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/AsyncExecutePicked.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/AsyncExecutePicked.java @@ -27,7 +27,7 @@ import java.util.concurrent.CompletableFuture; @SuppressWarnings("rawtypes") -class AsyncExecutePicked implements Runnable { +class AsyncExecutePicked { private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutePicked.class); private final Executor executor; private final TaskRepository taskRepository; @@ -55,15 +55,14 @@ public AsyncExecutePicked(Executor executor, TaskRepository taskRepository, Sche this.pickedExecution = pickedExecution; } - @Override - public void run() { + public CompletableFuture toCompletableFuture() { // FIXLATER: need to cleanup all the references back to scheduler fields final UUID executionId = executor.addCurrentlyProcessing(new CurrentlyExecuting(pickedExecution, clock)); statsRegistry.register(StatsRegistry.CandidateStatsEvent.EXECUTED); - executePickedExecution(pickedExecution).whenComplete((c, ex) -> executor.removeCurrentlyProcessing(executionId)); + return executePickedExecution(pickedExecution).whenComplete((c, ex) -> executor.removeCurrentlyProcessing(executionId)); } - private CompletableFuture executePickedExecution(Execution execution) { + private CompletableFuture executePickedExecution(Execution execution) { final Optional task = taskResolver.resolve(execution.taskInstance.getTaskName()); if (!task.isPresent()) { LOG.error("Failed to find implementation for task with name '{}'. Should have been excluded in JdbcRepository.", execution.taskInstance.getTaskName()); @@ -79,7 +78,7 @@ private CompletableFuture executePickedExecution(Execution ex LOG.debug("Executing " + execution); CompletableFuture completableFuture = asyncHandler.executeAsync(execution.taskInstance, new AsyncExecutionContext(schedulerState, execution, schedulerClient, executor.getExecutorService())); - return completableFuture.whenCompleteAsync((completion, ex) -> { + return completableFuture.handle((completion, ex) -> { if (ex != null) { if (ex instanceof RuntimeException) { failure(task.get(), execution, ex, executionStarted, "Unhandled exception"); @@ -88,13 +87,14 @@ private CompletableFuture executePickedExecution(Execution ex failure(task.get(), execution, ex, executionStarted, "Error"); statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED); } - return; + return null; } LOG.debug("Execution done"); complete(completion, execution, executionStarted); statsRegistry.register(StatsRegistry.ExecutionStatsEvent.COMPLETED); + return null; - }, executor.getExecutorService()); + }); } private void complete(CompletionHandler completion, Execution execution, Instant executionStarted) { diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Executor.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Executor.java index 8d977a62..58117af8 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Executor.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Executor.java @@ -21,14 +21,11 @@ import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import java.util.stream.Collectors; public class Executor { @@ -44,20 +41,24 @@ public Executor(ExecutorService executorService, Clock clock) { this.clock = clock; } - public void addToQueue(Runnable r, Runnable afterDone) { + public void addToQueue(Supplier> toRun, Runnable afterDone) { currentlyInQueueOrProcessing.incrementAndGet(); // if we always had a ThreadPoolExecutor we could check queue-size using getQueue() - executorService.execute(() -> { - // Execute - try { - r.run(); - } finally { - // For async, these callbacks are run before complete, - // thus allowing queue of ongoing executions to grow without bound - currentlyInQueueOrProcessing.decrementAndGet(); - // Run callbacks after decrementing currentlyInQueueOrProcessing - afterDone.run(); - } + + toRun.get().thenAccept(v -> { + // For async, these callbacks are run before complete, + // thus allowing queue of ongoing executions to grow without bound + currentlyInQueueOrProcessing.decrementAndGet(); + // Run callbacks after decrementing currentlyInQueueOrProcessing + afterDone.run(); }); + + } + + public void incrementInQueue() { + currentlyInQueueOrProcessing.incrementAndGet(); // if we always had a ThreadPoolExecutor we could check queue-size using getQueue() + } + public void decrementInQueue() { + currentlyInQueueOrProcessing.decrementAndGet(); } public List getCurrentlyExecuting() { @@ -67,6 +68,7 @@ public List getCurrentlyExecuting() { public void stop(Duration shutdownMaxWait) { LOG.info("Letting running executions finish. Will wait up to 2x{}.", shutdownMaxWait); final Instant startShutdown = clock.now(); + if (ExecutorUtils.shutdownAndAwaitTermination(executorService, shutdownMaxWait, shutdownMaxWait)) { LOG.info("Scheduler stopped."); } else { @@ -106,5 +108,4 @@ public java.util.concurrent.Executor getExecutorService() { return executorService; } - } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java index 21dfa695..42cdf002 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; public class FetchCandidates implements PollStrategy { @@ -84,32 +85,39 @@ public void run() { (Integer leftInBatch) -> leftInBatch <= lowerLimit); for (Execution e : fetchedDueExecutions) { - executor.addToQueue( - () -> { - final Optional candidate = new PickDue(e, newDueBatch).call(); - - candidate.ifPresent(picked -> { - - // Experimental support for async execution. Peek at Task to see if support async - // Unresolved tasks will be handled further in - final Optional task = taskResolver.resolve(picked.taskInstance.getTaskName()); - if (task.isPresent() && task.get() instanceof AsyncExecutionHandler) { - // Experimental branch - new AsyncExecutePicked(executor, taskRepository, earlyExecutionListener, - schedulerClient, statsRegistry, taskResolver, schedulerState, failureLogger, - clock, picked).run(); - - } else { - // The default - new ExecutePicked(executor, taskRepository, earlyExecutionListener, - schedulerClient, statsRegistry, taskResolver, schedulerState, failureLogger, - clock, picked).run(); - } - }); - }, - () -> { + + CompletableFuture future = CompletableFuture + .runAsync(executor::incrementInQueue, executor.getExecutorService()) + .thenComposeAsync((result) -> CompletableFuture.supplyAsync(() -> new PickDue(e, newDueBatch).call(), executor.getExecutorService())) + .thenComposeAsync((x) -> { + if (!x.isPresent()) { + return CompletableFuture.completedFuture(null); + } else { + return CompletableFuture.supplyAsync(x::get); + } + }, executor.getExecutorService()) + .thenComposeAsync(picked -> { + // Experimental support for async execution. Peek at Task to see if support async + // Unresolved tasks will be handled further in + final Optional task = taskResolver.resolve(picked.taskInstance.getTaskName()); + if (task.isPresent() && task.get() instanceof AsyncExecutionHandler) { + + return new AsyncExecutePicked(executor, taskRepository, earlyExecutionListener, + schedulerClient, statsRegistry, taskResolver, schedulerState, failureLogger, + clock, picked).toCompletableFuture(); + } else { + + return CompletableFuture.runAsync(new ExecutePicked(executor, taskRepository, earlyExecutionListener, + schedulerClient, statsRegistry, taskResolver, schedulerState, failureLogger, + clock, picked), executor.getExecutorService()); + } + + }, executor.getExecutorService()) + .thenAccept(x -> { + executor.decrementInQueue(); newDueBatch.oneExecutionDone(triggerCheckForNewExecutions::run); }); + } statsRegistry.register(StatsRegistry.SchedulerStatsEvent.RAN_EXECUTE_DUE); } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java index 8c3e94c5..9450a0a8 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java @@ -26,6 +26,7 @@ import java.time.Instant; import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; public class LockAndFetchCandidates implements PollStrategy { @@ -91,24 +92,27 @@ public void run() { } for (Execution picked : pickedExecutions) { - executor.addToQueue(() -> { + CompletableFuture + .runAsync(executor::incrementInQueue, executor.getExecutorService()) + .thenComposeAsync((_ignored) -> { // Experimental support for async execution. Peek at Task to see if support async // Unresolved tasks will be handled further in final Optional task = taskResolver.resolve(picked.taskInstance.getTaskName()); if (task.isPresent() && task.get() instanceof AsyncExecutionHandler) { - // Experimental branch - new AsyncExecutePicked(executor, taskRepository, earlyExecutionListener, - schedulerClient, statsRegistry, taskResolver, schedulerState, failureLogger, - clock, picked).run(); + return new AsyncExecutePicked(executor, taskRepository, earlyExecutionListener, + schedulerClient, statsRegistry, taskResolver, schedulerState, failureLogger, + clock, picked).toCompletableFuture(); } else { - // The default - new ExecutePicked(executor, taskRepository, earlyExecutionListener, + + return CompletableFuture.runAsync(new ExecutePicked(executor, taskRepository, earlyExecutionListener, schedulerClient, statsRegistry, taskResolver, schedulerState, failureLogger, - clock, picked).run(); + clock, picked), executor.getExecutorService()); } - }, - () -> { + + }, executor.getExecutorService()) + .thenAccept(x -> { + executor.decrementInQueue(); if (moreExecutionsInDatabase.get() && executor.getNumberInQueueOrProcessing() <= lowerLimit) { triggerCheckForNewExecutions.run(); From b88d760ad066c55806e6c12162be21a1c0ce93a6 Mon Sep 17 00:00:00 2001 From: Gustav Karlsson Date: Mon, 17 Apr 2023 12:53:03 +0200 Subject: [PATCH 4/4] Fix shutdown, ManualScheduler, tests. --- .../kagkarlsson/scheduler/Executor.java | 20 +++++++++++- .../scheduler/FetchCandidates.java | 16 ++++++---- .../scheduler/LockAndFetchCandidates.java | 4 ++- .../kagkarlsson/scheduler/Scheduler.java | 3 ++ .../scheduler/testhelper/ManualScheduler.java | 1 + .../scheduler/testhelper/TestHelper.java | 1 + .../scheduler/DeadExecutionsTest.java | 28 +--------------- .../kagkarlsson/scheduler/SchedulerTest.java | 32 +++++++++---------- .../functional/ExecutorPoolTest.java | 1 - 9 files changed, 53 insertions(+), 53 deletions(-) diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Executor.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Executor.java index 58117af8..780afe33 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Executor.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Executor.java @@ -23,11 +23,13 @@ import java.time.Instant; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.Collectors; +// TODO: make super-class of the polling-strategies instead public class Executor { private static final Logger LOG = LoggerFactory.getLogger(Executor.class); @@ -35,12 +37,14 @@ public class Executor { private AtomicInteger currentlyInQueueOrProcessing = new AtomicInteger(0); private final ExecutorService executorService; private final Clock clock; + private Map, CompletableFuture> ongoingWork = new ConcurrentHashMap<>(); public Executor(ExecutorService executorService, Clock clock) { this.executorService = executorService; this.clock = clock; } + // TODO: remove public void addToQueue(Supplier> toRun, Runnable afterDone) { currentlyInQueueOrProcessing.incrementAndGet(); // if we always had a ThreadPoolExecutor we could check queue-size using getQueue() @@ -61,12 +65,27 @@ public void decrementInQueue() { currentlyInQueueOrProcessing.decrementAndGet(); } + public void addOngoingWork(CompletableFuture work) { + ongoingWork.put(work, work); + } + public List getCurrentlyExecuting() { return new ArrayList<>(currentlyProcessing.values()); } + @SuppressWarnings("rawtypes") + public void awaitCurrentlyExecuting() { + CompletableFuture[] ongoingWork = this.ongoingWork.keySet().toArray(new CompletableFuture[0]); + CompletableFuture.allOf(ongoingWork).join(); + } + public void stop(Duration shutdownMaxWait) { + LOG.info("Letting running executions finish. Will wait up to 2x{}.", shutdownMaxWait); + // TODO: upper timelimit for completable futures as well + // Wait for futures explicitly, as we can no longer rely on executorService.shutdown() + awaitCurrentlyExecuting(); + final Instant startShutdown = clock.now(); if (ExecutorUtils.shutdownAndAwaitTermination(executorService, shutdownMaxWait, shutdownMaxWait)) { @@ -107,5 +126,4 @@ public void removeCurrentlyProcessing(UUID executionId) { public java.util.concurrent.Executor getExecutorService() { return executorService; } - } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java index 42cdf002..c151cae8 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; @@ -88,15 +89,15 @@ public void run() { CompletableFuture future = CompletableFuture .runAsync(executor::incrementInQueue, executor.getExecutorService()) - .thenComposeAsync((result) -> CompletableFuture.supplyAsync(() -> new PickDue(e, newDueBatch).call(), executor.getExecutorService())) - .thenComposeAsync((x) -> { - if (!x.isPresent()) { + .thenComposeAsync((result) -> CompletableFuture.supplyAsync(() -> { + Optional candidate = new PickDue(e, newDueBatch).call(); + return candidate.orElse(null); // TODO: remove optional before merge + }, executor.getExecutorService())) + .thenComposeAsync(picked -> { + if (picked == null) { + // Skip this step if we were not able to pick the execution (someone else got the lock) return CompletableFuture.completedFuture(null); - } else { - return CompletableFuture.supplyAsync(x::get); } - }, executor.getExecutorService()) - .thenComposeAsync(picked -> { // Experimental support for async execution. Peek at Task to see if support async // Unresolved tasks will be handled further in final Optional task = taskResolver.resolve(picked.taskInstance.getTaskName()); @@ -117,6 +118,7 @@ public void run() { executor.decrementInQueue(); newDueBatch.oneExecutionDone(triggerCheckForNewExecutions::run); }); + executor.addOngoingWork(future); } statsRegistry.register(StatsRegistry.SchedulerStatsEvent.RAN_EXECUTE_DUE); diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java index 9450a0a8..039f6864 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java @@ -92,7 +92,7 @@ public void run() { } for (Execution picked : pickedExecutions) { - CompletableFuture + CompletableFuture future = CompletableFuture .runAsync(executor::incrementInQueue, executor.getExecutorService()) .thenComposeAsync((_ignored) -> { // Experimental support for async execution. Peek at Task to see if support async @@ -118,6 +118,8 @@ public void run() { triggerCheckForNewExecutions.run(); } }); + + executor.addOngoingWork(future); } statsRegistry.register(StatsRegistry.SchedulerStatsEvent.RAN_EXECUTE_DUE); } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java index d8eeb2c7..7ab63423 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java @@ -324,4 +324,7 @@ public static SchedulerBuilder create(DataSource dataSource, List> known return new SchedulerBuilder(dataSource, knownTasks); } + protected void awaitCurrentlyExecuting() { + executor.awaitCurrentlyExecuting(); + } } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java index d662622c..7007f408 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java @@ -54,6 +54,7 @@ public void setTime(Instant newtime) { public void runAnyDueExecutions() { super.executeDueStrategy.run(); + super.awaitCurrentlyExecuting(); } public void runDeadExecutionDetection() { diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java index 1abad063..168725c4 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java @@ -76,6 +76,7 @@ public ManualScheduler build() { final JdbcTaskRepository schedulerTaskRepository = new JdbcTaskRepository(dataSource, true, new DefaultJdbcCustomization(), tableName, taskResolver, new SchedulerName.Fixed("manual"), serializer, clock); final JdbcTaskRepository clientTaskRepository = new JdbcTaskRepository(dataSource, commitWhenAutocommitDisabled, new DefaultJdbcCustomization(), tableName, taskResolver, new SchedulerName.Fixed("manual"), serializer, clock); + // TODO: new DirectExecutorService() will not work after starting to execute work async return new ManualScheduler(clock, schedulerTaskRepository, clientTaskRepository, taskResolver, executorThreads, new DirectExecutorService(), schedulerName, waiter, heartbeatInterval, enableImmediateExecution, statsRegistry, Optional.ofNullable(pollingStrategyConfig).orElse(PollingStrategyConfig.DEFAULT_FETCH), diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java index b0d7c2d8..e0c24150 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java @@ -5,6 +5,7 @@ import com.github.kagkarlsson.scheduler.stats.StatsRegistry; import com.github.kagkarlsson.scheduler.task.*; import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask; +import com.github.kagkarlsson.scheduler.testhelper.ManualScheduler; import com.github.kagkarlsson.scheduler.testhelper.SettableClock; import com.google.common.util.concurrent.MoreExecutors; import org.hamcrest.Matchers; @@ -97,33 +98,6 @@ public void scheduler_should_handle_dead_executions() { assertThat(jdbcTaskRepository.getDue(Instant.now(), POLLING_LIMIT), hasSize(1)); } - @Test - public void scheduler_should_detect_dead_execution_that_never_updated_heartbeat() { - final Instant now = Instant.now(); - settableClock.set(now.minus(Duration.ofHours(1))); - final Instant oneHourAgo = settableClock.now(); - - final TaskInstance taskInstance = nonCompleting.instance("id1"); - final SchedulableTaskInstance execution1 = new SchedulableTaskInstance<>(taskInstance, oneHourAgo); - jdbcTaskRepository.createIfNotExists(execution1); - - scheduler.executeDue(); - assertThat(nonCompletingExecutionHandler.timesExecuted.get(), is(1)); - - scheduler.executeDue(); - assertThat(nonCompletingExecutionHandler.timesExecuted.get(), is(1)); - - settableClock.set(Instant.now()); - - scheduler.detectDeadExecutions(); - assertThat(deadExecutionHandler.timesCalled, is(1)); - - settableClock.set(Instant.now()); - - scheduler.executeDue(); - assertThat(nonCompletingExecutionHandler.timesExecuted.get(), is(2)); - } - public static class NonCompletingTask extends OneTimeTask { private final VoidExecutionHandler handler; diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerTest.java index 88c57aa7..9f1b1130 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerTest.java @@ -125,14 +125,14 @@ public void scheduler_should_execute_rescheduled_task_when_exactly_due() { @Test public void scheduler_should_not_execute_canceled_tasks() { OneTimeTask oneTimeTask = TestTasks.oneTime("OneTime", Void.class, handler); - Scheduler scheduler = schedulerFor(oneTimeTask); + ManualScheduler scheduler = schedulerFor(oneTimeTask); Instant executionTime = clock.now().plus(ofMinutes(1)); String instanceId = "1"; TaskInstance oneTimeTaskInstance = oneTimeTask.instance(instanceId); scheduler.schedule(oneTimeTaskInstance, executionTime); scheduler.cancel(oneTimeTaskInstance); - scheduler.executeDue(); + scheduler.runAnyDueExecutions(); assertThat(handler.timesExecuted.get(), is(0)); clock.set(executionTime); @@ -143,16 +143,16 @@ public void scheduler_should_not_execute_canceled_tasks() { @Test public void scheduler_should_execute_recurring_task_and_reschedule() { RecurringTask recurringTask = TestTasks.recurring("Recurring", FixedDelay.of(ofHours(1)), handler); - Scheduler scheduler = schedulerFor(recurringTask); + ManualScheduler scheduler = schedulerFor(recurringTask); scheduler.schedule(recurringTask.instance("single"), clock.now()); - scheduler.executeDue(); + scheduler.runAnyDueExecutions(); assertThat(handler.timesExecuted.get(), is(1)); Instant nextExecutionTime = clock.now().plus(ofHours(1)); clock.set(nextExecutionTime); - scheduler.executeDue(); + scheduler.runAnyDueExecutions(); assertThat(handler.timesExecuted.get(), is(2)); } @@ -183,10 +183,10 @@ public void should_expose_cause_of_failure_to_completion_handler() throws Interr Task oneTimeTask = ComposableTask.customTask("cause-testing-task", Void.class, TestTasks.REMOVE_ON_COMPLETE, failureHandler, (inst, ctx) -> { throw new RuntimeException("Failed!");}); - Scheduler scheduler = schedulerFor(oneTimeTask); + ManualScheduler scheduler = schedulerFor(oneTimeTask); scheduler.schedule(oneTimeTask.instance("1"), clock.now()); - scheduler.executeDue(); + scheduler.runAnyDueExecutions(); // failureHandler.waitForNotify.await(); assertThat(failureHandler.result, is(ExecutionComplete.Result.FAILED)); @@ -206,15 +206,15 @@ public void should_only_attempt_task_when_max_retries_handler_used() throws Inte throw new RuntimeException("Failed!"); }); - Scheduler scheduler = schedulerFor(oneTimeTask); + ManualScheduler scheduler = schedulerFor(oneTimeTask); scheduler.schedule(oneTimeTask.instance("1"), clock.now()); - scheduler.executeDue(); + scheduler.runAnyDueExecutions(); //Simulate 15 minutes worth of time to validate we did not process more than we should for( int minuteWorthOfTime = 1; minuteWorthOfTime <= 15; minuteWorthOfTime ++) { clock.set(clock.now().plus(ofMinutes(1))); - scheduler.executeDue(); + scheduler.runAnyDueExecutions(); } //will always be maxRetries + 1 due to the first call always being required. @@ -235,16 +235,16 @@ public void should_reschedule_failure_on_exponential_backoff_with_default_rate() } }); - Scheduler scheduler = schedulerFor(oneTimeTask); + ManualScheduler scheduler = schedulerFor(oneTimeTask); Instant firstExecution = clock.now(); scheduler.schedule(oneTimeTask.instance("1"), firstExecution); - scheduler.executeDue(); + scheduler.runAnyDueExecutions(); //Simulate 30 minutes worth of time to validate we did not process more than we should for( int minuteWorthOfTime = 1; minuteWorthOfTime <= 30; minuteWorthOfTime ++) { clock.set(clock.now().plus(ofMinutes(1))); - scheduler.executeDue(); + scheduler.runAnyDueExecutions(); } assertThat(executionTimes.size(), is(10)); @@ -275,16 +275,16 @@ public void should_reschedule_failure_on_exponential_backoff_with_defined_rate() } }); - Scheduler scheduler = schedulerFor(oneTimeTask); + ManualScheduler scheduler = schedulerFor(oneTimeTask); Instant firstExecution = clock.now(); scheduler.schedule(oneTimeTask.instance("1"), firstExecution); - scheduler.executeDue(); + scheduler.runAnyDueExecutions(); //Simulate 30 minutes worth of time to validate we did not process more than we should for( int minuteWorthOfTime = 1; minuteWorthOfTime <= 30; minuteWorthOfTime ++) { clock.set(clock.now().plus(ofMinutes(1))); - scheduler.executeDue(); + scheduler.runAnyDueExecutions(); } assertThat(executionTimes.size(), is(10)); diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/ExecutorPoolTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/ExecutorPoolTest.java index 0c7de39c..c991011a 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/ExecutorPoolTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/ExecutorPoolTest.java @@ -72,7 +72,6 @@ public void test_execute_until_none_left_high_volume() { testExecuteUntilNoneLeft(12, 4, 200); } - private void testExecuteUntilNoneLeft(int pollingLimit, int threads, int executionsToRun) { Instant now = Instant.now(); OneTimeTask task = TestTasks.oneTime("onetime-a", Void.class, TestTasks.DO_NOTHING);