Skip to content

Commit

Permalink
Configurable heartbeat-limit (before considered dead)
Browse files Browse the repository at this point in the history
  • Loading branch information
kagkarlsson committed Dec 6, 2023
1 parent 5faf59c commit abf46e6
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ public HeartbeatState(Clock clock, Instant startTime, HeartbeatConfig heartbeatC

public boolean hasStaleHeartbeat() {
Duration sinceLastSuccess = Duration.between(heartbeatLastSuccess, clock.now());

long heartbeatMillis = heartbeatConfig.heartbeatInterval.toMillis();
long millisUntilConsideredStale = heartbeatMillis + Math.min(10_000, (int)(heartbeatMillis * 0.25));
return heartbeatFailuresSinceLastSuccess > 0
|| sinceLastSuccess.toMillis() > heartbeatConfig.heartbeatInterval.toMillis();
|| sinceLastSuccess.toMillis() > millisUntilConsideredStale;
}

public double getFractionDead() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public class Scheduler implements SchedulerClient {
public static final double TRIGGER_NEXT_BATCH_WHEN_AVAILABLE_THREADS_RATIO = 0.5;
public static final String THREAD_PREFIX = "db-scheduler";
private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
public static final int MISSED_HEARTBEATS_LIMIT = 4;
private final SchedulerClient delegate;
final Clock clock;
final TaskRepository schedulerTaskRepository;
Expand All @@ -55,6 +54,7 @@ public class Scheduler implements SchedulerClient {
protected final Executor executor;
private final ScheduledExecutorService housekeeperExecutor;
private final HeartbeatConfig heartbeatConfig;
private final int numberOfMissedHeartbeatsBeforeDead;
int threadpoolSize;
private final Waiter executeDueWaiter;
private final Duration deleteUnresolvedAfter;
Expand All @@ -79,6 +79,7 @@ protected Scheduler(
SchedulerName schedulerName,
Waiter executeDueWaiter,
Duration heartbeatInterval,
int numberOfMissedHeartbeatsBeforeDead,
boolean enableImmediateExecution,
StatsRegistry statsRegistry,
PollingStrategyConfig pollingStrategyConfig,
Expand All @@ -100,10 +101,11 @@ protected Scheduler(
this.onStartup = onStartup;
this.detectDeadWaiter = new Waiter(heartbeatInterval.multipliedBy(2), clock);
this.heartbeatInterval = heartbeatInterval;
this.numberOfMissedHeartbeatsBeforeDead = numberOfMissedHeartbeatsBeforeDead;
this.heartbeatWaiter = new Waiter(heartbeatInterval, clock);
this.heartbeatConfig =
new HeartbeatConfig(
heartbeatInterval, MISSED_HEARTBEATS_LIMIT, getMaxAgeBeforeConsideredDead());
heartbeatInterval, numberOfMissedHeartbeatsBeforeDead, getMaxAgeBeforeConsideredDead());
this.statsRegistry = statsRegistry;
this.dueExecutor = dueExecutor;
this.housekeeperExecutor = housekeeperExecutor;
Expand Down Expand Up @@ -425,7 +427,7 @@ protected void updateHeartbeatForExecution(Instant now, CurrentlyExecuting curre
}

Duration getMaxAgeBeforeConsideredDead() {
return heartbeatInterval.multipliedBy(MISSED_HEARTBEATS_LIMIT);
return heartbeatInterval.multipliedBy(numberOfMissedHeartbeatsBeforeDead);
}

public static SchedulerBuilder create(DataSource dataSource, Task<?>... knownTasks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class SchedulerBuilder {

public static final Duration DEFAULT_POLLING_INTERVAL = Duration.ofSeconds(10);
public static final Duration DEFAULT_HEARTBEAT_INTERVAL = Duration.ofMinutes(5);
public static final int DEFAULT_MISSED_HEARTBEATS_LIMIT = 6;
public static final Duration DEFAULT_DELETION_OF_UNRESOLVED_TASKS_DURATION = Duration.ofDays(14);
public static final Duration SHUTDOWN_MAX_WAIT = Duration.ofMinutes(30);
public static final PollingStrategyConfig DEFAULT_POLLING_STRATEGY =
Expand Down Expand Up @@ -74,6 +75,7 @@ public class SchedulerBuilder {
protected LogLevel logLevel = DEFAULT_FAILURE_LOG_LEVEL;
protected boolean logStackTrace = LOG_STACK_TRACE_ON_FAILURE;
private boolean registerShutdownHook = false;
private int numberOfMissedHeartbeatsBeforeDead = DEFAULT_MISSED_HEARTBEATS_LIMIT;

public SchedulerBuilder(DataSource dataSource, List<Task<?>> knownTasks) {
this.dataSource = dataSource;
Expand Down Expand Up @@ -101,6 +103,14 @@ public SchedulerBuilder heartbeatInterval(Duration duration) {
return this;
}

public SchedulerBuilder missedHeartbeatLimit(int numberOfMissedHeartbeatsBeforeDead) {
if (numberOfMissedHeartbeatsBeforeDead <= 4) {
throw new IllegalArgumentException("Heartbeat-limit must be at least 4");
}
this.numberOfMissedHeartbeatsBeforeDead = numberOfMissedHeartbeatsBeforeDead;
return this;
}

public SchedulerBuilder threads(int numberOfThreads) {
this.executorThreads = numberOfThreads;
return this;
Expand Down Expand Up @@ -271,6 +281,7 @@ public Scheduler build() {
schedulerName,
waiter,
heartbeatInterval,
numberOfMissedHeartbeatsBeforeDead,
enableImmediateExecution,
statsRegistry,
pollingStrategyConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class ManualScheduler extends Scheduler {
schedulerName,
waiter,
heartbeatInterval,
SchedulerBuilder.DEFAULT_MISSED_HEARTBEATS_LIMIT,
executeImmediately,
statsRegistry,
pollingStrategyConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,31 @@ void success_resets_failing() {
assertOk(state);
}

@Test
void not_stale_until_tolerance_passed() {
HeartbeatState state =
new HeartbeatState(
clock,
clock.now(),
new HeartbeatConfig(Duration.ofSeconds(60), 4, Duration.ofMinutes(4)));

assertOk(state);

clock.tick(Duration.ofSeconds(60));
assertThat(state.hasStaleHeartbeat(), is(false));

clock.tick(Duration.ofSeconds(5));
assertThat(state.hasStaleHeartbeat(), is(false));

clock.tick(Duration.ofSeconds(25));
assertThat(state.hasStaleHeartbeat(), is(true));

state.heartbeat(true, clock.now());
assertThat(state.hasStaleHeartbeat(), is(false));

assertOk(state);
}

private void assertFailing(HeartbeatState state, int timesFailed, double fractionDead) {
assertTrue(state.hasStaleHeartbeat());
assertThat(state.getFailedHeartbeats(), is(timesFailed));
Expand Down

0 comments on commit abf46e6

Please sign in to comment.