Skip to content

Commit

Permalink
feat(core): dymamic timeout and delay in the Pause task
Browse files Browse the repository at this point in the history
Fixes #5338
  • Loading branch information
loicmathieu committed Nov 29, 2024
1 parent a8577ab commit 06474c0
Show file tree
Hide file tree
Showing 14 changed files with 87 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
public class Property<T> {
// By default, durations are stored as numbers.
// We cannot change that globally, as in JDBC/Elastic 'execution.state.duration' must be a number to be able to aggregate them.
// So we only change it here.
// So we only change it here to be used for Property.of().
private static final ObjectMapper MAPPER = JacksonMapper.ofJson()
.copy()
.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false);
Expand Down Expand Up @@ -91,6 +91,7 @@ public static <V> Property<V> of(V value) {
public static <T> T as(Property<T> property, RunContext runContext, Class<T> clazz) throws IllegalVariableEvaluationException {
if (property.value == null) {
String rendered = runContext.render(property.expression);
// special case for duration as they should be serialized as double but are not always
property.value = MAPPER.convertValue(rendered, clazz);
}

Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/io/kestra/core/models/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.core.flow.WorkingDirectory;
Expand Down Expand Up @@ -35,7 +36,7 @@ abstract public class Task implements TaskInterface {
@Valid
protected AbstractRetry retry;

protected Duration timeout;
protected Property<Duration> timeout;

@Builder.Default
protected Boolean disabled = false;
Expand Down
20 changes: 13 additions & 7 deletions core/src/main/java/io/kestra/core/runners/ExecutorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.slf4j.Logger;
import org.slf4j.event.Level;

import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -618,13 +619,18 @@ private Executor handlePausedDelay(Executor executor, List<WorkerTaskResult> wor

if (task instanceof Pause pauseTask) {
if (pauseTask.getDelay() != null || pauseTask.getTimeout() != null) {
return ExecutionDelay.builder()
.taskRunId(workerTaskResult.getTaskRun().getId())
.executionId(executor.getExecution().getId())
.date(workerTaskResult.getTaskRun().getState().maxDate().plus(pauseTask.getDelay() != null ? pauseTask.getDelay() : pauseTask.getTimeout()))
.state(pauseTask.getDelay() != null ? State.Type.RUNNING : State.Type.FAILED)
.delayType(ExecutionDelay.DelayType.RESUME_FLOW)
.build();
RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
Duration delay = runContext.render(pauseTask.getDelay()).as(Duration.class).orElse(null);
Duration timeout = runContext.render(pauseTask.getTimeout()).as(Duration.class).orElse(null);
if (delay != null || timeout != null) { // rendering can lead to null, so we must re-check here
return ExecutionDelay.builder()
.taskRunId(workerTaskResult.getTaskRun().getId())
.executionId(executor.getExecution().getId())
.date(workerTaskResult.getTaskRun().getState().maxDate().plus(delay != null ? delay : timeout))
.state(delay != null ? State.Type.RUNNING : State.Type.FAILED)
.delayType(ExecutionDelay.DelayType.RESUME_FLOW)
.build();
}
}
}

Expand Down
13 changes: 5 additions & 8 deletions core/src/main/java/io/kestra/core/runners/FlowInputOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import jakarta.validation.constraints.NotNull;

import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
Expand Down Expand Up @@ -73,10 +71,10 @@
*/
@Singleton
public class FlowInputOutput {
private static final Logger log = LoggerFactory.getLogger(FlowInputOutput.class);
private static final Pattern URI_PATTERN = Pattern.compile("^[a-z]+:\\/\\/(?:www\\.)?[-a-zA-Z0-9@:%._\\+~#=]{1,256}\\.[a-zA-Z0-9()]{1,6}\\b(?:[-a-zA-Z0-9()@:%_\\+.~#?&\\/=]*)$");
private static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml();
private static final ObjectMapper JSON_MAPPER = JacksonMapper.ofJson();

public static final Pattern URI_PATTERN = Pattern.compile("^[a-z]+:\\/\\/(?:www\\.)?[-a-zA-Z0-9@:%._\\+~#=]{1,256}\\.[a-zA-Z0-9()]{1,6}\\b(?:[-a-zA-Z0-9()@:%_\\+.~#?&\\/=]*)$");
public static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml();
private final StorageInterface storageInterface;
private final Optional<String> secretKey;
private final RunContextFactory runContextFactory;
Expand Down Expand Up @@ -358,16 +356,15 @@ public Map<String, Object> typedOutputs(
if (flow.getOutputs() == null) {
return ImmutableMap.of();
}
final ObjectMapper mapper = new ObjectMapper();
Map<String, Object> results = flow
.getOutputs()
.stream()
.map(output -> {
final HashMap<String, Object> current;
final Object currentValue;
try {
current = in == null ? null : mapper.readValue(
mapper.writeValueAsString(in.get(output.getId())), new TypeReference<>() {});
current = in == null ? null : JSON_MAPPER.readValue(
JSON_MAPPER.writeValueAsString(in.get(output.getId())), new TypeReference<>() {});
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ protected void kill(final boolean markAsKilled) {

@Override
public State.Type doCall() throws Exception {
final Duration workerTaskTimeout = workerTask.getTask().getTimeout();
final Duration workerTaskTimeout = workerTask.getRunContext().render(workerTask.getTask().getTimeout()).as(Duration.class).orElse(null);

try {
if (workerTaskTimeout != null) {
Expand Down
7 changes: 3 additions & 4 deletions core/src/main/java/io/kestra/plugin/core/flow/Pause.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.hierarchies.GraphTask;
import io.kestra.core.models.hierarchies.RelationType;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
Expand Down Expand Up @@ -135,15 +136,13 @@ public class Pause extends Task implements FlowableTask<Pause.Output> {
title = "Duration of the pause — useful if you want to pause the execution for a fixed amount of time.",
description = "The delay is a string in the [ISO 8601 Duration](https://en.wikipedia.org/wiki/ISO_8601#Durations) format, e.g. `PT1H` for 1 hour, `PT30M` for 30 minutes, `PT10S` for 10 seconds, `P1D` for 1 day, etc. If no delay and no timeout are configured, the execution will never end until it's manually resumed from the UI or API."
)
@PluginProperty
private Duration delay;
private Property<Duration> delay;

@Schema(
title = "Timeout of the pause — useful to avoid never-ending workflows in a human-in-the-loop scenario. For example, if you want to pause the execution until a human validates some data generated in a previous task, you can set a timeout of e.g. 24 hours. If no manual approval happens within 24 hours, the execution will automatically resume without a prior data validation.",
description = "If no delay and no timeout are configured, the execution will never end until it's manually resumed from the UI or API."
)
@PluginProperty
private Duration timeout;
private Property<Duration> timeout;

@Valid
@Schema(
Expand Down
3 changes: 2 additions & 1 deletion core/src/test/java/io/kestra/core/runners/WorkerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
Expand Down Expand Up @@ -97,7 +98,7 @@ void failOnWorkerTaskWithFlowable() throws TimeoutException, QueueException, Jso

Pause pause = Pause.builder()
.type(Pause.class.getName())
.delay(Duration.ofSeconds(1))
.delay(Property.of(Duration.ofSeconds(1)))
.id("unit-test")
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.Type;
import io.kestra.core.models.flows.input.StringInput;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.Constant;
import io.kestra.core.models.validations.ModelValidator;
Expand All @@ -32,7 +33,7 @@

@KestraTest
class YamlParserTest {
private static ObjectMapper mapper = JacksonMapper.ofJson();
private static final ObjectMapper MAPPER = JacksonMapper.ofJson();

@Inject
private YamlParser yamlParser;
Expand All @@ -49,7 +50,7 @@ void parse() {

// third with all optionals
Task optionals = flow.getTasks().get(2);
assertThat(optionals.getTimeout(), is(Duration.ofMinutes(60)));
assertThat(optionals.getTimeout(), is(Property.builder().expression("PT60M").build()));
assertThat(optionals.getRetry().getType(), is("constant"));
assertThat(optionals.getRetry().getMaxAttempt(), is(5));
assertThat(((Constant) optionals.getRetry()).getInterval().getSeconds(), is(900L));
Expand All @@ -65,7 +66,7 @@ void parseString() throws IOException {

// third with all optionals
Task optionals = flow.getTasks().get(2);
assertThat(optionals.getTimeout(), is(Duration.ofMinutes(60)));
assertThat(optionals.getTimeout(), is(Property.builder().expression("PT60M").build()));
assertThat(optionals.getRetry().getType(), is("constant"));
assertThat(optionals.getRetry().getMaxAttempt(), is(5));
assertThat(((Constant) optionals.getRetry()).getInterval().getSeconds(), is(900L));
Expand Down Expand Up @@ -166,15 +167,15 @@ void listeners() {
void serialization() throws IOException {
Flow flow = this.parse("flows/valids/minimal.yaml");

String s = mapper.writeValueAsString(flow);
String s = MAPPER.writeValueAsString(flow);
assertThat(s, is("{\"id\":\"minimal\",\"namespace\":\"io.kestra.tests\",\"revision\":2,\"disabled\":false,\"deleted\":false,\"labels\":[{\"key\":\"system.readOnly\",\"value\":\"true\"}],\"tasks\":[{\"id\":\"date\",\"type\":\"io.kestra.plugin.core.debug.Return\",\"format\":\"{{taskrun.startDate}}\"}]}"));
}

@Test
void noDefault() throws IOException {
Flow flow = this.parse("flows/valids/parallel.yaml");

String s = mapper.writeValueAsString(flow);
String s = MAPPER.writeValueAsString(flow);
assertThat(s, not(containsString("\"-c\"")));
assertThat(s, containsString("\"deleted\":false"));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.core.validations;

import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.WorkerGroup;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.plugin.core.flow.Pause;
Expand Down Expand Up @@ -57,7 +58,7 @@ void workingDirectoryInvalid() {
List.of(Pause.builder()
.id("pause")
.type(Pause.class.getName())
.delay(Duration.ofSeconds(1L))
.delay(Property.of(Duration.ofSeconds(1L)))
.build()
)
)
Expand Down
23 changes: 23 additions & 0 deletions core/src/test/java/io/kestra/plugin/core/flow/PauseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ void delay() throws Exception {
suite.runDelay(runnerUtils);
}

@Disabled("This test is too flaky and it always pass in JDBC and Kafka")
void delayFromInput() throws Exception {
suite.runDelayFromInput(runnerUtils);
}

@Disabled("This test is too flaky and it always pass in JDBC and Kafka")
void parallelDelay() throws Exception {
suite.runParallelDelay(runnerUtils);
Expand Down Expand Up @@ -144,6 +149,24 @@ public void runDelay(RunnerUtils runnerUtils) throws Exception {
assertThat(execution.getTaskRunList(), hasSize(3));
}

public void runDelayFromInput(RunnerUtils runnerUtils) throws Exception {
Execution execution = runnerUtils.runOneUntilPaused(null, "io.kestra.tests", "pause-delay-from-input", null, null, Duration.ofSeconds(30));
String executionId = execution.getId();

assertThat(execution.getState().getCurrent(), is(State.Type.PAUSED));
assertThat(execution.getTaskRunList(), hasSize(1));

execution = runnerUtils.awaitExecution(
e -> e.getId().equals(executionId) && e.getState().getCurrent() == State.Type.SUCCESS,
() -> {},
Duration.ofSeconds(5)
);

assertThat(execution.getTaskRunList().getFirst().getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count(), is(1L));
assertThat(execution.getTaskRunList().getFirst().getState().getHistories().stream().filter(history -> history.getState() == State.Type.RUNNING).count(), is(2L));
assertThat(execution.getTaskRunList(), hasSize(3));
}

public void runParallelDelay(RunnerUtils runnerUtils) throws TimeoutException, QueueException {
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "each-parallel-pause", Duration.ofSeconds(30));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
Expand Down Expand Up @@ -51,7 +52,7 @@ void timeout() throws TimeoutException, QueueException {
.id("test")
.type(Sleep.class.getName())
.duration(100000L)
.timeout(Duration.ofNanos(100000))
.timeout(Property.of(Duration.ofNanos(100000)))
.build()))
.build();

Expand Down
19 changes: 19 additions & 0 deletions core/src/test/resources/flows/valids/pause-delay-from-input.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
id: pause-delay-from-input
namespace: io.kestra.tests

inputs:
- id: delay
type: DURATION
defaults: PT1S

tasks:
- id: pause
type: io.kestra.plugin.core.flow.Pause
delay: "{{inputs.delay}}"
tasks:
- id: subtask
type: io.kestra.plugin.core.log.Log
message: trigger 1 seconds pause"
- id: last
type: io.kestra.plugin.core.debug.Return
format: "{{task.id}} > {{taskrun.startDate}}"
5 changes: 5 additions & 0 deletions jdbc/src/test/java/io/kestra/jdbc/runner/JdbcRunnerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,11 @@ public void pauseRunDelay() throws Exception {
pauseTest.runDelay(runnerUtils);
}

@Test
public void pauseRunDelayFromInput() throws Exception {
pauseTest.runDelayFromInput(runnerUtils);
}

@Test
public void pauseRunParallelDelay() throws Exception {
pauseTest.runParallelDelay(runnerUtils);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import lombok.experimental.SuperBuilder;
import org.apache.commons.lang3.SystemUtils;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -161,7 +162,7 @@ protected CommandsWrapper commands(RunContext runContext) throws IllegalVariable
.withInputFiles(this.getInputFiles())
.withOutputFiles(this.getOutputFiles())
.withEnableOutputDirectory(this.getOutputDirectory())
.withTimeout(this.getTimeout())
.withTimeout(runContext.render(this.getTimeout()).as(Duration.class).orElse(null))
.withTargetOS(this.getTargetOS());
}

Expand Down

0 comments on commit 06474c0

Please sign in to comment.