Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PoC for experimental async support #369

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/**
* Copyright (C) Gustav Karlsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.kagkarlsson.scheduler;

import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger;
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.task.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

@SuppressWarnings("rawtypes")
class AsyncExecutePicked {
private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutePicked.class);
private final Executor executor;
private final TaskRepository taskRepository;
private SchedulerClientEventListener earlyExecutionListener;
private final SchedulerClient schedulerClient;
private final StatsRegistry statsRegistry;
private final TaskResolver taskResolver;
private final SchedulerState schedulerState;
private final ConfigurableLogger failureLogger;
private final Clock clock;
private final Execution pickedExecution;

public AsyncExecutePicked(Executor executor, TaskRepository taskRepository, SchedulerClientEventListener earlyExecutionListener, SchedulerClient schedulerClient, StatsRegistry statsRegistry,
TaskResolver taskResolver, SchedulerState schedulerState, ConfigurableLogger failureLogger,
Clock clock, Execution pickedExecution) {
this.executor = executor;
this.taskRepository = taskRepository;
this.earlyExecutionListener = earlyExecutionListener;
this.schedulerClient = schedulerClient;
this.statsRegistry = statsRegistry;
this.taskResolver = taskResolver;
this.schedulerState = schedulerState;
this.failureLogger = failureLogger;
this.clock = clock;
this.pickedExecution = pickedExecution;
}

public CompletableFuture<Void> toCompletableFuture() {
// FIXLATER: need to cleanup all the references back to scheduler fields
final UUID executionId = executor.addCurrentlyProcessing(new CurrentlyExecuting(pickedExecution, clock));
statsRegistry.register(StatsRegistry.CandidateStatsEvent.EXECUTED);
return executePickedExecution(pickedExecution).whenComplete((c, ex) -> executor.removeCurrentlyProcessing(executionId));
}

private CompletableFuture<Void> executePickedExecution(Execution execution) {
final Optional<Task> task = taskResolver.resolve(execution.taskInstance.getTaskName());
if (!task.isPresent()) {
LOG.error("Failed to find implementation for task with name '{}'. Should have been excluded in JdbcRepository.", execution.taskInstance.getTaskName());
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
return new CompletableFuture<>();
}
if (!(task.get() instanceof AsyncExecutionHandler)) {
throw new IllegalStateException("Should only ever try to execute async when task has an AsyncExecutionHandler");
}

AsyncExecutionHandler asyncHandler = (AsyncExecutionHandler) task.get();
Instant executionStarted = clock.now();
LOG.debug("Executing " + execution);
CompletableFuture<CompletionHandler> completableFuture = asyncHandler.executeAsync(execution.taskInstance, new AsyncExecutionContext(schedulerState, execution, schedulerClient, executor.getExecutorService()));

return completableFuture.handle((completion, ex) -> {
if (ex != null) {
if (ex instanceof RuntimeException) {
failure(task.get(), execution, ex, executionStarted, "Unhandled exception");
statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED);
} else {
failure(task.get(), execution, ex, executionStarted, "Error");
statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED);
}
return null;
}
LOG.debug("Execution done");
complete(completion, execution, executionStarted);
statsRegistry.register(StatsRegistry.ExecutionStatsEvent.COMPLETED);
return null;

});
}

private void complete(CompletionHandler completion, Execution execution, Instant executionStarted) {
ExecutionComplete completeEvent = ExecutionComplete.success(execution, executionStarted, clock.now());
try {
completion.complete(completeEvent, new ExecutionOperations(taskRepository, earlyExecutionListener, execution));
statsRegistry.registerSingleCompletedExecution(completeEvent);
} catch (Throwable e) {
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.COMPLETIONHANDLER_ERROR);
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
LOG.error("Failed while completing execution {}. Execution will likely remain scheduled and locked/picked. " +
"The execution should be detected as dead after a while, and handled according to the tasks DeadExecutionHandler.", execution, e);
}
}

private void failure(Task task, Execution execution, Throwable cause, Instant executionStarted, String errorMessagePrefix) {
String logMessage = errorMessagePrefix + " during execution of task with name '{}'. Treating as failure.";
failureLogger.log(logMessage, cause, task.getName());

ExecutionComplete completeEvent = ExecutionComplete.failure(execution, executionStarted, clock.now(), cause);
try {
task.getFailureHandler().onFailure(completeEvent, new ExecutionOperations(taskRepository, earlyExecutionListener, execution));
statsRegistry.registerSingleCompletedExecution(completeEvent);
} catch (Throwable e) {
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.FAILUREHANDLER_ERROR);
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
LOG.error("Failed while completing execution {}. Execution will likely remain scheduled and locked/picked. " +
"The execution should be detected as dead after a while, and handled according to the tasks DeadExecutionHandler.", execution, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,50 +21,73 @@

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// TODO: make super-class of the polling-strategies instead
public class Executor {
private static final Logger LOG = LoggerFactory.getLogger(Executor.class);

final Map<UUID, CurrentlyExecuting> currentlyProcessing = Collections.synchronizedMap(new HashMap<>());
private AtomicInteger currentlyInQueueOrProcessing = new AtomicInteger(0);
private final ExecutorService executorService;
private final Clock clock;
private Map<CompletableFuture<Void>, CompletableFuture<Void>> ongoingWork = new ConcurrentHashMap<>();

public Executor(ExecutorService executorService, Clock clock) {
this.executorService = executorService;
this.clock = clock;
}

public void addToQueue(Runnable r, Runnable afterDone) {
// TODO: remove
public void addToQueue(Supplier<CompletableFuture<Void>> toRun, Runnable afterDone) {
currentlyInQueueOrProcessing.incrementAndGet(); // if we always had a ThreadPoolExecutor we could check queue-size using getQueue()
executorService.execute(() -> {
// Execute
try {
r.run();
} finally {
currentlyInQueueOrProcessing.decrementAndGet();
// Run callbacks after decrementing currentlyInQueueOrProcessing
afterDone.run();
}

toRun.get().thenAccept(v -> {
// For async, these callbacks are run before complete,
// thus allowing queue of ongoing executions to grow without bound
currentlyInQueueOrProcessing.decrementAndGet();
// Run callbacks after decrementing currentlyInQueueOrProcessing
afterDone.run();
});

}

public void incrementInQueue() {
currentlyInQueueOrProcessing.incrementAndGet(); // if we always had a ThreadPoolExecutor we could check queue-size using getQueue()
}
public void decrementInQueue() {
currentlyInQueueOrProcessing.decrementAndGet();
}

public void addOngoingWork(CompletableFuture<Void> work) {
ongoingWork.put(work, work);
}

public List<CurrentlyExecuting> getCurrentlyExecuting() {
return new ArrayList<>(currentlyProcessing.values());
}

@SuppressWarnings("rawtypes")
public void awaitCurrentlyExecuting() {
CompletableFuture[] ongoingWork = this.ongoingWork.keySet().toArray(new CompletableFuture[0]);
CompletableFuture.allOf(ongoingWork).join();
}

public void stop(Duration shutdownMaxWait) {

LOG.info("Letting running executions finish. Will wait up to 2x{}.", shutdownMaxWait);
// TODO: upper timelimit for completable futures as well
// Wait for futures explicitly, as we can no longer rely on executorService.shutdown()
awaitCurrentlyExecuting();

final Instant startShutdown = clock.now();

if (ExecutorUtils.shutdownAndAwaitTermination(executorService, shutdownMaxWait, shutdownMaxWait)) {
LOG.info("Scheduler stopped.");
} else {
Expand Down Expand Up @@ -99,4 +122,8 @@ public void removeCurrentlyProcessing(UUID executionId) {
LOG.warn("Released execution was not found in collection of executions currently being processed. Should never happen. Execution-id: " + executionId);
}
}

public java.util.concurrent.Executor getExecutorService() {
return executorService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@

import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger;
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.task.AsyncExecutionHandler;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class FetchCandidates implements PollStrategy {
Expand Down Expand Up @@ -64,6 +68,7 @@ public FetchCandidates(Executor executor, TaskRepository taskRepository, Schedul
upperLimit = pollingStrategyConfig.getUpperLimit(threadpoolSize);
}

@SuppressWarnings("rawtypes")
@Override
public void run() {
Instant now = clock.now();
Expand All @@ -81,16 +86,40 @@ public void run() {
(Integer leftInBatch) -> leftInBatch <= lowerLimit);

for (Execution e : fetchedDueExecutions) {
executor.addToQueue(
() -> {
final Optional<Execution> candidate = new PickDue(e, newDueBatch).call();
candidate.ifPresent(picked -> new ExecutePicked(executor, taskRepository, earlyExecutionListener, schedulerClient, statsRegistry,
taskResolver, schedulerState, failureLogger,
clock, picked).run());
},
() -> {

CompletableFuture<Void> future = CompletableFuture
.runAsync(executor::incrementInQueue, executor.getExecutorService())
.thenComposeAsync((result) -> CompletableFuture.supplyAsync(() -> {
Optional<Execution> candidate = new PickDue(e, newDueBatch).call();
return candidate.orElse(null); // TODO: remove optional before merge
}, executor.getExecutorService()))
.thenComposeAsync(picked -> {
if (picked == null) {
// Skip this step if we were not able to pick the execution (someone else got the lock)
return CompletableFuture.completedFuture(null);
}
// Experimental support for async execution. Peek at Task to see if support async
// Unresolved tasks will be handled further in
final Optional<Task> task = taskResolver.resolve(picked.taskInstance.getTaskName());
if (task.isPresent() && task.get() instanceof AsyncExecutionHandler) {

return new AsyncExecutePicked(executor, taskRepository, earlyExecutionListener,
schedulerClient, statsRegistry, taskResolver, schedulerState, failureLogger,
clock, picked).toCompletableFuture();
} else {

return CompletableFuture.runAsync(new ExecutePicked(executor, taskRepository, earlyExecutionListener,
schedulerClient, statsRegistry, taskResolver, schedulerState, failureLogger,
clock, picked), executor.getExecutorService());
}

}, executor.getExecutorService())
.thenAccept(x -> {
executor.decrementInQueue();
newDueBatch.oneExecutionDone(triggerCheckForNewExecutions::run);
});
executor.addOngoingWork(future);

}
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.RAN_EXECUTE_DUE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@

import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger;
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.task.AsyncExecutionHandler;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class LockAndFetchCandidates implements PollStrategy {
Expand Down Expand Up @@ -88,16 +92,34 @@ public void run() {
}

for (Execution picked : pickedExecutions) {
executor.addToQueue(
new ExecutePicked(executor, taskRepository, earlyExecutionListener, schedulerClient, statsRegistry,
taskResolver, schedulerState, failureLogger,
clock, picked),
() -> {
CompletableFuture<Void> future = CompletableFuture
.runAsync(executor::incrementInQueue, executor.getExecutorService())
.thenComposeAsync((_ignored) -> {
// Experimental support for async execution. Peek at Task to see if support async
// Unresolved tasks will be handled further in
final Optional<Task> task = taskResolver.resolve(picked.taskInstance.getTaskName());
if (task.isPresent() && task.get() instanceof AsyncExecutionHandler) {

return new AsyncExecutePicked(executor, taskRepository, earlyExecutionListener,
schedulerClient, statsRegistry, taskResolver, schedulerState, failureLogger,
clock, picked).toCompletableFuture();
} else {

return CompletableFuture.runAsync(new ExecutePicked(executor, taskRepository, earlyExecutionListener,
schedulerClient, statsRegistry, taskResolver, schedulerState, failureLogger,
clock, picked), executor.getExecutorService());
}

}, executor.getExecutorService())
.thenAccept(x -> {
executor.decrementInQueue();
if (moreExecutionsInDatabase.get()
&& executor.getNumberInQueueOrProcessing() <= lowerLimit) {
triggerCheckForNewExecutions.run();
}
});

executor.addOngoingWork(future);
}
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.RAN_EXECUTE_DUE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,4 +324,7 @@ public static SchedulerBuilder create(DataSource dataSource, List<Task<?>> known
return new SchedulerBuilder(dataSource, knownTasks);
}

protected void awaitCurrentlyExecuting() {
executor.awaitCurrentlyExecuting();
}
}
Loading