Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding the ability to trigger task run manually. #184

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,39 @@
*/
package com.github.kagkarlsson.scheduler;

import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;


class DueExecutionsBatch {

private static final Logger LOG = LoggerFactory.getLogger(DueExecutionsBatch.class);
private final int generationNumber;
private final AtomicInteger executionsLeftInBatch;
private final CompletableFuture<BatchCompletionResult> onBatchComplete;
private int threadpoolSize;
private boolean possiblyMoreExecutionsInDb;
private boolean stale = false;
private boolean triggeredExecuteDue;

public DueExecutionsBatch(int threadpoolSize, int generationNumber, int executionsAdded, boolean possiblyMoreExecutionsInDb) {
public DueExecutionsBatch(int threadpoolSize, int generationNumber, int executionsAdded,
boolean possiblyMoreExecutionsInDb) {
this(threadpoolSize, generationNumber, executionsAdded, possiblyMoreExecutionsInDb, new CompletableFuture<>());
}

public DueExecutionsBatch(int threadpoolSize, int generationNumber, int executionsAdded,
boolean possiblyMoreExecutionsInDb, CompletableFuture<BatchCompletionResult> onBatchComplete) {
this.threadpoolSize = threadpoolSize;
this.generationNumber = generationNumber;
this.possiblyMoreExecutionsInDb = possiblyMoreExecutionsInDb;
this.executionsLeftInBatch = new AtomicInteger(executionsAdded);
this.onBatchComplete = onBatchComplete;
if (this.executionsLeftInBatch.get() == 0) {
this.onBatchComplete.complete(new BatchCompletionResult(false));
}
}

public void markBatchAsStale() {
Expand All @@ -50,16 +62,20 @@ public void oneExecutionDone(Runnable triggerCheckForNewBatch) {
// May be called concurrently by multiple threads
executionsLeftInBatch.decrementAndGet();

LOG.trace("Batch state: generationNumber:{}, stale:{}, triggeredExecuteDue:{}, possiblyMoreExecutionsInDb:{}, executionsLeftInBatch:{}, ratio-trigger:{}",
generationNumber, stale, triggeredExecuteDue, possiblyMoreExecutionsInDb, executionsLeftInBatch.get(), (threadpoolSize * Scheduler.TRIGGER_NEXT_BATCH_WHEN_AVAILABLE_THREADS_RATIO));
if (!stale
&& !triggeredExecuteDue
&& possiblyMoreExecutionsInDb
&& executionsLeftInBatch.get() <= (threadpoolSize * Scheduler.TRIGGER_NEXT_BATCH_WHEN_AVAILABLE_THREADS_RATIO)) {
LOG.trace(
"Batch state: generationNumber:{}, stale:{}, triggeredExecuteDue:{}, possiblyMoreExecutionsInDb:{}, executionsLeftInBatch:{}, ratio-trigger:{}",
generationNumber, stale, triggeredExecuteDue, possiblyMoreExecutionsInDb, executionsLeftInBatch.get(),
(threadpoolSize * Scheduler.TRIGGER_NEXT_BATCH_WHEN_AVAILABLE_THREADS_RATIO));
if (!stale && !triggeredExecuteDue && possiblyMoreExecutionsInDb && executionsLeftInBatch.get() <= (
threadpoolSize * Scheduler.TRIGGER_NEXT_BATCH_WHEN_AVAILABLE_THREADS_RATIO)) {
LOG.trace("Triggering check for new batch.");
triggerCheckForNewBatch.run();
triggeredExecuteDue = true;
}

if (executionsLeftInBatch.get() == 0) {
onBatchComplete.complete(new BatchCompletionResult(possiblyMoreExecutionsInDb));
}
}

public boolean isOlderGenerationThan(int compareTo) {
Expand All @@ -69,4 +85,16 @@ public boolean isOlderGenerationThan(int compareTo) {
public int getGenerationNumber() {
return generationNumber;
}

public static class BatchCompletionResult {
private final boolean possiblyMoreExecutionsInDb;

BatchCompletionResult(boolean possiblyMoreExecutionsInDb) {
this.possiblyMoreExecutionsInDb = possiblyMoreExecutionsInDb;
}

public boolean possiblyMoreExecutionsInDb() {
return possiblyMoreExecutionsInDb;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.stats.StatsRegistry.SchedulerStatsEvent;
import com.github.kagkarlsson.scheduler.task.*;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,6 +46,7 @@ public class Scheduler implements SchedulerClient {
private final Clock clock;
private final TaskRepository schedulerTaskRepository;
private final TaskResolver taskResolver;
private final boolean startConsumingTasksOnStart;
private int threadpoolSize;
private final ExecutorService executorService;
private final Waiter executeDueWaiter;
Expand All @@ -64,9 +66,10 @@ public class Scheduler implements SchedulerClient {
private int currentGenerationNumber = 1;
private final ConfigurableLogger failureLogger;

protected Scheduler(Clock clock, TaskRepository schedulerTaskRepository, TaskRepository clientTaskRepository, TaskResolver taskResolver, int threadpoolSize, ExecutorService executorService, SchedulerName schedulerName,
Waiter executeDueWaiter, Duration heartbeatInterval, boolean enableImmediateExecution, StatsRegistry statsRegistry, int pollingLimit, Duration deleteUnresolvedAfter, Duration shutdownMaxWait,
LogLevel logLevel, boolean logStackTrace, List<OnStartup> onStartup) {
protected Scheduler(Clock clock, TaskRepository schedulerTaskRepository, TaskRepository clientTaskRepository,
TaskResolver taskResolver, int threadpoolSize, ExecutorService executorService, SchedulerName schedulerName,
Waiter executeDueWaiter, Duration heartbeatInterval, boolean enableImmediateExecution, StatsRegistry statsRegistry, int pollingLimit, Duration deleteUnresolvedAfter, Duration shutdownMaxWait,
LogLevel logLevel, boolean logStackTrace, List<OnStartup> onStartup, boolean startConsumingTasksOnStart) {
this.clock = clock;
this.schedulerTaskRepository = schedulerTaskRepository;
this.taskResolver = taskResolver;
Expand All @@ -87,20 +90,36 @@ protected Scheduler(Clock clock, TaskRepository schedulerTaskRepository, TaskRep
SchedulerClientEventListener earlyExecutionListener = (enableImmediateExecution ? new TriggerCheckForDueExecutions(schedulerState, clock, executeDueWaiter) : SchedulerClientEventListener.NOOP);
delegate = new StandardSchedulerClient(clientTaskRepository, earlyExecutionListener);
this.failureLogger = ConfigurableLogger.create(LOG, logLevel, logStackTrace);
this.startConsumingTasksOnStart = startConsumingTasksOnStart;
}

public void start() {
/**
* Starts consuming the tasks from the queue.
* If the scheduler is not run with {@link #startConsumer()}, can still be used in `producer-only` mode.
*/
public void startConsumer() {
LOG.info("Starting scheduler.");

executeOnStartup();

dueExecutor.submit(new RunUntilShutdown(this::executeDue, executeDueWaiter, schedulerState, statsRegistry));
if(startConsumingTasksOnStart) {
dueExecutor.submit(
new RunUntilShutdown(this::executeDueTasks, executeDueWaiter, schedulerState, statsRegistry));
}
detectDeadExecutor.submit(new RunUntilShutdown(this::detectDeadExecutions, detectDeadWaiter, schedulerState, statsRegistry));
updateHeartbeatExecutor.submit(new RunUntilShutdown(this::updateHeartbeats, heartbeatWaiter, schedulerState, statsRegistry));

schedulerState.setStarted();
}

/**
* @deprecated use {@link #startConsumer()}
*/
@Deprecated
public void start() {
this.startConsumer();
}

protected void executeOnStartup() {
onStartup.forEach(os -> {
try {
Expand Down Expand Up @@ -214,18 +233,35 @@ public List<CurrentlyExecuting> getCurrentlyExecuting() {
return new ArrayList<>(currentlyProcessing.values());
}

/**
* @deprecated Use {@link #executeDueTasks()}
*/
@Deprecated
protected void executeDue() {
executeDueTasks();
}

public CompletableFuture<DueExecutionsBatch.BatchCompletionResult> executeDueTasks() {
Instant now = clock.now();
List<Execution> dueExecutions = schedulerTaskRepository.getDue(now, pollingLimit);
LOG.trace("Found {} task instances due for execution", dueExecutions.size());

boolean possiblyMoreExecutionsInDb = pollingLimit == dueExecutions.size();

CompletableFuture<DueExecutionsBatch.BatchCompletionResult> onBatchComplete =
new CompletableFuture<>();

this.currentGenerationNumber = this.currentGenerationNumber + 1;
DueExecutionsBatch newDueBatch = new DueExecutionsBatch(Scheduler.this.threadpoolSize, currentGenerationNumber, dueExecutions.size(), pollingLimit == dueExecutions.size());
DueExecutionsBatch newDueBatch =
new DueExecutionsBatch(Scheduler.this.threadpoolSize, currentGenerationNumber, dueExecutions.size(),
possiblyMoreExecutionsInDb, onBatchComplete);

for (Execution e : dueExecutions) {
executorService.execute(new PickAndExecute(e, newDueBatch));
}
statsRegistry.register(SchedulerStatsEvent.RAN_EXECUTE_DUE);

return onBatchComplete;
}

@SuppressWarnings({"rawtypes","unchecked"})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class SchedulerBuilder {
protected boolean commitWhenAutocommitDisabled = false;
protected LogLevel logLevel = DEFAULT_FAILURE_LOG_LEVEL;
protected boolean logStackTrace = LOG_STACK_TRACE_ON_FAILURE;
private boolean startConsumingTasksOnStart = true;

public SchedulerBuilder(DataSource dataSource, List<Task<?>> knownTasks) {
this.dataSource = dataSource;
Expand Down Expand Up @@ -148,6 +149,11 @@ public SchedulerBuilder enableImmediateExecution() {
return this;
}

public SchedulerBuilder disableStartConsumingTasksOnStart(){
this.startConsumingTasksOnStart = false;
return this;
}

public SchedulerBuilder deleteUnresolvedAfter(Duration deleteAfter) {
this.deleteUnresolvedAfter = deleteAfter;
return this;
Expand Down Expand Up @@ -205,6 +211,6 @@ public Scheduler build() {
schedulerName.getName());
return new Scheduler(clock, schedulerTaskRepository, clientTaskRepository, taskResolver, executorThreads, candidateExecutorService,
schedulerName, waiter, heartbeatInterval, enableImmediateExecution, statsRegistry, pollingLimit,
deleteUnresolvedAfter, shutdownMaxWait, logLevel, logStackTrace, startTasks);
deleteUnresolvedAfter, shutdownMaxWait, logLevel, logStackTrace, startTasks, startConsumingTasksOnStart);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,13 @@ public class ManualScheduler extends Scheduler {
private static final Logger LOG = LoggerFactory.getLogger(ManualScheduler.class);
private final SettableClock clock;

ManualScheduler(SettableClock clock, TaskRepository schedulerTaskRepository, TaskRepository clientTaskRepository, TaskResolver taskResolver, int maxThreads, ExecutorService executorService, SchedulerName schedulerName, Waiter waiter, Duration heartbeatInterval, boolean executeImmediately, StatsRegistry statsRegistry, int pollingLimit, Duration deleteUnresolvedAfter, LogLevel logLevel, boolean logStackTrace, List<OnStartup> onStartup) {
super(clock, schedulerTaskRepository, clientTaskRepository, taskResolver, maxThreads, executorService, schedulerName, waiter, heartbeatInterval, executeImmediately, statsRegistry, pollingLimit, deleteUnresolvedAfter, Duration.ZERO, logLevel, logStackTrace, onStartup);
ManualScheduler(SettableClock clock, TaskRepository schedulerTaskRepository, TaskRepository clientTaskRepository,
TaskResolver taskResolver, int maxThreads, ExecutorService executorService, SchedulerName schedulerName, Waiter waiter, Duration heartbeatInterval, boolean executeImmediately, StatsRegistry statsRegistry,
int pollingLimit, Duration deleteUnresolvedAfter, LogLevel logLevel, boolean logStackTrace,
List<OnStartup> onStartup, boolean startConsumingTasksOnStart) {

super(clock, schedulerTaskRepository, clientTaskRepository, taskResolver, maxThreads, executorService, schedulerName, waiter, heartbeatInterval, executeImmediately, statsRegistry, pollingLimit, deleteUnresolvedAfter, Duration.ZERO, logLevel, logStackTrace, onStartup,
startConsumingTasksOnStart);
this.clock = clock;
}

Expand All @@ -49,15 +54,15 @@ public void setTime(Instant newtime) {
}

public void runAnyDueExecutions() {
super.executeDue();
super.executeDueTasks();
}

public void runDeadExecutionDetection() {
super.detectDeadExecutions();
}


public void start() {
public void startConsumer() {
LOG.info("Starting manual scheduler. Executing on-startup tasks.");
executeOnStartup();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ public ManualScheduler build() {
final JdbcTaskRepository schedulerTaskRepository = new JdbcTaskRepository(dataSource, true, new DefaultJdbcCustomization(), tableName, taskResolver, new SchedulerName.Fixed("manual"), serializer);
final JdbcTaskRepository clientTaskRepository = new JdbcTaskRepository(dataSource, commitWhenAutocommitDisabled, new DefaultJdbcCustomization(), tableName, taskResolver, new SchedulerName.Fixed("manual"), serializer);

return new ManualScheduler(clock, schedulerTaskRepository, clientTaskRepository, taskResolver, executorThreads, new DirectExecutorService(), schedulerName, waiter, heartbeatInterval, enableImmediateExecution, statsRegistry, pollingLimit, deleteUnresolvedAfter, LogLevel.DEBUG, true, startTasks);
return new ManualScheduler(clock, schedulerTaskRepository, clientTaskRepository, taskResolver, executorThreads, new DirectExecutorService(), schedulerName, waiter, heartbeatInterval, enableImmediateExecution, statsRegistry, pollingLimit, deleteUnresolvedAfter, LogLevel.DEBUG, true, startTasks, true);
}

public ManualScheduler start() {
ManualScheduler scheduler = build();
scheduler.start();
scheduler.startConsumer();
return scheduler;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public void test_concurrency() throws InterruptedException {
final Scheduler scheduler2 = createScheduler("scheduler2", task, stats);

stopScheduler.register(scheduler1, scheduler2);
scheduler1.start();
scheduler2.start();
scheduler1.startConsumer();
scheduler2.startConsumer();

ids.forEach(id -> {
scheduler1.schedule(task.instance(id), Instant.now());
Expand Down Expand Up @@ -88,8 +88,8 @@ public void test_concurrency_recurring() throws InterruptedException {
final Scheduler scheduler2 = createSchedulerRecurring("scheduler2", task1, stats);

stopScheduler.register(scheduler1, scheduler2);
scheduler1.start();
scheduler2.start();
scheduler1.startConsumer();
scheduler2.startConsumer();

Thread.sleep(5_000);
scheduler1.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public void setUp() {
Duration.ZERO,
LogLevel.DEBUG,
true,
new ArrayList<>());
new ArrayList<>(),
true);

}

Expand Down Expand Up @@ -113,10 +114,10 @@ public void scheduler_should_detect_dead_execution_that_never_updated_heartbeat(
final Execution execution1 = new Execution(oneHourAgo, taskInstance);
jdbcTaskRepository.createIfNotExists(execution1);

scheduler.executeDue();
scheduler.executeDueTasks();
assertThat(nonCompletingExecutionHandler.timesExecuted.get(), is(1));

scheduler.executeDue();
scheduler.executeDueTasks();
assertThat(nonCompletingExecutionHandler.timesExecuted.get(), is(1));

settableClock.set(Instant.now());
Expand All @@ -126,7 +127,7 @@ public void scheduler_should_detect_dead_execution_that_never_updated_heartbeat(

settableClock.set(Instant.now());

scheduler.executeDue();
scheduler.executeDueTasks();
assertThat(nonCompletingExecutionHandler.timesExecuted.get(), is(2));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.github.kagkarlsson.scheduler;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.junit.jupiter.api.Test;

import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -16,7 +18,7 @@ public class DueExecutionsBatchTest {


@Test
public void test_trigger_check_for_more_due() {
public void test_trigger_check_for_more_due() throws ExecutionException, InterruptedException {
assertTrigger(0, 0, newBatch(HAPPY_THREADPOOL_SIZE, HAPPY_NUMBER_ADDED_LAST_TIME, HAPPY_LIKELY_MORE_IN_DB));
assertTrigger(0, 10, newBatch(HAPPY_THREADPOOL_SIZE, HAPPY_NUMBER_ADDED_LAST_TIME, HAPPY_LIKELY_MORE_IN_DB));
assertTrigger(0, 14, newBatch(HAPPY_THREADPOOL_SIZE, HAPPY_NUMBER_ADDED_LAST_TIME, HAPPY_LIKELY_MORE_IN_DB));
Expand All @@ -27,7 +29,8 @@ public void test_trigger_check_for_more_due() {
assertTrigger(0, HAPPY_NUMBER_ADDED_LAST_TIME, newBatch(HAPPY_THREADPOOL_SIZE, HAPPY_NUMBER_ADDED_LAST_TIME, false));

// Stale batch, Scheduler has already triggered a new executeDue
assertTrigger(0, HAPPY_NUMBER_ADDED_LAST_TIME, staleBatch(HAPPY_THREADPOOL_SIZE, HAPPY_NUMBER_ADDED_LAST_TIME, HAPPY_LIKELY_MORE_IN_DB));
assertTrigger(0, HAPPY_NUMBER_ADDED_LAST_TIME,
staleBatch(HAPPY_THREADPOOL_SIZE, HAPPY_NUMBER_ADDED_LAST_TIME, HAPPY_LIKELY_MORE_IN_DB));
}

private void assertTrigger(int timesTriggered, int afterExeutionsHandled, DueExecutionsBatch batch) {
Expand All @@ -36,15 +39,21 @@ private void assertTrigger(int timesTriggered, int afterExeutionsHandled, DueExe
assertEquals(timesTriggered, triggered.get());
}

private DueExecutionsBatch newBatch(int schedulerThreadpoolSize, int numberAddedFromLastDbQuery, boolean likelyMoreDueInDb) {
DueExecutionsBatch batch = new DueExecutionsBatch(schedulerThreadpoolSize, HAPPY_GENERATION_NUMBER, numberAddedFromLastDbQuery, likelyMoreDueInDb);
private DueExecutionsBatch newBatch(int schedulerThreadpoolSize, int numberAddedFromLastDbQuery,
boolean likelyMoreDueInDb) throws ExecutionException, InterruptedException {

CompletableFuture<DueExecutionsBatch.BatchCompletionResult> onBatchComplete = new CompletableFuture<>();
DueExecutionsBatch batch =
new DueExecutionsBatch(schedulerThreadpoolSize, HAPPY_GENERATION_NUMBER, numberAddedFromLastDbQuery,
likelyMoreDueInDb, onBatchComplete);
assertEquals(onBatchComplete.get().possiblyMoreExecutionsInDb(), likelyMoreDueInDb);
return batch;
}

private DueExecutionsBatch staleBatch(int schedulerThreadpoolSize, int numberAddedFromLastDbQuery, boolean likelyMoreDueInDb) {
private DueExecutionsBatch staleBatch(int schedulerThreadpoolSize, int numberAddedFromLastDbQuery,
boolean likelyMoreDueInDb) throws ExecutionException, InterruptedException {
DueExecutionsBatch batch = newBatch(schedulerThreadpoolSize, numberAddedFromLastDbQuery, likelyMoreDueInDb);
batch.markBatchAsStale();
return batch;
}

}
Loading