From 88b68812e00ad1f364fa001262a2c85d2dc8e6f2 Mon Sep 17 00:00:00 2001 From: Flaviu Vadan Date: Wed, 29 Mar 2023 11:05:13 -0600 Subject: [PATCH] Add ability to get task output parameters, more syntactic sugar around complex dependencies, and easy way to define env variables (#517) Signed-off-by: Flaviu Vadan --- docs/examples/workflows/complex_deps.md | 83 +++++++++++++ .../workflows/dag_with_param_passing.md | 75 ++++++++++++ .../dag_with_script_param_passing.md | 92 +++++++++++++++ docs/examples/workflows/multi_env.md | 75 ++++++++++++ .../workflows/script_with_default_params.md | 109 ++++++++++++++++++ examples/workflows/complex-deps.yaml | 48 ++++++++ examples/workflows/complex_deps.py | 18 +++ .../workflows/dag-with-param-passing.yaml | 38 ++++++ .../dag-with-script-param-passing.yaml | 57 +++++++++ examples/workflows/dag_with_param_passing.py | 20 ++++ .../dag_with_script_param_passing.py | 18 +++ examples/workflows/multi-env.yaml | 42 +++++++ examples/workflows/multi_env.py | 16 +++ .../workflows/script-with-default-params.yaml | 78 +++++++++++++ .../workflows/script_with_default_params.py | 14 +++ src/hera/shared/serialization.py | 2 +- src/hera/workflows/_mixins.py | 20 +++- src/hera/workflows/parameter.py | 27 +++-- src/hera/workflows/script.py | 6 +- src/hera/workflows/steps.py | 46 ++++++++ src/hera/workflows/task.py | 68 ++++++++++- 21 files changed, 932 insertions(+), 20 deletions(-) create mode 100644 docs/examples/workflows/complex_deps.md create mode 100644 docs/examples/workflows/dag_with_param_passing.md create mode 100644 docs/examples/workflows/dag_with_script_param_passing.md create mode 100644 docs/examples/workflows/multi_env.md create mode 100644 docs/examples/workflows/script_with_default_params.md create mode 100644 examples/workflows/complex-deps.yaml create mode 100644 examples/workflows/complex_deps.py create mode 100644 examples/workflows/dag-with-param-passing.yaml create mode 100644 examples/workflows/dag-with-script-param-passing.yaml create mode 100644 examples/workflows/dag_with_param_passing.py create mode 100644 examples/workflows/dag_with_script_param_passing.py create mode 100644 examples/workflows/multi-env.yaml create mode 100644 examples/workflows/multi_env.py create mode 100644 examples/workflows/script-with-default-params.yaml create mode 100644 examples/workflows/script_with_default_params.py diff --git a/docs/examples/workflows/complex_deps.md b/docs/examples/workflows/complex_deps.md new file mode 100644 index 000000000..d0e40fbd4 --- /dev/null +++ b/docs/examples/workflows/complex_deps.md @@ -0,0 +1,83 @@ +# Complex Deps + + + + + + +=== "Hera" + + ```python linenums="1" + from hera.workflows import DAG, Workflow, script + + + @script() + def foo(p): + if p < 0.5: + raise Exception(p) + print(42) + + + with Workflow(generate_name="complex-deps-", entrypoint="d") as w: + with DAG(name="d"): + A = foo(name="a", arguments={"p": 0.6}) + B = foo(name="b", arguments={"p": 0.3}) + C = foo(name="c", arguments={"p": 0.7}) + D = foo(name="d", arguments={"p": 0.9}) + # here, D depends on A, B, and C. If A succeeds and one of B or C fails, D still runs + A >> [B, C], [A, (B | C)] >> D + ``` + +=== "YAML" + + ```yaml linenums="1" + apiVersion: argoproj.io/v1alpha1 + kind: Workflow + metadata: + generateName: complex-deps- + spec: + entrypoint: d + templates: + - dag: + tasks: + - arguments: + parameters: + - name: p + value: '0.6' + name: a + template: foo + - arguments: + parameters: + - name: p + value: '0.3' + depends: a + name: b + template: foo + - arguments: + parameters: + - name: p + value: '0.7' + depends: a + name: c + template: foo + - arguments: + parameters: + - name: p + value: '0.9' + depends: a && (b || c) + name: d + template: foo + name: d + - inputs: + parameters: + - name: p + name: foo + script: + command: + - python + image: python:3.7 + source: "import os\nimport sys\nsys.path.append(os.getcwd())\nimport json\n\ + try: p = json.loads(r'''{{inputs.parameters.p}}''')\nexcept: p = r'''{{inputs.parameters.p}}'''\n\ + \nif p < 0.5:\n raise Exception(p)\nprint(42)\n" + ``` + diff --git a/docs/examples/workflows/dag_with_param_passing.md b/docs/examples/workflows/dag_with_param_passing.md new file mode 100644 index 000000000..e7050ac9b --- /dev/null +++ b/docs/examples/workflows/dag_with_param_passing.md @@ -0,0 +1,75 @@ +# Dag With Param Passing + + + + + + +=== "Hera" + + ```python linenums="1" + from hera.workflows import DAG, Container, Parameter, Task, Workflow + + with Workflow(generate_name="param-passing-", entrypoint="d") as w: + out = Container( + name="out", + image="docker/whalesay:latest", + command=["cowsay"], + outputs=Parameter(name="x", value=42), + ) + in_ = Container( + name="in", + image="docker/whalesay:latest", + command=["cowsay"], + args=["{{inputs.parameters.a}}"], + inputs=Parameter(name="a"), + ) + with DAG(name="d"): + t1 = Task(name="a", template=out) + t2 = Task(name="b", template=in_, arguments=t1.get_parameter("x").with_name("a")) + t1 >> t2 + ``` + +=== "YAML" + + ```yaml linenums="1" + apiVersion: argoproj.io/v1alpha1 + kind: Workflow + metadata: + generateName: param-passing- + spec: + entrypoint: d + templates: + - container: + command: + - cowsay + image: docker/whalesay:latest + name: out + outputs: + parameters: + - name: x + value: '42' + - container: + args: + - '{{inputs.parameters.a}}' + command: + - cowsay + image: docker/whalesay:latest + inputs: + parameters: + - name: a + name: in + - dag: + tasks: + - name: a + template: out + - arguments: + parameters: + - name: a + value: '{{tasks.a.outputs.parameters.x}}' + depends: a + name: b + template: in + name: d + ``` + diff --git a/docs/examples/workflows/dag_with_script_param_passing.md b/docs/examples/workflows/dag_with_script_param_passing.md new file mode 100644 index 000000000..e6690749c --- /dev/null +++ b/docs/examples/workflows/dag_with_script_param_passing.md @@ -0,0 +1,92 @@ +# Dag With Script Param Passing + + + + + + +=== "Hera" + + ```python linenums="1" + from hera.workflows import DAG, Parameter, Task, Workflow, script + + + @script() + def out(): + print(42) + + + @script() + def in_(a): + print(a) + + + with Workflow(generate_name="script-param-passing-", entrypoint="d") as w: + with DAG(name="d"): + t1: Task = out() + t2 = in_(arguments=Parameter(name="a", value=t1.result)) + t1 >> t2 + ``` + +=== "YAML" + + ```yaml linenums="1" + apiVersion: argoproj.io/v1alpha1 + kind: Workflow + metadata: + generateName: script-param-passing- + spec: + entrypoint: d + templates: + - dag: + tasks: + - name: out + template: out + - arguments: + parameters: + - name: a + value: '{{tasks.out.outputs.result}}' + depends: out + name: in- + template: in- + name: d + - name: out + script: + command: + - python + image: python:3.7 + source: 'import os + + import sys + + sys.path.append(os.getcwd()) + + print(42) + + ' + - inputs: + parameters: + - name: a + name: in- + script: + command: + - python + image: python:3.7 + source: 'import os + + import sys + + sys.path.append(os.getcwd()) + + import json + + try: a = json.loads(r''''''{{inputs.parameters.a}}'''''') + + except: a = r''''''{{inputs.parameters.a}}'''''' + + + print(a) + + ' + ``` + diff --git a/docs/examples/workflows/multi_env.md b/docs/examples/workflows/multi_env.md new file mode 100644 index 000000000..982f12109 --- /dev/null +++ b/docs/examples/workflows/multi_env.md @@ -0,0 +1,75 @@ +# Multi Env + + + + + + +=== "Hera" + + ```python linenums="1" + from hera.workflows import DAG, Workflow, script + + + @script(env={"a": 1, "b": 2, "c": 3}) + def env(): + import os + + # note that env params come in as strings + assert os.environ["a"] == "1", os.environ["a"] + assert os.environ["b"] == "2", os.environ["b"] + assert os.environ["c"] == "3", os.environ["c"] + + + with Workflow(generate_name="multi-env-", entrypoint="d") as w: + with DAG(name="d"): + env() + ``` + +=== "YAML" + + ```yaml linenums="1" + apiVersion: argoproj.io/v1alpha1 + kind: Workflow + metadata: + generateName: multi-env- + spec: + entrypoint: d + templates: + - dag: + tasks: + - name: env + template: env + name: d + - name: env + script: + command: + - python + env: + - name: a + value: '1' + - name: b + value: '2' + - name: c + value: '3' + image: python:3.7 + source: 'import os + + import sys + + sys.path.append(os.getcwd()) + + import os + + + # note that env params come in as strings + + assert os.environ["a"] == "1", os.environ["a"] + + assert os.environ["b"] == "2", os.environ["b"] + + assert os.environ["c"] == "3", os.environ["c"] + + ' + ``` + diff --git a/docs/examples/workflows/script_with_default_params.md b/docs/examples/workflows/script_with_default_params.md new file mode 100644 index 000000000..82d652618 --- /dev/null +++ b/docs/examples/workflows/script_with_default_params.md @@ -0,0 +1,109 @@ +# Script With Default Params + + + + + + +=== "Hera" + + ```python linenums="1" + from hera.workflows import DAG, Workflow, script + + + @script() + def foo(a, b=42, c=None): + print(a, b, c) + + + with Workflow(generate_name="script-default-params-", entrypoint="d") as w: + with DAG(name="d"): + foo(name="b-unset-c-unset", arguments={"a": 1}) + foo(name="b-set-c-unset", arguments={"a": 1, "b": 2}) + foo(name="b-unset-c-set", arguments={"a": 1, "c": 2}) + foo(name="b-set-c-set", arguments={"a": 1, "b": 2, "c": 3}) + ``` + +=== "YAML" + + ```yaml linenums="1" + apiVersion: argoproj.io/v1alpha1 + kind: Workflow + metadata: + generateName: script-default-params- + spec: + entrypoint: d + templates: + - dag: + tasks: + - arguments: + parameters: + - name: a + value: '1' + name: b-unset-c-unset + template: foo + - arguments: + parameters: + - name: a + value: '1' + - name: b + value: '2' + name: b-set-c-unset + template: foo + - arguments: + parameters: + - name: a + value: '1' + - name: c + value: '2' + name: b-unset-c-set + template: foo + - arguments: + parameters: + - name: a + value: '1' + - name: b + value: '2' + - name: c + value: '3' + name: b-set-c-set + template: foo + name: d + - inputs: + parameters: + - name: a + - default: '42' + name: b + - default: 'null' + name: c + name: foo + script: + command: + - python + image: python:3.7 + source: 'import os + + import sys + + sys.path.append(os.getcwd()) + + import json + + try: a = json.loads(r''''''{{inputs.parameters.a}}'''''') + + except: a = r''''''{{inputs.parameters.a}}'''''' + + try: b = json.loads(r''''''{{inputs.parameters.b}}'''''') + + except: b = r''''''{{inputs.parameters.b}}'''''' + + try: c = json.loads(r''''''{{inputs.parameters.c}}'''''') + + except: c = r''''''{{inputs.parameters.c}}'''''' + + + print(a, b, c) + + ' + ``` + diff --git a/examples/workflows/complex-deps.yaml b/examples/workflows/complex-deps.yaml new file mode 100644 index 000000000..9bb93925d --- /dev/null +++ b/examples/workflows/complex-deps.yaml @@ -0,0 +1,48 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: complex-deps- +spec: + entrypoint: d + templates: + - dag: + tasks: + - arguments: + parameters: + - name: p + value: '0.6' + name: a + template: foo + - arguments: + parameters: + - name: p + value: '0.3' + depends: a + name: b + template: foo + - arguments: + parameters: + - name: p + value: '0.7' + depends: a + name: c + template: foo + - arguments: + parameters: + - name: p + value: '0.9' + depends: a && (b || c) + name: d + template: foo + name: d + - inputs: + parameters: + - name: p + name: foo + script: + command: + - python + image: python:3.7 + source: "import os\nimport sys\nsys.path.append(os.getcwd())\nimport json\n\ + try: p = json.loads(r'''{{inputs.parameters.p}}''')\nexcept: p = r'''{{inputs.parameters.p}}'''\n\ + \nif p < 0.5:\n raise Exception(p)\nprint(42)\n" diff --git a/examples/workflows/complex_deps.py b/examples/workflows/complex_deps.py new file mode 100644 index 000000000..16864b3a1 --- /dev/null +++ b/examples/workflows/complex_deps.py @@ -0,0 +1,18 @@ +from hera.workflows import DAG, Workflow, script + + +@script() +def foo(p): + if p < 0.5: + raise Exception(p) + print(42) + + +with Workflow(generate_name="complex-deps-", entrypoint="d") as w: + with DAG(name="d"): + A = foo(name="a", arguments={"p": 0.6}) + B = foo(name="b", arguments={"p": 0.3}) + C = foo(name="c", arguments={"p": 0.7}) + D = foo(name="d", arguments={"p": 0.9}) + # here, D depends on A, B, and C. If A succeeds and one of B or C fails, D still runs + A >> [B, C], [A, (B | C)] >> D diff --git a/examples/workflows/dag-with-param-passing.yaml b/examples/workflows/dag-with-param-passing.yaml new file mode 100644 index 000000000..dfad647d2 --- /dev/null +++ b/examples/workflows/dag-with-param-passing.yaml @@ -0,0 +1,38 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: param-passing- +spec: + entrypoint: d + templates: + - container: + command: + - cowsay + image: docker/whalesay:latest + name: out + outputs: + parameters: + - name: x + value: '42' + - container: + args: + - '{{inputs.parameters.a}}' + command: + - cowsay + image: docker/whalesay:latest + inputs: + parameters: + - name: a + name: in + - dag: + tasks: + - name: a + template: out + - arguments: + parameters: + - name: a + value: '{{tasks.a.outputs.parameters.x}}' + depends: a + name: b + template: in + name: d diff --git a/examples/workflows/dag-with-script-param-passing.yaml b/examples/workflows/dag-with-script-param-passing.yaml new file mode 100644 index 000000000..597e4ca37 --- /dev/null +++ b/examples/workflows/dag-with-script-param-passing.yaml @@ -0,0 +1,57 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: script-param-passing- +spec: + entrypoint: d + templates: + - dag: + tasks: + - name: out + template: out + - arguments: + parameters: + - name: a + value: '{{tasks.out.outputs.result}}' + depends: out + name: in- + template: in- + name: d + - name: out + script: + command: + - python + image: python:3.7 + source: 'import os + + import sys + + sys.path.append(os.getcwd()) + + print(42) + + ' + - inputs: + parameters: + - name: a + name: in- + script: + command: + - python + image: python:3.7 + source: 'import os + + import sys + + sys.path.append(os.getcwd()) + + import json + + try: a = json.loads(r''''''{{inputs.parameters.a}}'''''') + + except: a = r''''''{{inputs.parameters.a}}'''''' + + + print(a) + + ' diff --git a/examples/workflows/dag_with_param_passing.py b/examples/workflows/dag_with_param_passing.py new file mode 100644 index 000000000..e66d37dfa --- /dev/null +++ b/examples/workflows/dag_with_param_passing.py @@ -0,0 +1,20 @@ +from hera.workflows import DAG, Container, Parameter, Task, Workflow + +with Workflow(generate_name="param-passing-", entrypoint="d") as w: + out = Container( + name="out", + image="docker/whalesay:latest", + command=["cowsay"], + outputs=Parameter(name="x", value=42), + ) + in_ = Container( + name="in", + image="docker/whalesay:latest", + command=["cowsay"], + args=["{{inputs.parameters.a}}"], + inputs=Parameter(name="a"), + ) + with DAG(name="d"): + t1 = Task(name="a", template=out) + t2 = Task(name="b", template=in_, arguments=t1.get_parameter("x").with_name("a")) + t1 >> t2 diff --git a/examples/workflows/dag_with_script_param_passing.py b/examples/workflows/dag_with_script_param_passing.py new file mode 100644 index 000000000..53a0b27d2 --- /dev/null +++ b/examples/workflows/dag_with_script_param_passing.py @@ -0,0 +1,18 @@ +from hera.workflows import DAG, Parameter, Task, Workflow, script + + +@script() +def out(): + print(42) + + +@script() +def in_(a): + print(a) + + +with Workflow(generate_name="script-param-passing-", entrypoint="d") as w: + with DAG(name="d"): + t1: Task = out() + t2 = in_(arguments=Parameter(name="a", value=t1.result)) + t1 >> t2 diff --git a/examples/workflows/multi-env.yaml b/examples/workflows/multi-env.yaml new file mode 100644 index 000000000..ccdb0c4c2 --- /dev/null +++ b/examples/workflows/multi-env.yaml @@ -0,0 +1,42 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: multi-env- +spec: + entrypoint: d + templates: + - dag: + tasks: + - name: env + template: env + name: d + - name: env + script: + command: + - python + env: + - name: a + value: '1' + - name: b + value: '2' + - name: c + value: '3' + image: python:3.7 + source: 'import os + + import sys + + sys.path.append(os.getcwd()) + + import os + + + # note that env params come in as strings + + assert os.environ["a"] == "1", os.environ["a"] + + assert os.environ["b"] == "2", os.environ["b"] + + assert os.environ["c"] == "3", os.environ["c"] + + ' diff --git a/examples/workflows/multi_env.py b/examples/workflows/multi_env.py new file mode 100644 index 000000000..4c4592783 --- /dev/null +++ b/examples/workflows/multi_env.py @@ -0,0 +1,16 @@ +from hera.workflows import DAG, Workflow, script + + +@script(env={"a": 1, "b": 2, "c": 3}) +def env(): + import os + + # note that env params come in as strings + assert os.environ["a"] == "1", os.environ["a"] + assert os.environ["b"] == "2", os.environ["b"] + assert os.environ["c"] == "3", os.environ["c"] + + +with Workflow(generate_name="multi-env-", entrypoint="d") as w: + with DAG(name="d"): + env() diff --git a/examples/workflows/script-with-default-params.yaml b/examples/workflows/script-with-default-params.yaml new file mode 100644 index 000000000..f67114265 --- /dev/null +++ b/examples/workflows/script-with-default-params.yaml @@ -0,0 +1,78 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: script-default-params- +spec: + entrypoint: d + templates: + - dag: + tasks: + - arguments: + parameters: + - name: a + value: '1' + name: b-unset-c-unset + template: foo + - arguments: + parameters: + - name: a + value: '1' + - name: b + value: '2' + name: b-set-c-unset + template: foo + - arguments: + parameters: + - name: a + value: '1' + - name: c + value: '2' + name: b-unset-c-set + template: foo + - arguments: + parameters: + - name: a + value: '1' + - name: b + value: '2' + - name: c + value: '3' + name: b-set-c-set + template: foo + name: d + - inputs: + parameters: + - name: a + - default: '42' + name: b + - default: 'null' + name: c + name: foo + script: + command: + - python + image: python:3.7 + source: 'import os + + import sys + + sys.path.append(os.getcwd()) + + import json + + try: a = json.loads(r''''''{{inputs.parameters.a}}'''''') + + except: a = r''''''{{inputs.parameters.a}}'''''' + + try: b = json.loads(r''''''{{inputs.parameters.b}}'''''') + + except: b = r''''''{{inputs.parameters.b}}'''''' + + try: c = json.loads(r''''''{{inputs.parameters.c}}'''''') + + except: c = r''''''{{inputs.parameters.c}}'''''' + + + print(a, b, c) + + ' diff --git a/examples/workflows/script_with_default_params.py b/examples/workflows/script_with_default_params.py new file mode 100644 index 000000000..3cd35a60a --- /dev/null +++ b/examples/workflows/script_with_default_params.py @@ -0,0 +1,14 @@ +from hera.workflows import DAG, Workflow, script + + +@script() +def foo(a, b=42, c=None): + print(a, b, c) + + +with Workflow(generate_name="script-default-params-", entrypoint="d") as w: + with DAG(name="d"): + foo(name="b-unset-c-unset", arguments={"a": 1}) + foo(name="b-set-c-unset", arguments={"a": 1, "b": 2}) + foo(name="b-unset-c-set", arguments={"a": 1, "c": 2}) + foo(name="b-set-c-set", arguments={"a": 1, "b": 2, "c": 3}) diff --git a/src/hera/shared/serialization.py b/src/hera/shared/serialization.py index 2c75c9e7b..4fcfb3c9c 100644 --- a/src/hera/shared/serialization.py +++ b/src/hera/shared/serialization.py @@ -5,7 +5,7 @@ def serialize(value: Any): - if value is MISSING: + if value == MISSING: return None elif isinstance(value, str): return value diff --git a/src/hera/workflows/_mixins.py b/src/hera/workflows/_mixins.py index 95af5fd91..464a10559 100644 --- a/src/hera/workflows/_mixins.py +++ b/src/hera/workflows/_mixins.py @@ -52,7 +52,7 @@ VolumeDevice, VolumeMount, ) -from hera.workflows.parameter import Parameter +from hera.workflows.parameter import MISSING, Parameter from hera.workflows.resources import Resources from hera.workflows.user_container import UserContainer from hera.workflows.volume import Volume, _BaseVolume @@ -78,8 +78,15 @@ List[Union[Parameter, ModelParameter, Artifact, ModelArtifact, Dict[str, Any]]], ] ] -EnvT = Optional[Union[Union[_BaseEnv, EnvVar], List[Union[_BaseEnv, EnvVar]]]] -EnvFromT = Optional[Union[Union[_BaseEnvFrom, EnvFromSource], List[Union[_BaseEnvFrom, EnvFromSource]]]] +EnvT = Optional[ + Union[ + _BaseEnv, + EnvVar, + List[Union[_BaseEnv, EnvVar, Dict[str, Any]]], + Dict[str, Any], + ] +] +EnvFromT = Optional[Union[_BaseEnvFrom, EnvFromSource, List[Union[_BaseEnvFrom, EnvFromSource]]]] TContext = TypeVar("TContext", bound="ContextMixin") THookable = TypeVar("THookable", bound="HookMixin") @@ -215,6 +222,9 @@ def _build_env(self) -> Optional[List[EnvVar]]: result.append(e) elif isinstance(e, _BaseEnv): result.append(e.build()) + elif isinstance(e, dict): + for k, v in e.items(): + result.append(EnvVar(name=k, value=v)) return result def _build_env_from(self) -> Optional[List[EnvFromSource]]: @@ -522,12 +532,12 @@ def one(xs: List): def _get_params_from_source(source: Callable) -> Optional[List[Parameter]]: - source_signature: Dict[str, Optional[str]] = {} + source_signature: Dict[str, Optional[object]] = {} for p in inspect.signature(source).parameters.values(): if p.default != inspect.Parameter.empty and p.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD: source_signature[p.name] = p.default else: - source_signature[p.name] = None + source_signature[p.name] = MISSING if len(source_signature) == 0: return None diff --git a/src/hera/workflows/parameter.py b/src/hera/workflows/parameter.py index df8ebd135..1c98ff19c 100644 --- a/src/hera/workflows/parameter.py +++ b/src/hera/workflows/parameter.py @@ -3,11 +3,13 @@ See https://argoproj.github.io/argo-workflows/walk-through/parameters/ for a tutorial on Parameters. """ +from __future__ import annotations + from typing import Any, Optional from pydantic import root_validator -from hera.shared.serialization import serialize +from hera.shared.serialization import MISSING, serialize from hera.workflows.models import Parameter as _ModelParameter @@ -18,18 +20,21 @@ class Parameter(_ModelParameter): for Steps and Tasks to assign values. """ - value: Optional[Any] + # `MISSING` is the default value so that `Parameter` serialization understands the difference between a + # missing value and a value of `None`, as set by a user. With this, when something sets a value of `None` it is + # taken as a proper `None`. By comparison, if a user does not set a value, it is taken as `MISSING` and therefore + # not serialized. This happens because the values if turned into an _actual_ `None` by `serialize` and therefore + # Pydantic will not include it in the YAML that is passed to Argo + value: Optional[Any] = MISSING + default: Optional[Any] = MISSING - @root_validator(pre=True) + @root_validator(pre=True, allow_reuse=True) def _check_values(cls, values): if values.get("value") is not None and values.get("value_from") is not None: raise ValueError("Cannot specify both `value` and `value_from` when instantiating `Parameter`") - if values.get("value") is not None and not isinstance(values.get("value"), str): - values["value"] = serialize(values.get("value")) - - if values.get("default") is not None and not isinstance(values.get("value"), str): - values["default"] = serialize(values.get("default")) + values["value"] = serialize(values.get("value", MISSING)) + values["default"] = serialize(values.get("default", MISSING)) return values @@ -43,6 +48,12 @@ def __str__(self): raise ValueError("Cannot represent `Parameter` as string as `value` is not set") return self.value + def with_name(self, name: str) -> Parameter: + """Returns a copy of the parameter with the name set to the value""" + p = self.copy(deep=True) + p.name = name + return p + def as_input(self) -> _ModelParameter: """Assembles the parameter for use as an input of a template""" return _ModelParameter( diff --git a/src/hera/workflows/script.py b/src/hera/workflows/script.py index f7d7ed0cf..32450aadd 100644 --- a/src/hera/workflows/script.py +++ b/src/hera/workflows/script.py @@ -24,7 +24,7 @@ SecurityContext, Template as _ModelTemplate, ) -from hera.workflows.parameter import Parameter +from hera.workflows.parameter import MISSING, Parameter from hera.workflows.steps import Step from hera.workflows.task import Task @@ -213,12 +213,12 @@ def _build_script(self) -> _ModelScriptTemplate: def _get_parameters_from_callable(source: Callable) -> Optional[List[Parameter]]: # If there are any kwargs arguments associated with the function signature, # we store these as we can set them as default values for argo arguments - source_signature: Dict[str, Optional[str]] = {} + source_signature: Dict[str, Optional[object]] = {} for p in inspect.signature(source).parameters.values(): if p.default != inspect.Parameter.empty and p.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD: source_signature[p.name] = p.default else: - source_signature[p.name] = None + source_signature[p.name] = MISSING if len(source_signature) == 0: return None diff --git a/src/hera/workflows/steps.py b/src/hera/workflows/steps.py index 72e30685d..5f4da8f91 100644 --- a/src/hera/workflows/steps.py +++ b/src/hera/workflows/steps.py @@ -20,6 +20,7 @@ Template as _ModelTemplate, WorkflowStep as _ModelWorkflowStep, ) +from hera.workflows.parameter import Parameter from hera.workflows.protocol import Steppable, Templatable @@ -62,6 +63,51 @@ def finished_at(self) -> str: def result(self) -> str: return f"{{{{steps.{self.name}.outputs.result}}}}" + def get_parameters_as(self, name): + """Gets all the output parameters from this task""" + return Parameter(name=name, value=f"{{{{steps.{self.name}.outputs.parameters}}}}") + + def get_parameter(self, name: str) -> Parameter: + """Returns a Parameter from the task's outputs based on the name. + + Parameters + ---------- + name: str + The name of the parameter to extract as an output. + + Returns + ------- + Parameter + Parameter with the same name + """ + if isinstance(self.template, str): + raise ValueError(f"Cannot get output parameters when the template was set via a name: {self.template}") + + # here, we build the template early to verify that we can get the outputs + if isinstance(self.template, Templatable): + template = self.template._build_template() + else: + template = self.template + + # at this point, we know that the template is a `Template` object + if template.outputs is None: # type: ignore + raise ValueError(f"Cannot get output parameters when the template has no outputs: {template}") + if template.outputs.parameters is None: # type: ignore + raise ValueError(f"Cannot get output parameters when the template has no output parameters: {template}") + parameters = template.outputs.parameters # type: ignore + + obj = next((output for output in parameters if output.name == name), None) + if obj is not None: + obj.value = f"{{{{steps.{self.name}.outputs.parameters.{name}}}}}" + return Parameter( + name=obj.name, + value=obj.value, + value_from=obj.value_from, + global_name=obj.global_name, + description=obj.description, + ) + raise KeyError(f"No output parameter named `{name}` found") + def _build_as_workflow_step(self) -> _ModelWorkflowStep: _template = None if isinstance(self.template, str): diff --git a/src/hera/workflows/task.py b/src/hera/workflows/task.py index 670a23878..d7b2c20bd 100644 --- a/src/hera/workflows/task.py +++ b/src/hera/workflows/task.py @@ -21,6 +21,7 @@ Template, ) from hera.workflows.operator import Operator +from hera.workflows.parameter import Parameter from hera.workflows.protocol import Templatable from hera.workflows.workflow_status import WorkflowStatus @@ -145,6 +146,51 @@ def finished_at(self) -> str: def result(self) -> str: return f"{{{{tasks.{self.name}.outputs.result}}}}" + def get_parameters_as(self, name): + """Gets all the output parameters from this task""" + return Parameter(name=name, value=f"{{{{tasks.{self.name}.outputs.parameters}}}}") + + def get_parameter(self, name: str) -> Parameter: + """Returns a Parameter from the task's outputs based on the name. + + Parameters + ---------- + name: str + The name of the parameter to extract as an output. + + Returns + ------- + Parameter + Parameter with the same name + """ + if isinstance(self.template, str): + raise ValueError(f"Cannot get output parameters when the template was set via a name: {self.template}") + + # here, we build the template early to verify that we can get the outputs + if isinstance(self.template, Templatable): + template = self.template._build_template() + else: + template = self.template + + # at this point, we know that the template is a `Template` object + if template.outputs is None: # type: ignore + raise ValueError(f"Cannot get output parameters when the template has no outputs: {template}") + if template.outputs.parameters is None: # type: ignore + raise ValueError(f"Cannot get output parameters when the template has no output parameters: {template}") + parameters = template.outputs.parameters # type: ignore + + obj = next((output for output in parameters if output.name == name), None) + if obj is not None: + obj.value = f"{{{{tasks.{self.name}.outputs.parameters.{name}}}}}" + return Parameter( + name=obj.name, + value=obj.value, + value_from=obj.value_from, + global_name=obj.global_name, + description=obj.description, + ) + raise KeyError(f"No output parameter named `{name}` found") + def next(self, other: Task, operator: Operator = Operator.and_, on: Optional[TaskResult] = None) -> Task: """Set self as a dependency of `other`.""" assert issubclass(other.__class__, Task) @@ -161,14 +207,23 @@ def next(self, other: Task, operator: Operator = Operator.and_, on: Optional[Tas other.depends += f" {operator} {self.name + condition}" return other - def __rrshift__(self, other: List[Task]) -> Task: + def __rrshift__(self, other: List[Union[Task, str]]) -> Task: """Set `other` as a dependency self.""" assert isinstance(other, list), f"Unknown type {type(other)} specified using reverse right bitshift operator" for o in other: - o.next(self) + if isinstance(o, Task): + o.next(self) + else: + assert isinstance( + o, str + ), f"Unknown list item type {type(o)} specified using reverse right bitshift operator" + if self.depends is None: + self.depends = o + else: + self.depends += f" && {o}" return self - def __rshift__(self, other: Union["Task", List["Task"]]) -> Union[Task, List[Task]]: + def __rshift__(self, other: Union[Task, List[Task]]) -> Union[Task, List[Task]]: """Set self as a dependency of `other` which can be a single Task or list of Tasks.""" if isinstance(other, Task): return self.next(other) @@ -181,6 +236,13 @@ def __rshift__(self, other: Union["Task", List["Task"]]) -> Union[Task, List[Tas return other raise ValueError(f"Unknown type {type(other)} provided to `__rshift__`") + def __or__(self, other: Union[Task, str]) -> str: + """Adds a condition of""" + if isinstance(other, Task): + return f"({self.name} || {other.name})" + assert isinstance(other, str), f"Unknown type {type(other)} specified using `|` operator" + return f"{self.name} || {other}" + def on_workflow_status(self, status: WorkflowStatus, op: Operator = Operator.equals) -> Task: expression = f"{{{{workflow.status}}}} {op} {status}" if self.when: