From 01cab094f00eb2fb8df91155b2f922c7f803b7a3 Mon Sep 17 00:00:00 2001 From: Korawich Anuttra Date: Sun, 20 Jul 2025 01:16:13 +0700 Subject: [PATCH 01/10] :gear: fixed: change cutting running id logic. --- docs/logs.md | 14 ++++++++++++++ src/ddeutil/workflow/utils.py | 2 +- src/ddeutil/workflow/workflow.py | 12 ++++++------ tests/test_workflow_exec.py | 1 + 4 files changed, 22 insertions(+), 7 deletions(-) create mode 100644 docs/logs.md diff --git a/docs/logs.md b/docs/logs.md new file mode 100644 index 00000000..a8de929c --- /dev/null +++ b/docs/logs.md @@ -0,0 +1,14 @@ +# Logs + +For logging of this package will split to 2 parts: + +1. Trace log +2. Audit log + +## Trace Log + +This part will show all process logs that return with the trace model. + +## Audit Log + +This part will use to tracking workflow release log. diff --git a/src/ddeutil/workflow/utils.py b/src/ddeutil/workflow/utils.py index e6f0c016..ac61a3ca 100644 --- a/src/ddeutil/workflow/utils.py +++ b/src/ddeutil/workflow/utils.py @@ -335,7 +335,7 @@ def cut_id(run_id: str, *, num: int = 6) -> str: if "T" in run_id: dt, simple = run_id.split("T", maxsplit=1) return dt[:12] + simple[-num:] - return run_id[:12] + run_id[-num:] + return run_id[-num:] @overload diff --git a/src/ddeutil/workflow/workflow.py b/src/ddeutil/workflow/workflow.py index f561a947..660b89b1 100644 --- a/src/ddeutil/workflow/workflow.py +++ b/src/ddeutil/workflow/workflow.py @@ -722,6 +722,11 @@ def execute( ) catch(context, status=WAIT) if event and event.is_set(): + err_msg: str = ( + "Execution was canceled from the event was set " + "before workflow execution." + ) + trace.error(f"[WORKFLOW]: {err_msg}") return Result( run_id=run_id, parent_run_id=parent_run_id, @@ -729,12 +734,7 @@ def execute( context=catch( context, status=CANCEL, - updated={ - "errors": WorkflowCancelError( - "Execution was canceled from the event was set " - "before workflow execution." - ).to_dict(), - }, + updated={"errors": WorkflowCancelError(err_msg).to_dict()}, ), info={"execution_time": time.monotonic() - ts}, extras=self.extras, diff --git a/tests/test_workflow_exec.py b/tests/test_workflow_exec.py index 3b5c74b6..8bcf16ae 100644 --- a/tests/test_workflow_exec.py +++ b/tests/test_workflow_exec.py @@ -116,6 +116,7 @@ def test_workflow_exec_raise_event_set(): def test_workflow_exec_py(): workflow = Workflow.from_conf(name="wf-run-python") rs: Result = workflow.execute( + run_id="1001", params={ "author-run": "Local Workflow", "run-date": "2024-01-01", From eff651c2e17d526c0f52861697238bb49e0627c5 Mon Sep 17 00:00:00 2001 From: Korawich Anuttra Date: Sun, 20 Jul 2025 10:32:37 +0700 Subject: [PATCH 02/10] :gear: fixed: add stopper for disable circle execution. --- src/ddeutil/workflow/result.py | 18 +++- src/ddeutil/workflow/stages.py | 136 ++++++++++++------------ src/ddeutil/workflow/workflow.py | 2 +- tests/test_result.py | 11 ++ tests/test_utils.py | 4 +- tests/test_workflow_exec.py | 176 ++++++++++++++++++++++++++++++- 6 files changed, 268 insertions(+), 79 deletions(-) diff --git a/src/ddeutil/workflow/result.py b/src/ddeutil/workflow/result.py index c0c84713..1938954b 100644 --- a/src/ddeutil/workflow/result.py +++ b/src/ddeutil/workflow/result.py @@ -21,12 +21,12 @@ from dataclasses import field from enum import Enum -from typing import Optional, Union +from typing import Optional, TypedDict, Union from pydantic import ConfigDict from pydantic.dataclasses import dataclass from pydantic.functional_validators import model_validator -from typing_extensions import Self +from typing_extensions import NotRequired, Self from . import ( JobCancelError, @@ -40,7 +40,7 @@ ) from .__types import DictData from .audits import TraceManager, get_trace -from .errors import ResultError +from .errors import ErrorData, ResultError from .utils import default_gen_id @@ -274,6 +274,9 @@ def catch( context: A context data that want to be the current context. status: A status enum object. updated: A updated data that will update to the current context. + + Returns: + DictData: A catch context data. """ context.update(updated or {}) context["status"] = Status(status) if isinstance(status, int) else status @@ -291,3 +294,12 @@ def catch( else: raise ResultError(f"The key {k!r} does not exists on context data.") return context + + +class Context(TypedDict): + """Context dict typed.""" + + status: Status + context: NotRequired[DictData] + errors: NotRequired[Union[list[ErrorData], ErrorData]] + info: NotRequired[DictData] diff --git a/src/ddeutil/workflow/stages.py b/src/ddeutil/workflow/stages.py index 620dff63..ae3cb0ba 100644 --- a/src/ddeutil/workflow/stages.py +++ b/src/ddeutil/workflow/stages.py @@ -36,16 +36,6 @@ execute method receives a `params={"params": {...}}` value for passing template searching. - All stages model inherit from `BaseStage` or `AsyncBaseStage` models that has the -base fields: - -| field | alias | data type | default | description | -|-----------|-------|-------------|:--------:|-----------------------------------------------------------------------| -| id | | str \| None | `None` | A stage ID that use to keep execution output or getting by job owner. | -| name | | str | | A stage name that want to log when start execution. | -| condition | if | str \| None | `None` | A stage condition statement to allow stage executable. | -| extras | | dict | `dict()` | An extra parameter that override core config values. | - It has a special base class is `BaseRetryStage` that inherit from `AsyncBaseStage` that use to handle retry execution when it got any error with `retry` field. """ @@ -155,13 +145,11 @@ class BaseStage(BaseModel, ABC): process: Main execution logic that must be implemented by subclasses Example: - ```python - class CustomStage(BaseStage): - custom_param: str = Field(description="Custom parameter") - - def process(self, params: dict, **kwargs) -> Result: - # Custom execution logic - return Result(status=SUCCESS) + >>> class CustomStage(BaseStage): + ... custom_param: str = Field(description="Custom parameter") + ... + ... def process(self, params: DictData, **kwargs) -> Result: + ... return Result(status=SUCCESS) ``` """ @@ -208,7 +196,8 @@ def iden(self) -> str: def ___prepare_desc__(cls, value: str) -> str: """Prepare description string that was created on a template. - :rtype: str + Returns: + str: A dedent and left strip newline of description string. """ return dedent(value.lstrip("\n")) @@ -218,16 +207,17 @@ def __prepare_running_id(self) -> Self: method will validate name and id fields should not contain any template parameter (exclude matrix template). - :raise ValueError: When the ID and name fields include matrix parameter - template with the 'matrix.' string value. + Raises: + ValueError: When the ID and name fields include matrix parameter + template with the 'matrix.' string value. - :rtype: Self + Returns: Self """ # VALIDATE: Validate stage id and name should not dynamic with params # template. (allow only matrix) if not_in_template(self.id) or not_in_template(self.name): raise ValueError( - "Stage name and ID should only template with 'matrix.'" + "Stage name and ID should only template with 'matrix.?'." ) return self @@ -352,10 +342,7 @@ def execute( StageCancelError, StageError, ) as e: # pragma: no cov - trace.info( - f"[STAGE]: Handler:||{e.__class__.__name__}: {e}||" - f"{traceback.format_exc()}" - ) + trace.info(f"[STAGE]: Handler:||{traceback.format_exc()}") st: Status = get_status_from_error(e) return Result( run_id=run_id, @@ -374,10 +361,7 @@ def execute( extras=self.extras, ) except Exception as e: - trace.error( - f"[STAGE]: Error Handler:||{e.__class__.__name__}: {e}||" - f"{traceback.format_exc()}" - ) + trace.error(f"[STAGE]: Error Handler:||{traceback.format_exc()}") return Result( run_id=run_id, parent_run_id=parent_run_id, @@ -681,8 +665,7 @@ async def axecute( StageError, ) as e: # pragma: no cov await trace.ainfo( - f"[STAGE]: Skip Handler:||{e.__class__.__name__}: {e}||" - f"{traceback.format_exc()}" + f"[STAGE]: Skip Handler:||{traceback.format_exc()}" ) st: Status = get_status_from_error(e) return Result( @@ -703,8 +686,7 @@ async def axecute( ) except Exception as e: await trace.aerror( - f"[STAGE]: Error Handler:||{e.__class__.__name__}: {e}||" - f"{traceback.format_exc()}" + f"[STAGE]: Error Handler:||{traceback.format_exc()}" ) return Result( run_id=run_id, @@ -910,19 +892,15 @@ class EmptyStage(BaseAsyncStage): EmptyStage is a utility stage that performs no actual work but provides logging output and optional delays. It's commonly used for: - - Debugging workflow execution flow - - Adding informational messages to workflows - - Creating delays between stages - - Testing template parameter resolution + - Debugging workflow execution flow + - Adding informational messages to workflows + - Creating delays between stages + - Testing template parameter resolution The stage outputs the echo message to stdout and can optionally sleep for a specified duration, making it useful for workflow timing control and debugging scenarios. - Attributes: - echo (str, optional): Message to display during execution - sleep (float): Duration to sleep after logging (0-1800 seconds) - Example: ```yaml stages: @@ -934,24 +912,25 @@ class EmptyStage(BaseAsyncStage): echo: "Processing file: ${{ params.filename }}" ``` - ```python - stage = EmptyStage( - name="Status Update", - echo="Processing completed successfully", - sleep=1.0 - ) - ``` + >>> stage = EmptyStage( + ... name="Status Update", + ... echo="Processing completed successfully", + ... sleep=1.0 + ... ) """ echo: StrOrNone = Field( default=None, - description="A message that want to show on the stdout.", + description=( + "A message that want to display on the stdout during execution. " + "By default, it do not show any message." + ), ) sleep: float = Field( default=0, description=( - "A second value to sleep before start execution. This value should " - "gather or equal 0, and less than 1800 seconds." + "A duration in second value to sleep after logging. This value " + "should between 0 - 1800 seconds." ), ge=0, lt=1800, @@ -1396,11 +1375,11 @@ def process( to globals argument on `exec` build-in function. Args: - params: A parameter data that want to use in this + params (DictData): A parameter data that want to use in this execution. - run_id: A running stage ID. + run_id (str): A running stage ID. context: A context data. - parent_run_id: A parent running ID. (Default is None) + parent_run_id (str | None, default None): A parent running ID. event: An event manager that use to track parent process was not force stopped. @@ -1410,7 +1389,7 @@ def process( trace: TraceManager = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) - trace.info("[STAGE]: Prepare `globals` and `locals` variables.") + trace.debug("[STAGE]: Prepare `globals` and `locals` variables.") lc: DictData = {} gb: DictData = ( globals() @@ -1965,6 +1944,9 @@ class TriggerStage(BaseNestedStage): execute method. This is the stage that allow you to create the reusable Workflow template with dynamic parameters. + This stage does not allow to pass the workflow model directly to the + trigger field. A trigger workflow name should exist on the config path only. + Data Validate: >>> stage = { ... "name": "Trigger workflow stage execution", @@ -2015,7 +1997,18 @@ def process( run_id, parent_run_id=parent_run_id, extras=self.extras ) _trigger: str = param2template(self.trigger, params, extras=self.extras) + # if _trigger in self.extras.get("stop_circle_workflow_name", []): + # raise StageError( + # "[STAGE]: Circle execution via trigger itself workflow name." + # ) trace.info(f"[STAGE]: Load workflow: {_trigger!r}") + + # # NOTE: add noted key for cancel circle execution. + # if "stop_circle_workflow_name" in self.extras: + # self.extras["stop_circle_workflow_name"].append(_trigger) + # else: + # self.extras.update({"stop_circle_workflow_name": [_trigger]}) + result: Result = Workflow.from_conf( name=pass_env(_trigger), extras=self.extras, @@ -2026,7 +2019,7 @@ def process( event=event, ) if result.status == FAILED: - err_msg: StrOrNone = ( + err_msg: str = ( f" with:\n{msg}" if (msg := result.context.get("errors", {}).get("message")) else "." @@ -2100,21 +2093,24 @@ def _process_branch( """Execute branch that will execute all nested-stage that was set in this stage with specific branch ID. - :param branch: (str) A branch ID. - :param params: (DictData) A parameter data. - :param run_id: (str) - :param context: (DictData) - :param parent_run_id: (str | None) - :param event: (Event) An Event manager instance that use to cancel this - execution if it forces stopped by parent execution. - (Default is None) + Args: + branch (str): A branch ID. + params (DictData): A parameter data. + run_id (str): A running ID. + context (DictData): + parent_run_id (str | None, default None): A parent running ID. + event: (Event) An Event manager instance that use to cancel this + execution if it forces stopped by parent execution. + (Default is None) - :raise StageCancelError: If event was set. - :raise StageCancelError: If result from a nested-stage return canceled - status. - :raise StageError: If result from a nested-stage return failed status. + Raises: + StageCancelError: If event was set. + StageCancelError: If result from a nested-stage return canceled + status. + StageError: If result from a nested-stage return failed status. - :rtype: tuple[Status, DictData] + Returns: + tuple[Status, DictData]: A pair of status and result context data. """ trace: TraceManager = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras diff --git a/src/ddeutil/workflow/workflow.py b/src/ddeutil/workflow/workflow.py index 660b89b1..33881c5a 100644 --- a/src/ddeutil/workflow/workflow.py +++ b/src/ddeutil/workflow/workflow.py @@ -684,7 +684,7 @@ def execute( """ ts: float = time.monotonic() parent_run_id: Optional[str] = run_id - run_id: str = gen_id(self.name, extras=self.extras) + run_id: str = gen_id(self.name, unique=True, extras=self.extras) trace: TraceManager = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) diff --git a/tests/test_result.py b/tests/test_result.py index 566c43de..c363f44f 100644 --- a/tests/test_result.py +++ b/tests/test_result.py @@ -7,6 +7,7 @@ FAILED, SUCCESS, WAIT, + Context, Result, Status, catch, @@ -118,3 +119,13 @@ def test_catch(): with pytest.raises(ResultError): catch({}, status=SUCCESS, foo={"key": "bar"}) + + +def test_context_type(): + _: Context = {"status": WAIT} + + # NOTE: This line will alert from IDE. + _: Context = {"status": SUCCESS, "info": "demo"} + + # NOTE: This line will alert from IDE. + _: Context = {"status": SUCCESS, "not-set": "demo"} diff --git a/tests/test_utils.py b/tests/test_utils.py index cae317d1..cb53210d 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -106,9 +106,7 @@ def test_cut_id(): assert ( cut_id(run_id="20240101081330000000T1354680202") == "202401010813680202" ) - assert ( - cut_id(run_id="3509917790201200503600070303500") == "350991779020303500" - ) + assert cut_id(run_id="3509917790201200503600070303500") == "303500" @freeze_time("2024-01-01 01:13:30") diff --git a/tests/test_workflow_exec.py b/tests/test_workflow_exec.py index 8bcf16ae..76ae7c62 100644 --- a/tests/test_workflow_exec.py +++ b/tests/test_workflow_exec.py @@ -3,6 +3,7 @@ from textwrap import dedent from unittest.mock import patch +import pytest from ddeutil.core import getdot from ddeutil.workflow import ( CANCEL, @@ -307,9 +308,32 @@ def test_workflow_exec_py_with_parallel(): def test_workflow_exec_py_raise(): - rs: Result = Workflow.from_conf("wf-run-python-raise").execute( - params={}, max_job_parallel=1 + workflow = Workflow.model_validate( + { + "name": "wf-run-python-raise", + "type": "Workflow", + "jobs": { + "first-job": { + "stages": [ + { + "name": "Raise Error Inside", + "id": "raise-error", + "run": "raise ValueError('Testing raise error inside PyStage!!!')", + } + ], + }, + "second-job": { + "stages": [ + { + "name": "Echo hello world", + "echo": "Hello World", + } + ] + }, + }, + } ) + rs: Result = workflow.execute(params={}, max_job_parallel=1) assert rs.status == FAILED assert rs.context == { "status": FAILED, @@ -1088,6 +1112,7 @@ def test_workflow_exec_raise_from_job_exec(): assert rs.status == FAILED +@pytest.mark.skip def test_workflow_exec_raise_job_trigger(test_path): with dump_yaml_context( test_path / "conf/demo/01_99_wf_test_wf_exec_raise_job_trigger.yml", @@ -1145,3 +1170,150 @@ def test_workflow_exec_raise_job_trigger(test_path): } }, } + + +def test_workflow_exec_circle_trigger(test_path): + with dump_yaml_context( + test_path / "conf/demo/01_99_wf_test_wf_exec_circle.yml", + data=""" + wf-circle: + type: Workflow + jobs: + first-job: + stages: + - name: "Trigger itself" + trigger: wf-circle + """, + ): + workflow = Workflow.from_conf(name="wf-circle") + rs: Result = workflow.execute({}) + assert rs.status == FAILED + assert rs.context == { + "params": {}, + "jobs": { + "first-job": { + "status": FAILED, + "stages": { + "1099837090": { + "outputs": {}, + "errors": { + "name": "StageError", + "message": "Trigger workflow was failed with:\nJob execution, 'first-job', was failed.", + }, + "status": FAILED, + } + }, + "errors": { + "name": "JobError", + "message": "Strategy execution was break because its nested-stage, 'Trigger itself', failed.", + }, + } + }, + "status": FAILED, + "errors": { + "name": "WorkflowError", + "message": "Job execution, 'first-job', was failed.", + }, + } + + with dump_yaml_context( + test_path / "conf/demo/01_99_wf_test_wf_exec_circle_runtime.yml", + data=""" + wf-circle-runtime: + type: Workflow + params: + name: str + jobs: + first-job: + stages: + - name: "Trigger itself" + trigger: ${{ params.name }} + """, + ): + workflow = Workflow.from_conf(name="wf-circle-runtime") + rs: Result = workflow.execute({"name": "wf-circle-runtime"}) + assert rs.status == FAILED + assert rs.context == { + "params": {"name": "wf-circle-runtime"}, + "jobs": { + "first-job": { + "status": FAILED, + "stages": { + "1099837090": { + "outputs": {}, + "errors": { + "name": "StageError", + "message": "Trigger workflow was failed with:\nJob execution, 'first-job', was failed.", + }, + "status": FAILED, + } + }, + "errors": { + "name": "JobError", + "message": "Strategy execution was break because its nested-stage, 'Trigger itself', failed.", + }, + } + }, + "status": FAILED, + "errors": { + "name": "WorkflowError", + "message": "Job execution, 'first-job', was failed.", + }, + } + + with dump_yaml_context( + test_path / "conf/demo/01_99_wf_test_wf_exec_circle_runtime_nested.yml", + data=""" + wf-main: + type: Workflow + params: + name: str + jobs: + first-job: + stages: + - name: "Trigger itself" + trigger: wf-circle-runtime-nested + params: + name: ${{ params.name }} + + wf-circle-runtime-nested: + type: Workflow + params: + name: str + jobs: + first-job: + stages: + - name: "Trigger itself" + trigger: ${{ params.name }} + """, + ): + workflow = Workflow.from_conf(name="wf-main") + rs: Result = workflow.execute({"name": "wf-circle-runtime-nested"}) + assert rs.status == FAILED + assert rs.context == { + "params": {"name": "wf-circle-runtime-nested"}, + "jobs": { + "first-job": { + "status": FAILED, + "stages": { + "1099837090": { + "outputs": {}, + "errors": { + "name": "StageError", + "message": "Trigger workflow was failed with:\nJob execution, 'first-job', was failed.", + }, + "status": FAILED, + } + }, + "errors": { + "name": "JobError", + "message": "Strategy execution was break because its nested-stage, 'Trigger itself', failed.", + }, + } + }, + "status": FAILED, + "errors": { + "name": "WorkflowError", + "message": "Job execution, 'first-job', was failed.", + }, + } From 52cca0fab9ae58c00a3a7e6816bc172749b3b95d Mon Sep 17 00:00:00 2001 From: Korawich Anuttra Date: Sun, 20 Jul 2025 11:18:58 +0700 Subject: [PATCH 03/10] :gear: fixed: remove message from trigger catch error. --- src/ddeutil/workflow/stages.py | 4 ++-- tests/stages/test_stage_trigger.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/ddeutil/workflow/stages.py b/src/ddeutil/workflow/stages.py index ae3cb0ba..43b5249a 100644 --- a/src/ddeutil/workflow/stages.py +++ b/src/ddeutil/workflow/stages.py @@ -2020,8 +2020,8 @@ def process( ) if result.status == FAILED: err_msg: str = ( - f" with:\n{msg}" - if (msg := result.context.get("errors", {}).get("message")) + f" with: {name}" + if (name := result.context.get("errors", {}).get("name")) else "." ) raise StageError(f"Trigger workflow was failed{err_msg}") diff --git a/tests/stages/test_stage_trigger.py b/tests/stages/test_stage_trigger.py index d8c7dbdb..4a5981b6 100644 --- a/tests/stages/test_stage_trigger.py +++ b/tests/stages/test_stage_trigger.py @@ -78,7 +78,7 @@ def test_trigger_stage_exec_raise(test_path): "params": {}, } ) - rs: Result = stage.execute(params={}) + rs: Result = stage.execute(params={}, run_id="01") assert rs.status == FAILED assert rs.context == { "status": FAILED, @@ -99,7 +99,7 @@ def test_trigger_stage_exec_raise(test_path): "params": {}, } ) - rs: Result = stage.execute(params={}) + rs: Result = stage.execute(params={}, run_id="02") assert rs.status == FAILED assert rs.context == { "status": FAILED, From 64aa5885138539508859fa71fda788879371afc2 Mon Sep 17 00:00:00 2001 From: Korawich Anuttra Date: Sun, 20 Jul 2025 12:00:08 +0700 Subject: [PATCH 04/10] :dart: feat: add nested stage error object for dedup trace message. --- src/ddeutil/workflow/__init__.py | 3 ++ src/ddeutil/workflow/errors.py | 9 ++++++ src/ddeutil/workflow/result.py | 16 ++++++++-- src/ddeutil/workflow/stages.py | 30 +++++++++++++------ .../demo/01_99_wf_test_wf_exec_circle.yml | 7 +++++ tests/example/test_stage_foreach.py | 17 +++++++++-- tests/example/test_stage_parallel.py | 6 ++-- tests/stages/test_stage_trigger.py | 4 +-- tests/test_workflow_exec.py | 2 +- 9 files changed, 74 insertions(+), 20 deletions(-) create mode 100644 tests/conf/demo/01_99_wf_test_wf_exec_circle.yml diff --git a/src/ddeutil/workflow/__init__.py b/src/ddeutil/workflow/__init__.py index feef12ae..2122c3db 100644 --- a/src/ddeutil/workflow/__init__.py +++ b/src/ddeutil/workflow/__init__.py @@ -63,6 +63,9 @@ ResultError, StageCancelError, StageError, + StageNestedCancelError, + StageNestedError, + StageNestedSkipError, StageSkipError, UtilError, WorkflowCancelError, diff --git a/src/ddeutil/workflow/errors.py b/src/ddeutil/workflow/errors.py index adfd6581..42fdd6b0 100644 --- a/src/ddeutil/workflow/errors.py +++ b/src/ddeutil/workflow/errors.py @@ -166,6 +166,15 @@ class StageCancelError(StageError): ... class StageSkipError(StageError): ... +class StageNestedError(StageError): ... + + +class StageNestedCancelError(StageNestedError): ... + + +class StageNestedSkipError(StageNestedError): ... + + class JobError(BaseError): ... diff --git a/src/ddeutil/workflow/result.py b/src/ddeutil/workflow/result.py index 1938954b..d9e3ecf5 100644 --- a/src/ddeutil/workflow/result.py +++ b/src/ddeutil/workflow/result.py @@ -34,6 +34,9 @@ JobSkipError, StageCancelError, StageError, + StageNestedCancelError, + StageNestedError, + StageNestedSkipError, StageSkipError, WorkflowCancelError, WorkflowError, @@ -140,6 +143,9 @@ def get_status_from_error( StageError, StageCancelError, StageSkipError, + StageNestedCancelError, + StageNestedError, + StageNestedSkipError, JobError, JobCancelError, JobSkipError, @@ -157,10 +163,16 @@ def get_status_from_error( Returns: Status: The status from the specific exception class. """ - if isinstance(error, (StageSkipError, JobSkipError)): + if isinstance(error, (StageNestedSkipError, StageSkipError, JobSkipError)): return SKIP elif isinstance( - error, (StageCancelError, JobCancelError, WorkflowCancelError) + error, + ( + StageNestedCancelError, + StageCancelError, + JobCancelError, + WorkflowCancelError, + ), ): return CANCEL return FAILED diff --git a/src/ddeutil/workflow/stages.py b/src/ddeutil/workflow/stages.py index 43b5249a..233a0d35 100644 --- a/src/ddeutil/workflow/stages.py +++ b/src/ddeutil/workflow/stages.py @@ -84,7 +84,15 @@ from .__types import DictData, DictStr, StrOrInt, StrOrNone, TupleStr from .conf import dynamic, pass_env -from .errors import StageCancelError, StageError, StageSkipError, to_dict +from .errors import ( + StageCancelError, + StageError, + StageNestedCancelError, + StageNestedError, + StageNestedSkipError, + StageSkipError, + to_dict, +) from .result import ( CANCEL, FAILED, @@ -339,10 +347,14 @@ def execute( # this exception class at other location. except ( StageSkipError, - StageCancelError, + StageNestedSkipError, + StageNestedError, StageError, ) as e: # pragma: no cov - trace.info(f"[STAGE]: Handler:||{traceback.format_exc()}") + if isinstance(e, StageNestedError): + trace.info(f"[STAGE]: Handler: {e}") + else: + trace.info(f"[STAGE]: Handler:||{traceback.format_exc()}") st: Status = get_status_from_error(e) return Result( run_id=run_id, @@ -353,7 +365,7 @@ def execute( status=st, updated=( None - if isinstance(e, StageSkipError) + if isinstance(e, (StageSkipError, StageNestedSkipError)) else {"errors": e.to_dict()} ), ), @@ -2020,15 +2032,15 @@ def process( ) if result.status == FAILED: err_msg: str = ( - f" with: {name}" - if (name := result.context.get("errors", {}).get("name")) + f" with:\n{msg}" + if (msg := result.context.get("errors", {}).get("message")) else "." ) - raise StageError(f"Trigger workflow was failed{err_msg}") + raise StageNestedError(f"Trigger workflow was failed{err_msg}") elif result.status == CANCEL: - raise StageCancelError("Trigger workflow was cancel.") + raise StageNestedCancelError("Trigger workflow was cancel.") elif result.status == SKIP: - raise StageSkipError("Trigger workflow was skipped.") + raise StageNestedSkipError("Trigger workflow was skipped.") return result diff --git a/tests/conf/demo/01_99_wf_test_wf_exec_circle.yml b/tests/conf/demo/01_99_wf_test_wf_exec_circle.yml new file mode 100644 index 00000000..d68262b5 --- /dev/null +++ b/tests/conf/demo/01_99_wf_test_wf_exec_circle.yml @@ -0,0 +1,7 @@ +wf-circle: + type: Workflow + jobs: + first-job: + stages: + - name: "Trigger itself" + trigger: wf-circle diff --git a/tests/example/test_stage_foreach.py b/tests/example/test_stage_foreach.py index a87272aa..86d7896f 100644 --- a/tests/example/test_stage_foreach.py +++ b/tests/example/test_stage_foreach.py @@ -129,6 +129,7 @@ def test_example_foreach_stage_exec_with_trigger_raise(test_path): stage: Stage = workflow.job("first-job").stage("foreach-raise") rs: Result = stage.execute({}) assert rs.status == FAILED + assert_status: list[bool] = [] try: assert rs.context == { "status": FAILED, @@ -141,7 +142,7 @@ def test_example_foreach_stage_exec_with_trigger_raise(test_path): "2827845371": { "outputs": {}, "errors": { - "name": "StageError", + "name": "StageNestedError", "message": "Trigger workflow was failed with:\nJob execution, 'first-job', was failed.", }, "status": FAILED, @@ -159,7 +160,7 @@ def test_example_foreach_stage_exec_with_trigger_raise(test_path): "2827845371": { "outputs": {}, "errors": { - "name": "StageCancelError", + "name": "StageNestedCancelError", "message": "Trigger workflow was cancel.", }, "status": CANCEL, @@ -182,7 +183,11 @@ def test_example_foreach_stage_exec_with_trigger_raise(test_path): }, }, } + assert_status.append(True) except AssertionError: + assert_status.append(False) + + try: assert rs.context == { "status": FAILED, "items": [1, 2], @@ -194,7 +199,7 @@ def test_example_foreach_stage_exec_with_trigger_raise(test_path): "2827845371": { "outputs": {}, "errors": { - "name": "StageError", + "name": "StageNestedError", "message": "Trigger workflow was failed with:\nJob execution, 'first-job', was failed.", }, "status": FAILED, @@ -226,6 +231,12 @@ def test_example_foreach_stage_exec_with_trigger_raise(test_path): }, }, } + assert_status.append(True) + except AssertionError: + assert_status.append(False) + + assert len(assert_status) == 2 + assert any(assert_status), f"{rs.context}" def test_example_foreach_stage_exec_nested_foreach_and_trigger(test_path): diff --git a/tests/example/test_stage_parallel.py b/tests/example/test_stage_parallel.py index 9d6a8f24..5f1db2ab 100644 --- a/tests/example/test_stage_parallel.py +++ b/tests/example/test_stage_parallel.py @@ -137,7 +137,7 @@ def test_example_parallel_stage_exec_with_trigger_raise(test_path): "8713259197": { "outputs": {}, "errors": { - "name": "StageError", + "name": "StageNestedError", "message": "Trigger workflow was failed with:\nJob execution, 'first-job', was failed.", }, "status": FAILED, @@ -213,7 +213,7 @@ def test_example_parallel_stage_exec_with_trigger_raise_bug(test_path): "2579951921": { "outputs": {}, "errors": { - "name": "StageError", + "name": "StageNestedError", "message": "Trigger workflow was failed with:\nJob execution, 'first-job', was failed.", }, "status": FAILED, @@ -231,7 +231,7 @@ def test_example_parallel_stage_exec_with_trigger_raise_bug(test_path): "4773288548": { "outputs": {}, "errors": { - "name": "StageError", + "name": "StageNestedError", "message": "Trigger workflow was failed with:\nJob execution, 'first-job', was failed.", }, "status": FAILED, diff --git a/tests/stages/test_stage_trigger.py b/tests/stages/test_stage_trigger.py index 4a5981b6..a409a9a9 100644 --- a/tests/stages/test_stage_trigger.py +++ b/tests/stages/test_stage_trigger.py @@ -104,7 +104,7 @@ def test_trigger_stage_exec_raise(test_path): assert rs.context == { "status": FAILED, "errors": { - "name": "StageError", + "name": "StageNestedError", "message": ( "Trigger workflow was failed with:\n" "Job execution, 'first-job', was failed." @@ -127,7 +127,7 @@ def test_trigger_stage_exec_cancel(): assert rs.context == { "status": CANCEL, "errors": { - "name": "StageCancelError", + "name": "StageNestedCancelError", "message": "Trigger workflow was cancel.", }, } diff --git a/tests/test_workflow_exec.py b/tests/test_workflow_exec.py index 76ae7c62..7d42b713 100644 --- a/tests/test_workflow_exec.py +++ b/tests/test_workflow_exec.py @@ -1112,7 +1112,6 @@ def test_workflow_exec_raise_from_job_exec(): assert rs.status == FAILED -@pytest.mark.skip def test_workflow_exec_raise_job_trigger(test_path): with dump_yaml_context( test_path / "conf/demo/01_99_wf_test_wf_exec_raise_job_trigger.yml", @@ -1172,6 +1171,7 @@ def test_workflow_exec_raise_job_trigger(test_path): } +@pytest.mark.skip def test_workflow_exec_circle_trigger(test_path): with dump_yaml_context( test_path / "conf/demo/01_99_wf_test_wf_exec_circle.yml", From f8b5aa88e365deec4e67988d5615cb31d0a4f2ef Mon Sep 17 00:00:00 2001 From: Korawich Anuttra Date: Sun, 20 Jul 2025 14:55:07 +0700 Subject: [PATCH 05/10] :lipstick: styled: upgrade nested prefix and emoji. --- src/ddeutil/workflow/__about__.py | 1 + src/ddeutil/workflow/stages.py | 76 ++++++++++++++++++------------- src/ddeutil/workflow/traces.py | 5 +- src/ddeutil/workflow/utils.py | 9 ++-- tests/stages/test_stage_case.py | 20 ++++---- tests/test_traces.py | 2 +- tests/test_utils.py | 4 +- 7 files changed, 70 insertions(+), 47 deletions(-) diff --git a/src/ddeutil/workflow/__about__.py b/src/ddeutil/workflow/__about__.py index 668d36a8..00aa5977 100644 --- a/src/ddeutil/workflow/__about__.py +++ b/src/ddeutil/workflow/__about__.py @@ -1 +1,2 @@ __version__: str = "0.0.81" +__python_version__: str = "3.9" diff --git a/src/ddeutil/workflow/stages.py b/src/ddeutil/workflow/stages.py index 233a0d35..0c7b62d6 100644 --- a/src/ddeutil/workflow/stages.py +++ b/src/ddeutil/workflow/stages.py @@ -82,6 +82,7 @@ from pydantic.functional_validators import field_validator, model_validator from typing_extensions import Self +from .__about__ import __python_version__ from .__types import DictData, DictStr, StrOrInt, StrOrNone, TupleStr from .conf import dynamic, pass_env from .errors import ( @@ -300,7 +301,7 @@ def execute( """ ts: float = time.monotonic() parent_run_id: str = run_id - run_id: str = run_id or gen_id(self.iden, unique=True) + run_id: str = gen_id(self.iden, unique=True, extras=self.extras) context: DictData = {"status": WAIT} trace: TraceManager = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras @@ -354,7 +355,7 @@ def execute( if isinstance(e, StageNestedError): trace.info(f"[STAGE]: Handler: {e}") else: - trace.info(f"[STAGE]: Handler:||{traceback.format_exc()}") + trace.info(f"[STAGE]: Handler:||🚨 {traceback.format_exc()}") st: Status = get_status_from_error(e) return Result( run_id=run_id, @@ -373,7 +374,7 @@ def execute( extras=self.extras, ) except Exception as e: - trace.error(f"[STAGE]: Error Handler:||{traceback.format_exc()}") + trace.error(f"[STAGE]: Error Handler:||🚨 {traceback.format_exc()}") return Result( run_id=run_id, parent_run_id=parent_run_id, @@ -628,7 +629,7 @@ async def axecute( """ ts: float = time.monotonic() parent_run_id: StrOrNone = run_id - run_id: str = run_id or gen_id(self.iden, unique=True) + run_id: str = gen_id(self.iden, unique=True, extras=self.extras) context: DictData = {} trace: TraceManager = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras @@ -673,12 +674,16 @@ async def axecute( # this exception class at other location. except ( StageSkipError, - StageCancelError, + StageNestedSkipError, + StageNestedError, StageError, ) as e: # pragma: no cov - await trace.ainfo( - f"[STAGE]: Skip Handler:||{traceback.format_exc()}" - ) + if isinstance(e, StageNestedError): + await trace.ainfo(f"[STAGE]: Handler: {e}") + else: + await trace.ainfo( + f"[STAGE]:Handler:||🚨 {traceback.format_exc()}" + ) st: Status = get_status_from_error(e) return Result( run_id=run_id, @@ -698,7 +703,7 @@ async def axecute( ) except Exception as e: await trace.aerror( - f"[STAGE]: Error Handler:||{traceback.format_exc()}" + f"[STAGE]: Error Handler:||🚨 {traceback.format_exc()}" ) return Result( run_id=run_id, @@ -2011,9 +2016,9 @@ def process( _trigger: str = param2template(self.trigger, params, extras=self.extras) # if _trigger in self.extras.get("stop_circle_workflow_name", []): # raise StageError( - # "[STAGE]: Circle execution via trigger itself workflow name." + # "[NESTED]: Circle execution via trigger itself workflow name." # ) - trace.info(f"[STAGE]: Load workflow: {_trigger!r}") + trace.info(f"[NESTED]: Load Workflow Config: {_trigger!r}") # # NOTE: add noted key for cancel circle execution. # if "stop_circle_workflow_name" in self.extras: @@ -2127,7 +2132,7 @@ def _process_branch( trace: TraceManager = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) - trace.debug(f"[STAGE]: Execute Branch: {branch!r}") + trace.debug(f"[NESTED]: Execute Branch: {branch!r}") # NOTE: Create nested-context current_context: DictData = copy.deepcopy(params) @@ -2258,7 +2263,7 @@ def process( run_id, parent_run_id=parent_run_id, extras=self.extras ) event: Event = event or Event() - trace.info(f"[STAGE]: Parallel with {self.max_workers} workers.") + trace.info(f"[NESTED]: Parallel with {self.max_workers} workers.") catch( context=context, status=WAIT, @@ -2396,7 +2401,7 @@ def _process_item( trace: TraceManager = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) - trace.debug(f"[STAGE]: Execute Item: {item!r}") + trace.debug(f"[NESTED]: Execute Item: {item!r}") key: StrOrInt = index if self.use_index_as_key else item # NOTE: Create nested-context data from the passing context. @@ -2452,7 +2457,7 @@ def _process_item( f"Item execution was break because its nested-stage, " f"{stage.iden!r}, failed." ) - trace.warning(f"[STAGE]: {error_msg}") + trace.warning(f"[NESTED]: {error_msg}") catch( context=context, status=FAILED, @@ -2562,7 +2567,7 @@ def process( "duplicate item, it should set `use_index_as_key: true`." ) - trace.info(f"[STAGE]: Foreach: {foreach!r}.") + trace.info(f"[NESTED]: Foreach: {foreach!r}.") catch( context=context, status=WAIT, @@ -2596,7 +2601,7 @@ def process( done, not_done = wait(futures, return_when=FIRST_EXCEPTION) if len(list(done)) != len(futures): trace.warning( - "[STAGE]: Set the event for stop pending for-each stage." + "[NESTED]: Set the event for stop pending for-each stage." ) event.set() for future in not_done: @@ -2611,7 +2616,7 @@ def process( if not_done else "" ) - trace.debug(f"[STAGE]: ... Foreach-Stage set failed event{nd}") + trace.debug(f"[NESTED]: ... Foreach-Stage set failed event{nd}") done: Iterator[Future] = as_completed(futures) fail_fast = True @@ -2718,7 +2723,7 @@ def _process_loop( trace: TraceManager = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) - trace.debug(f"[STAGE]: Execute Loop: {loop} (Item {item!r})") + trace.debug(f"[NESTED]: Execute Loop: {loop} (Item {item!r})") # NOTE: Create nested-context current_context: DictData = copy.deepcopy(params) @@ -2861,7 +2866,7 @@ def process( run_id, parent_run_id=parent_run_id, extras=self.extras ) event: Event = event or Event() - trace.info(f"[STAGE]: Until: {self.until!r}") + trace.info(f"[NESTED]: Until: {self.until!r}") item: Union[str, int, bool] = pass_env( param2template(self.item, params, extras=self.extras) ) @@ -2891,7 +2896,7 @@ def process( if item is None: item: int = loop trace.warning( - f"[STAGE]: Return loop not set the item. It uses loop: " + f"[NESTED]: Return loop not set the item. It uses loop: " f"{loop} by default." ) @@ -3012,7 +3017,7 @@ def _process_case( trace: TraceManager = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) - trace.debug(f"[STAGE]: Execute Case: {case!r}") + trace.debug(f"[NESTED]: Execute Case: {case!r}") current_context: DictData = copy.deepcopy(params) current_context.update({"case": case}) output: DictData = {"case": case, "stages": {}} @@ -3095,18 +3100,22 @@ def process( ) _case: StrOrNone = param2template(self.case, params, extras=self.extras) + trace.info(f"[NESTED]: Get Case: {_case!r}.") - trace.info(f"[STAGE]: Case: {_case!r}.") _else: Optional[Match] = None stages: Optional[list[Stage]] = None + + # NOTE: Start check the condition of each stage match with this case. for match in self.match: + # NOTE: Store the else case. if (c := match.case) == "_": _else: Match = match continue _condition: str = param2template(c, params, extras=self.extras) - if stages is None and pass_env(_case) == pass_env(_condition): + if pass_env(_case) == pass_env(_condition): stages: list[Stage] = match.stages + break if stages is None: if _else is None: @@ -3120,6 +3129,7 @@ def process( "case and the else condition does not set too." ) + # NOTE: Force to use the else when it does not match any case. _case: str = "_" stages: list[Stage] = _else.stages @@ -3413,8 +3423,11 @@ class VirtualPyStage(PyStage): # pragma: no cov """ version: str = Field( - default="3.9", - description="A Python version that want to run.", + default=__python_version__, + description=( + "A Python version that want to run. It will use supported version " + f"of this package by default, {__python_version__}." + ), ) deps: list[str] = Field( description=( @@ -3437,11 +3450,12 @@ def create_py_file( The format of Python dependency was followed by the `uv` recommended. - :param py: A Python string statement. - :param values: A variable that want to set before running this - :param deps: An additional Python dependencies that want install before - run this python stage. - :param run_id: (StrOrNone) A running ID of this stage execution. + Args: + py: A Python string statement. + values: A variable that want to set before running this + deps: An additional Python dependencies that want install before + run this python stage. + run_id: (StrOrNone) A running ID of this stage execution. """ run_id: str = run_id or uuid.uuid4() f_name: str = f"{run_id}.py" diff --git a/src/ddeutil/workflow/traces.py b/src/ddeutil/workflow/traces.py index ec3de79d..10483d1a 100644 --- a/src/ddeutil/workflow/traces.py +++ b/src/ddeutil/workflow/traces.py @@ -86,9 +86,10 @@ def set_logging(name: str) -> logging.Logger: "emoji": "⚙️", "desc": "logs from any usage from custom caller function.", }, + "NESTED": {"emoji": "⛓️", "desc": "logs from stages module."}, "STAGE": {"emoji": "🔗", "desc": "logs from stages module."}, - "JOB": {"emoji": "⛓️", "desc": "logs from job module."}, - "WORKFLOW": {"emoji": "🏃", "desc": "logs from workflow module."}, + "JOB": {"emoji": "🏗", "desc": "logs from job module."}, + "WORKFLOW": {"emoji": "👟", "desc": "logs from workflow module."}, "RELEASE": {"emoji": "📅", "desc": "logs from release workflow method."}, "POKING": {"emoji": "⏰", "desc": "logs from poke workflow method."}, "AUDIT": {"emoji": "📌", "desc": "logs from audit model."}, diff --git a/src/ddeutil/workflow/utils.py b/src/ddeutil/workflow/utils.py index ac61a3ca..f20901a7 100644 --- a/src/ddeutil/workflow/utils.py +++ b/src/ddeutil/workflow/utils.py @@ -218,7 +218,10 @@ def gen_id( hashing value length to 10 if simple mode is enabled. Simple Mode Format: - YYYYMMDDHHMMSSffffffTxxxxxxxxxx + + The format of ID include full datetime and hashing identity. + + YYYY MM DD HH MM SS ffffff T ********** year month day hour minute second microsecond sep simple-id Args: @@ -318,7 +321,7 @@ def cross_product(matrix: Matrix) -> Iterator[DictData]: ) -def cut_id(run_id: str, *, num: int = 6) -> str: +def cut_id(run_id: str, *, num: int = 8) -> str: """Cut running ID to specified length. Example: @@ -334,7 +337,7 @@ def cut_id(run_id: str, *, num: int = 6) -> str: """ if "T" in run_id: dt, simple = run_id.split("T", maxsplit=1) - return dt[:12] + simple[-num:] + return dt[10:20] + simple[-num:] return run_id[-num:] diff --git a/tests/stages/test_stage_case.py b/tests/stages/test_stage_case.py index 16949834..710b46ae 100644 --- a/tests/stages/test_stage_case.py +++ b/tests/stages/test_stage_case.py @@ -38,7 +38,7 @@ def test_case_stage_exec(test_path): ], }, ) - rs: Result = stage.execute({"params": {"name": "bar"}}) + rs: Result = stage.execute({"params": {"name": "bar"}}, run_id="01") assert rs.status == SUCCESS assert rs.context == { "status": SUCCESS, @@ -61,7 +61,7 @@ def test_case_stage_exec(test_path): } } - rs: Result = stage.execute({"params": {"name": "foo"}}) + rs: Result = stage.execute({"params": {"name": "foo"}}, run_id="02") assert rs.status == SUCCESS assert rs.context == { "status": SUCCESS, @@ -84,7 +84,7 @@ def test_case_stage_exec(test_path): } } - rs: Result = stage.execute({"params": {"name": "test"}}) + rs: Result = stage.execute({"params": {"name": "test"}}, run_id="03") assert rs.status == SUCCESS assert rs.context == { "status": SUCCESS, @@ -114,7 +114,7 @@ def test_case_stage_exec_raise(): } ) # NOTE: Raise because else condition does not set. - rs: Result = stage.execute({"params": {"name": "test"}}) + rs: Result = stage.execute({"params": {"name": "test"}}, run_id="01") assert rs.status == FAILED assert rs.context == { "status": FAILED, @@ -145,7 +145,7 @@ def test_case_stage_exec_raise(): "extras": {"foo": "bar"}, } ) - rs: Result = stage.execute({"params": {"name": "bar"}}) + rs: Result = stage.execute({"params": {"name": "bar"}}, run_id="02") assert rs.status == FAILED assert rs.context == { "status": FAILED, @@ -184,7 +184,9 @@ def test_case_stage_exec_cancel(): } ) event = MockEvent(n=0) - rs: Result = stage.execute({"params": {"name": "bar"}}, event=event) + rs: Result = stage.execute( + {"params": {"name": "bar"}}, event=event, run_id="03" + ) assert rs.status == CANCEL assert rs.context == { "status": CANCEL, @@ -197,7 +199,9 @@ def test_case_stage_exec_cancel(): } event = MockEvent(n=1) - rs: Result = stage.execute({"params": {"name": "bar"}}, event=event) + rs: Result = stage.execute( + {"params": {"name": "bar"}}, event=event, run_id="04" + ) assert rs.status == CANCEL assert rs.context == { "status": CANCEL, @@ -230,6 +234,6 @@ def test_case_stage_exec_skipped(): ], } ) - rs: Result = stage.execute({"params": {"name": "test"}}) + rs: Result = stage.execute({"params": {"name": "test"}}, run_id="01") assert rs.status == SKIP assert rs.context == {"status": SKIP} diff --git a/tests/test_traces.py b/tests/test_traces.py index 4e68e278..de6ba1ee 100644 --- a/tests/test_traces.py +++ b/tests/test_traces.py @@ -67,7 +67,7 @@ def test_trace_regex_message(): "( End trigger Priority Group: 2 )" ) assert prefix.prepare() == ( - "🏃 [WORKFLOW]: Execute Empty-Stage:\n'End trigger Priority Group':\n" + "👟 [WORKFLOW]: Execute Empty-Stage:\n'End trigger Priority Group':\n" "( End trigger Priority Group: 2 )" ) assert prefix.prepare(extras={"log_add_emoji": False}) == ( diff --git a/tests/test_utils.py b/tests/test_utils.py index cb53210d..3bdb5854 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -104,9 +104,9 @@ def test_make_exec(): def test_cut_id(): assert ( - cut_id(run_id="20240101081330000000T1354680202") == "202401010813680202" + cut_id(run_id="20240101081330000000T1354680202") == "133000000054680202" ) - assert cut_id(run_id="3509917790201200503600070303500") == "303500" + assert cut_id(run_id="3509917790201200503600070303500") == "70303500" @freeze_time("2024-01-01 01:13:30") From c0ba849dbe3c6d9ef79bd04eaf8cd3b5b9de01c7 Mon Sep 17 00:00:00 2001 From: Korawich Anuttra Date: Sun, 20 Jul 2025 15:32:53 +0700 Subject: [PATCH 06/10] :lipstick: styled: add nested exception on until stage. --- src/ddeutil/workflow/stages.py | 26 ++++++++++++++++++++------ tests/stages/test_stage_py_virtual.py | 3 ++- tests/stages/test_stage_until.py | 26 +++++++++++++------------- tests/test_traces.py | 7 +++++-- 4 files changed, 40 insertions(+), 22 deletions(-) diff --git a/src/ddeutil/workflow/stages.py b/src/ddeutil/workflow/stages.py index 0c7b62d6..b34ab1df 100644 --- a/src/ddeutil/workflow/stages.py +++ b/src/ddeutil/workflow/stages.py @@ -355,7 +355,14 @@ def execute( if isinstance(e, StageNestedError): trace.info(f"[STAGE]: Handler: {e}") else: - trace.info(f"[STAGE]: Handler:||🚨 {traceback.format_exc()}") + emoji: str = ( + "⏭️" + if isinstance(e, (StageSkipError, StageNestedSkipError)) + else "🚨" + ) + trace.info( + f"[STAGE]: Handler:||{emoji} {traceback.format_exc()}" + ) st: Status = get_status_from_error(e) return Result( run_id=run_id, @@ -681,8 +688,13 @@ async def axecute( if isinstance(e, StageNestedError): await trace.ainfo(f"[STAGE]: Handler: {e}") else: + emoji: str = ( + "⏭️" + if isinstance(e, (StageSkipError, StageNestedSkipError)) + else "🚨" + ) await trace.ainfo( - f"[STAGE]:Handler:||🚨 {traceback.format_exc()}" + f"[STAGE]:Handler:||{emoji} {traceback.format_exc()}" ) st: Status = get_status_from_error(e) return Result( @@ -2792,11 +2804,11 @@ def _process_loop( "stages": filter_func( nestet_context.pop("stages", {}) ), - "errors": StageError(error_msg).to_dict(), + "errors": StageNestedError(error_msg).to_dict(), } }, ) - raise StageError(error_msg, refs=loop) + raise StageNestedError(error_msg, refs=loop) elif rs.status == CANCEL: error_msg: str = ( @@ -2814,11 +2826,13 @@ def _process_loop( "stages": filter_func( nestet_context.pop("stages", {}) ), - "errors": StageCancelError(error_msg).to_dict(), + "errors": StageNestedCancelError( + error_msg + ).to_dict(), } }, ) - raise StageCancelError(error_msg, refs=loop) + raise StageNestedCancelError(error_msg, refs=loop) status: Status = SKIP if sum(skips) == total_stage else SUCCESS return ( diff --git a/tests/stages/test_stage_py_virtual.py b/tests/stages/test_stage_py_virtual.py index 4bcf0546..c6bd7170 100644 --- a/tests/stages/test_stage_py_virtual.py +++ b/tests/stages/test_stage_py_virtual.py @@ -1,4 +1,4 @@ -from ddeutil.workflow import Result, Stage, StageError, Workflow +from ddeutil.workflow import SUCCESS, Result, Stage, StageError, Workflow from ..utils import dump_yaml_context @@ -35,6 +35,7 @@ def test_stage_py_virtual(test_path): assert output == { "stages": { "py-virtual": { + "status": SUCCESS, "outputs": { "return_code": 0, "stdout": "[1 2 3 4 5]\n", diff --git a/tests/stages/test_stage_until.py b/tests/stages/test_stage_until.py index 2ee66447..209effda 100644 --- a/tests/stages/test_stage_until.py +++ b/tests/stages/test_stage_until.py @@ -29,7 +29,7 @@ def test_until_stage(): ) assert stage.is_nested - rs: Result = stage.execute(params={}) + rs: Result = stage.execute(params={}, run_id="01") assert rs.status == SUCCESS assert rs.context == { "status": SUCCESS, @@ -64,7 +64,7 @@ def test_until_stage_raise(): "extras": {"foo": "bar"}, } ) - rs: Result = stage.execute(params={}) + rs: Result = stage.execute(params={}, run_id="02") assert rs.status == FAILED assert rs.context == { "status": FAILED, @@ -84,13 +84,13 @@ def test_until_stage_raise(): } }, "errors": { - "name": "StageError", + "name": "StageNestedError", "message": "Loop execution was break because its nested-stage, 'Raise stage nested', failed.", }, } }, "errors": { - "name": "StageError", + "name": "StageNestedError", "message": "Loop execution was break because its nested-stage, 'Raise stage nested', failed.", }, } @@ -106,7 +106,7 @@ def test_until_stage_raise(): "extras": {"foo": "bar"}, } ) - rs: Result = stage.execute(params={}) + rs: Result = stage.execute(params={}, run_id="03") assert rs.status == FAILED assert rs.context == { "status": FAILED, @@ -142,7 +142,7 @@ def test_until_stage_skipped(): ], } ) - rs: Result = stage.execute(params={}) + rs: Result = stage.execute(params={}, run_id="01") assert rs.status == SKIP assert rs.context == { "status": SKIP, @@ -178,7 +178,7 @@ def test_until_stage_cancel(): ) event = Event() event.set() - rs: Result = stage.execute(params={}, event=event) + rs: Result = stage.execute(params={}, event=event, run_id="02") assert rs.status == CANCEL assert rs.context == { "status": CANCEL, @@ -190,7 +190,7 @@ def test_until_stage_cancel(): } event = MockEvent(n=1) - rs: Result = stage.execute(params={}, event=event) + rs: Result = stage.execute(params={}, event=event, run_id="03") assert rs.status == CANCEL assert rs.context == { "status": CANCEL, @@ -213,7 +213,7 @@ def test_until_stage_cancel(): } event = MockEvent(n=2) - rs: Result = stage.execute(params={}, event=event) + rs: Result = stage.execute(params={}, event=event, run_id="04") assert rs.status == CANCEL assert rs.context == { "status": CANCEL, @@ -233,13 +233,13 @@ def test_until_stage_cancel(): } }, "errors": { - "name": "StageCancelError", + "name": "StageNestedCancelError", "message": "Loop execution was canceled from the event after end loop execution.", }, } }, "errors": { - "name": "StageCancelError", + "name": "StageNestedCancelError", "message": "Loop execution was canceled from the event after end loop execution.", }, } @@ -257,7 +257,7 @@ def test_until_stage_exec_exceed_loop(): ], } ) - rs: Result = stage.execute(params={}) + rs: Result = stage.execute(params={}, run_id="01") assert rs.status == FAILED assert rs.context == { "status": FAILED, @@ -306,7 +306,7 @@ def test_until_stage_exec_full(test_path): ): workflow = Workflow.from_conf(name="tmp-wf-until") stage: Stage = workflow.job("first-job").stage("until-stage") - rs: Result = stage.execute(params={}) + rs: Result = stage.execute(params={}, run_id="02") assert rs.status == SUCCESS assert rs.context == { "status": SUCCESS, diff --git a/tests/test_traces.py b/tests/test_traces.py index de6ba1ee..6f498192 100644 --- a/tests/test_traces.py +++ b/tests/test_traces.py @@ -130,10 +130,13 @@ def test_result_trace(): "logs_trace_frame_layer": 4, }, ) - print(rs.trace.extras) + assert rs.trace.extras == { + "enable_write_log": True, + "logs_trace_frame_layer": 4, + } rs.trace.info("[DEMO]: Test echo log from result trace argument!!!") print(rs.run_id) - print(rs.parent_run_id) + assert rs.parent_run_id == "foo_id_for_writing_log" def test_file_trace_find_traces(test_path): From 0a0119a1074cc082b7ab7ebfc6d1a44a227158e9 Mon Sep 17 00:00:00 2001 From: Korawich Anuttra Date: Sun, 20 Jul 2025 18:19:14 +0700 Subject: [PATCH 07/10] :dart: feat: add pre-process on trace model. --- docs/api/traces.md | 26 +++++----- src/ddeutil/workflow/__init__.py | 2 +- src/ddeutil/workflow/api/routes/job.py | 4 +- src/ddeutil/workflow/audits.py | 6 +-- src/ddeutil/workflow/job.py | 14 ++--- src/ddeutil/workflow/result.py | 10 ++-- src/ddeutil/workflow/stages.py | 56 ++++++++++---------- src/ddeutil/workflow/traces.py | 71 ++++++++++++++++++-------- src/ddeutil/workflow/workflow.py | 18 ++++--- tests/test_traces.py | 23 +++++---- 10 files changed, 132 insertions(+), 98 deletions(-) diff --git a/docs/api/traces.md b/docs/api/traces.md index 81332927..ab584333 100644 --- a/docs/api/traces.md +++ b/docs/api/traces.md @@ -14,7 +14,7 @@ The traces system provides: ## Core Components -### `TraceManager` +### `Trace` The main trace manager that coordinates multiple handlers and provides a unified logging interface. @@ -64,10 +64,10 @@ Basic console logging implementation that outputs to stdout/stderr. !!! example "Console Handler" ```python - from ddeutil.workflow.traces import ConsoleHandler, TraceManager + from ddeutil.workflow.traces import ConsoleHandler, Trace handler = ConsoleHandler(type="console") - trace = TraceManager( + trace = Trace( run_id="workflow-123", handlers=[handler] ) @@ -81,7 +81,7 @@ File-based trace implementation that persists logs to the local filesystem with !!! example "File Handler Usage" ```python - from ddeutil.workflow.traces import FileHandler, TraceManager + from ddeutil.workflow.traces import FileHandler, Trace # Create file handler handler = FileHandler( @@ -90,7 +90,7 @@ File-based trace implementation that persists logs to the local filesystem with format="{datetime} ({process:5d}, {thread:5d}) ({cut_id}) {message:120s} ({filename}:{lineno})" ) - trace = TraceManager( + trace = Trace( run_id="workflow-123", parent_run_id="parent-456", handlers=[handler] @@ -137,7 +137,7 @@ SQLite-based trace implementation for scalable logging with structured metadata !!! example "SQLite Handler" ```python - from ddeutil.workflow.traces import SQLiteHandler, TraceManager + from ddeutil.workflow.traces import SQLiteHandler, Trace handler = SQLiteHandler( type="sqlite", @@ -145,7 +145,7 @@ SQLite-based trace implementation for scalable logging with structured metadata table_name="traces" ) - trace = TraceManager( + trace = Trace( run_id="workflow-789", handlers=[handler] ) @@ -206,7 +206,7 @@ REST API integration for external logging services. !!! example "REST API Handler" ```python - from ddeutil.workflow.traces import RestAPIHandler, TraceManager + from ddeutil.workflow.traces import RestAPIHandler, Trace # Datadog integration handler = RestAPIHandler( @@ -218,7 +218,7 @@ REST API integration for external logging services. max_retries=3 ) - trace = TraceManager( + trace = Trace( run_id="workflow-123", handlers=[handler] ) @@ -238,7 +238,7 @@ High-performance Elasticsearch logging with bulk indexing and search capabilitie !!! example "Elasticsearch Handler" ```python - from ddeutil.workflow.traces import ElasticHandler, TraceManager + from ddeutil.workflow.traces import ElasticHandler, Trace handler = ElasticHandler( type="elastic", @@ -250,7 +250,7 @@ High-performance Elasticsearch logging with bulk indexing and search capabilitie max_retries=3 ) - trace = TraceManager( + trace = Trace( run_id="workflow-123", handlers=[handler] ) @@ -320,7 +320,7 @@ All handlers support asynchronous logging for non-blocking operations: ## Buffer Support -The `TraceManager` supports buffered logging for high-performance scenarios: +The `Trace` supports buffered logging for high-performance scenarios: !!! example "Buffered Logging" @@ -342,7 +342,7 @@ The `TraceManager` supports buffered logging for high-performance scenarios: ### `get_trace` -Factory function that returns a `TraceManager` instance with handlers configured from the core configuration. +Factory function that returns a `Trace` instance with handlers configured from the core configuration. !!! example "Dynamic Trace Creation" diff --git a/src/ddeutil/workflow/__init__.py b/src/ddeutil/workflow/__init__.py index 2122c3db..a80b12b0 100644 --- a/src/ddeutil/workflow/__init__.py +++ b/src/ddeutil/workflow/__init__.py @@ -135,7 +135,7 @@ VirtualPyStage, ) from .traces import ( - TraceManager, + Trace, get_trace, ) from .utils import * diff --git a/src/ddeutil/workflow/api/routes/job.py b/src/ddeutil/workflow/api/routes/job.py index 8c03676e..fd5b52de 100644 --- a/src/ddeutil/workflow/api/routes/job.py +++ b/src/ddeutil/workflow/api/routes/job.py @@ -15,7 +15,7 @@ from ...__types import DictData from ...errors import JobError from ...job import Job -from ...traces import TraceManager, get_trace +from ...traces import Trace, get_trace from ...utils import gen_id logger = logging.getLogger("uvicorn.error") @@ -41,7 +41,7 @@ async def job_execute( if extras: job.extras = extras - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=job.extras ) diff --git a/src/ddeutil/workflow/audits.py b/src/ddeutil/workflow/audits.py index df447f58..fc5e282a 100644 --- a/src/ddeutil/workflow/audits.py +++ b/src/ddeutil/workflow/audits.py @@ -59,7 +59,7 @@ from .__types import DictData from .conf import dynamic -from .traces import TraceManager, get_trace, set_logging +from .traces import Trace, get_trace, set_logging logger = logging.getLogger("ddeutil.workflow") @@ -365,7 +365,7 @@ def save(self, data: Any, excluded: Optional[list[str]] = None) -> Self: Self: The audit instance after saving. """ audit = AuditData.model_validate(data) - trace: TraceManager = get_trace( + trace: Trace = get_trace( audit.run_id, parent_run_id=audit.parent_run_id, extras=self.extras, @@ -655,7 +655,7 @@ def save(self, data: Any, excluded: Optional[list[str]] = None) -> Self: ValueError: If SQLite database is not properly configured. """ audit = AuditData.model_validate(data) - trace: TraceManager = get_trace( + trace: Trace = get_trace( audit.run_id, parent_run_id=audit.parent_run_id, extras=self.extras, diff --git a/src/ddeutil/workflow/job.py b/src/ddeutil/workflow/job.py index 84a65f7e..757b9885 100644 --- a/src/ddeutil/workflow/job.py +++ b/src/ddeutil/workflow/job.py @@ -72,7 +72,7 @@ ) from .reusables import has_template, param2template from .stages import Stage -from .traces import TraceManager, get_trace +from .traces import Trace, get_trace from .utils import cross_product, filter_func, gen_id MatrixFilter = list[dict[str, Union[str, int]]] @@ -892,7 +892,7 @@ def execute( ts: float = time.monotonic() parent_run_id: str = run_id run_id: str = gen_id((self.id or "EMPTY"), unique=True) - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) trace.info( @@ -1029,7 +1029,7 @@ def local_execute_strategy( :rtype: tuple[Status, DictData] """ - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=job.extras ) if strategy: @@ -1165,7 +1165,7 @@ def local_execute( ts: float = time.monotonic() parent_run_id: StrOrNone = run_id run_id: str = gen_id((job.id or "EMPTY"), unique=True) - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=job.extras ) context: DictData = {"status": WAIT} @@ -1232,7 +1232,7 @@ def local_execute( strategies: list[DictStr] = job.strategy.make() len_strategy: int = len(strategies) trace.info( - f"[JOB]: ... Mode {ls}: {job.id!r} with {workers} " + f"[JOB]: Mode {ls}: {job.id!r} with {workers} " f"worker{'s' if workers > 1 else ''}." ) @@ -1349,7 +1349,7 @@ def self_hosted_execute( """ parent_run_id: StrOrNone = run_id run_id: str = gen_id((job.id or "EMPTY"), unique=True) - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=job.extras ) context: DictData = {"status": WAIT} @@ -1432,7 +1432,7 @@ def docker_execution( """ parent_run_id: StrOrNone = run_id run_id: str = gen_id((job.id or "EMPTY"), unique=True) - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=job.extras ) context: DictData = {"status": WAIT} diff --git a/src/ddeutil/workflow/result.py b/src/ddeutil/workflow/result.py index d9e3ecf5..e6efb317 100644 --- a/src/ddeutil/workflow/result.py +++ b/src/ddeutil/workflow/result.py @@ -42,7 +42,7 @@ WorkflowError, ) from .__types import DictData -from .audits import TraceManager, get_trace +from .audits import Trace, get_trace from .errors import ErrorData, ResultError from .utils import default_gen_id @@ -200,9 +200,7 @@ class Result: info: DictData = field(default_factory=dict) run_id: str = field(default_factory=default_gen_id) parent_run_id: Optional[str] = field(default=None) - trace: Optional[TraceManager] = field( - default=None, compare=False, repr=False - ) + trace: Optional[Trace] = field(default=None, compare=False, repr=False) @model_validator(mode="after") def __prepare_trace(self) -> Self: @@ -211,7 +209,7 @@ def __prepare_trace(self) -> Self: :rtype: Self """ if self.trace is None: # pragma: no cov - self.trace: TraceManager = get_trace( + self.trace: Trace = get_trace( self.run_id, parent_run_id=self.parent_run_id, extras=self.extras, @@ -220,7 +218,7 @@ def __prepare_trace(self) -> Self: return self @classmethod - def from_trace(cls, trace: TraceManager): + def from_trace(cls, trace: Trace): """Construct the result model from trace for clean code objective.""" return cls( run_id=trace.run_id, diff --git a/src/ddeutil/workflow/stages.py b/src/ddeutil/workflow/stages.py index b34ab1df..34fe4233 100644 --- a/src/ddeutil/workflow/stages.py +++ b/src/ddeutil/workflow/stages.py @@ -113,7 +113,7 @@ not_in_template, param2template, ) -from .traces import TraceManager, get_trace +from .traces import Trace, get_trace from .utils import ( delay, dump_all, @@ -303,7 +303,7 @@ def execute( parent_run_id: str = run_id run_id: str = gen_id(self.iden, unique=True, extras=self.extras) context: DictData = {"status": WAIT} - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) try: @@ -638,7 +638,7 @@ async def axecute( parent_run_id: StrOrNone = run_id run_id: str = gen_id(self.iden, unique=True, extras=self.extras) context: DictData = {} - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) try: @@ -788,7 +788,7 @@ def _execute( current_retry: int = 0 exception: Exception catch(context, status=WAIT) - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) @@ -862,7 +862,7 @@ async def _axecute( current_retry: int = 0 exception: Exception catch(context, status=WAIT) - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) @@ -992,7 +992,7 @@ def process( Returns: Result: The execution result with status and context data. """ - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) message: str = ( @@ -1045,7 +1045,7 @@ async def async_process( Returns: Result: The execution result with status and context data. """ - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) message: str = ( @@ -1210,7 +1210,7 @@ def process( Returns: Result: The execution result with status and context data. """ - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) bash: str = param2template( @@ -1274,7 +1274,7 @@ async def async_process( Returns: Result: The execution result with status and context data. """ - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) bash: str = param2template( @@ -1415,7 +1415,7 @@ def process( Returns: Result: The execution result with status and context data. """ - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) trace.debug("[STAGE]: Prepare `globals` and `locals` variables.") @@ -1496,7 +1496,7 @@ async def async_process( Returns: Result: The execution result with status and context data. """ - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) await trace.ainfo("[STAGE]: Prepare `globals` and `locals` variables.") @@ -1641,7 +1641,7 @@ def process( Returns: Result: The execution result with status and context data. """ - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) call_func: TagFunc = self.get_caller(params=params)() @@ -1760,7 +1760,7 @@ async def async_process( Returns: Result: The execution result with status and context data. """ - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) call_func: TagFunc = self.get_caller(params=params)() @@ -1894,7 +1894,7 @@ def validate_model_args( "Validate argument from the caller function raise invalid type." ) from e except TypeError as e: - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=extras ) trace.warning( @@ -2022,7 +2022,7 @@ def process( """ from .workflow import Workflow - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) _trigger: str = param2template(self.trigger, params, extras=self.extras) @@ -2141,7 +2141,7 @@ def _process_branch( Returns: tuple[Status, DictData]: A pair of status and result context data. """ - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) trace.debug(f"[NESTED]: Execute Branch: {branch!r}") @@ -2271,7 +2271,7 @@ def process( Returns: Result: The execution result with status and context data. """ - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) event: Event = event or Event() @@ -2410,7 +2410,7 @@ def _process_item( :rtype: tuple[Status, Result] """ - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) trace.debug(f"[NESTED]: Execute Item: {item!r}") @@ -2547,7 +2547,7 @@ def process( Returns: Result: The execution result with status and context data. """ - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) event: Event = event or Event() @@ -2732,7 +2732,7 @@ def _process_loop( :rtype: tuple[Status, DictData, T] :return: Return a pair of Result and changed item. """ - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) trace.debug(f"[NESTED]: Execute Loop: {loop} (Item {item!r})") @@ -2876,7 +2876,7 @@ def process( Returns: Result: The execution result with status and context data. """ - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) event: Event = event or Event() @@ -3028,7 +3028,7 @@ def _process_case( :rtype: DictData """ - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) trace.debug(f"[NESTED]: Execute Case: {case!r}") @@ -3109,7 +3109,7 @@ def process( Returns: Result: The execution result with status and context data. """ - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) @@ -3212,7 +3212,7 @@ def process( Returns: Result: The execution result with status and context data. """ - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) message: str = param2template(self.message, params, extras=self.extras) @@ -3243,7 +3243,7 @@ async def async_process( Returns: Result: The execution result with status and context data. """ - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) message: str = param2template(self.message, params, extras=self.extras) @@ -3324,7 +3324,7 @@ def _process_task( "by `pip install docker` first." ) from None - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) client = DockerClient( @@ -3424,7 +3424,7 @@ def process( Returns: Result: The execution result with status and context data. """ - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) trace.info(f"[STAGE]: Docker: {self.image}:{self.tag}") @@ -3538,7 +3538,7 @@ def process( Returns: Result: The execution result with status and context data. """ - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) run: str = param2template(dedent(self.run), params, extras=self.extras) diff --git a/src/ddeutil/workflow/traces.py b/src/ddeutil/workflow/traces.py index 10483d1a..c51c43f8 100644 --- a/src/ddeutil/workflow/traces.py +++ b/src/ddeutil/workflow/traces.py @@ -428,6 +428,9 @@ def flush( self, metadata: list[Metadata], *, extra: Optional[DictData] = None ) -> None: ... + def pre(self) -> None: + """Pre-process of handler that will execute when start create trance.""" + class ConsoleHandler(BaseHandler): """Console Handler model.""" @@ -461,14 +464,20 @@ class FileHandler(BaseHandler): metadata_filename: ClassVar[str] = "metadata.txt" type: Literal["file"] = "file" - path: str = Field(description="A file path.") + path: str = Field( + description=( + "A file path that use to save all trace log files that include " + "stdout, stderr, and metadata." + ) + ) format: str = Field( default=( "{datetime} ({process:5d}, {thread:5d}) ({cut_id}) {message:120s} " "({filename}:{lineno})" - ) + ), + description="A trace log format that write on stdout and stderr files.", ) - buffer_size: int = 8192 + buffer_size: int = Field(default=8192) # NOTE: Private attrs for the internal process. _lock: Lock = PrivateAttr(default_factory=Lock) @@ -489,7 +498,9 @@ def pointer(self, run_id: str) -> Path: log_file.mkdir(parents=True) return log_file - def pre(self) -> None: ... + def pre(self) -> None: # pragma: no cov + if not (p := Path(self.path)).exists(): + p.mkdir(parents=True) def emit( self, @@ -497,6 +508,7 @@ def emit( *, extra: Optional[DictData] = None, ) -> None: + """Emit trace log.""" pointer: Path = self.pointer(metadata.pointer_id) std_file = "stderr" if metadata.error_flag else "stdout" with self._lock: @@ -519,7 +531,9 @@ async def amit( try: import aiofiles except ImportError as e: - raise ImportError("Async mode need aiofiles package") from e + raise ImportError( + "Async mode need to install `aiofiles` package first" + ) from e with self._lock: pointer: Path = self.pointer(metadata.pointer_id) @@ -539,6 +553,7 @@ async def amit( def flush( self, metadata: list[Metadata], *, extra: Optional[DictData] = None ) -> None: + """Flush logs.""" with self._lock: pointer: Path = self.pointer(metadata[0].pointer_id) stdout_file = open( @@ -614,7 +629,7 @@ def find_traces( """Find trace logs. Args: - path: A trace path that want to find. + path (Path | None, default None): A trace path that want to find. """ for file in sorted( (path or Path(self.path)).glob("./run_id=*"), @@ -635,6 +650,9 @@ def find_trace_with_id( run_id: A running ID of trace log. force_raise: Whether to raise an exception if not found. path: Optional path override. + + Returns: + TraceData: A TranceData instance that already passed searching data. """ base_path: Path = path or self.path file: Path = base_path / f"run_id={run_id}" @@ -758,7 +776,8 @@ def amit( metadata: Metadata, *, extra: Optional[DictData] = None, - ) -> None: ... + ) -> None: + raise NotImplementedError("Does not implement async emit yet.") def flush( self, metadata: list[Metadata], *, extra: Optional[DictData] = None @@ -1507,7 +1526,6 @@ def find_traces( try: from elasticsearch import Elasticsearch - # Create client client = Elasticsearch( hosts=es_hosts if isinstance(es_hosts, list) else [es_hosts], basic_auth=( @@ -1654,8 +1672,6 @@ def find_trace_with_id( for hit in response["hits"]["hits"]: source = hit["_source"] - - # Convert to TraceMeta trace_meta = Metadata( run_id=source["run_id"], parent_run_id=source["parent_run_id"], @@ -1867,7 +1883,7 @@ async def aexception(self, msg: str) -> None: # pragma: no cov await self.amit(msg, level="exception") -class TraceManager(BaseModel, BaseEmit, BaseAsyncEmit): +class Trace(BaseModel, BaseEmit, BaseAsyncEmit): """Trace Manager model that keep all trance handler and emit log to its handler. """ @@ -1956,7 +1972,7 @@ async def amit(self, msg: str, level: Level) -> None: any logging level. Args: - msg: A message that want to log. + msg (str): A message that want to log. level (Level): A logging mode. """ _msg: str = self.make_message(msg) @@ -2006,10 +2022,12 @@ def __exit__(self, exc_type, exc_val, exc_tb): def get_trace( run_id: str, *, + handlers: list[DictData] = None, parent_run_id: Optional[str] = None, extras: Optional[DictData] = None, -) -> TraceManager: - """Get dynamic TraceManager instance from the core config. + auto_pre_process: bool = False, +) -> Trace: + """Get dynamic Trace instance from the core config. This factory function returns the appropriate trace implementation based on configuration. It can be overridden by extras argument and accepts running ID @@ -2018,16 +2036,27 @@ def get_trace( Args: run_id (str): A running ID. parent_run_id (str | None, default None): A parent running ID. + handlers: extras: An extra parameter that want to override the core config values. + auto_pre_process (bool, default False) Returns: - TraceManager: The appropriate trace instance. + Trace: The appropriate trace instance. """ - handlers = dynamic("trace_handlers", extras=extras) - return TraceManager( - run_id=run_id, - parent_run_id=parent_run_id, - handlers=handlers, - extras=extras or {}, + handlers: list[DictData] = dynamic( + "trace_handlers", f=handlers, extras=extras + ) + trace = Trace.model_validate( + { + "run_id": run_id, + "parent_run_id": parent_run_id, + "handlers": handlers, + "extras": extras or {}, + } ) + # NOTE: Start pre-process when start create trace. + if auto_pre_process: + for handler in trace.handlers: + handler.pre() + return trace diff --git a/src/ddeutil/workflow/workflow.py b/src/ddeutil/workflow/workflow.py index 33881c5a..a232cfe3 100644 --- a/src/ddeutil/workflow/workflow.py +++ b/src/ddeutil/workflow/workflow.py @@ -61,7 +61,7 @@ validate_statuses, ) from .reusables import has_template, param2template -from .traces import TraceManager, get_trace +from .traces import Trace, get_trace from .utils import ( UTC, gen_id, @@ -276,7 +276,11 @@ def detail(self) -> DictData: # pragma: no cov return self.model_dump(by_alias=True) def md(self, author: Optional[str] = None) -> str: # pragma: no cov - """Generate the markdown template.""" + """Generate the markdown template from this Workflow model data. + + Args: + author (str | None, default None): An author name. + """ def align_newline(value: str) -> str: return value.rstrip("\n").replace("\n", "\n ") @@ -470,7 +474,7 @@ def release( parent_run_id: str = run_id context: DictData = {"status": WAIT} - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) release: datetime = self.validate_release(dt=release) @@ -568,7 +572,7 @@ def execute_job( Returns: tuple[Status, DictData]: The pair of status and result context data. """ - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) if event and event.is_set(): @@ -685,7 +689,7 @@ def execute( ts: float = time.monotonic() parent_run_id: Optional[str] = run_id run_id: str = gen_id(self.name, unique=True, extras=self.extras) - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) context: DictData = self.parameterize(params) @@ -788,7 +792,7 @@ def execute( ) elif check == SKIP: # pragma: no cov trace.info( - f"[JOB]: Skip job: {job_id!r} from trigger rule." + f"[JOB]: ⏭️ Skip job: {job_id!r} from trigger rule." ) job.set_outputs(output={"status": SKIP}, to=context) job_queue.task_done() @@ -930,7 +934,7 @@ def rerun( ts: float = time.monotonic() parent_run_id: str = run_id run_id: str = gen_id(self.name, extras=self.extras) - trace: TraceManager = get_trace( + trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) if context["status"] == SUCCESS: diff --git a/tests/test_traces.py b/tests/test_traces.py index 6f498192..1c35d34e 100644 --- a/tests/test_traces.py +++ b/tests/test_traces.py @@ -12,7 +12,7 @@ FileHandler, Message, Metadata, - TraceManager, + Trace, get_trace, ) @@ -160,6 +160,7 @@ async def test_trace_handler_base(): assert handler.emit(meta) is None assert await handler.amit(meta) is None assert handler.flush([meta]) is None + assert handler.pre() is None @pytest.mark.asyncio @@ -211,7 +212,7 @@ async def test_trace_handler_file(): def test_trace_manager_raise(): - trace = TraceManager( + trace = Trace( run_id="01", parent_run_id="1001", handlers=[{"type": "console"}], @@ -222,7 +223,7 @@ def test_trace_manager_raise(): def test_trace_manager(): - trace = TraceManager( + trace = Trace( run_id="01", parent_run_id="1001", handlers=[{"type": "console"}], @@ -251,7 +252,7 @@ def test_trace_manager(): assert len(trace._buffer) == 0 - trace = TraceManager( + trace = Trace( run_id="01", parent_run_id="1001", handlers=[], @@ -285,7 +286,7 @@ def test_trace_manager(): def test_trace_manager_files(test_path: Path): - trace = TraceManager( + trace = Trace( run_id="01", parent_run_id="1001_test_file", handlers=[ @@ -308,13 +309,15 @@ def test_trace_manager_files(test_path: Path): def test_trace_get_trace(test_path: Path): rollback = os.getenv("WORKFLOW_LOG_TRACE_HANDLERS") os.environ["WORKFLOW_LOG_TRACE_HANDLERS"] = ( - "[" - '{"type": "console"},' - f'{{"type": "file", "path": "{(test_path / "logs/trace").absolute()}"}}' - f"]" + '[{"type": "console"},' + f'{{"type": "file", "path": "{(test_path / "logs/trace").absolute()}"}}]' ) print(os.getenv("WORKFLOW_LOG_TRACE_HANDLERS")) - trace = get_trace(run_id="01", parent_run_id="1001_test_get_trace") + trace = get_trace( + run_id="01", + parent_run_id="1001_test_get_trace", + auto_pre_process=True, + ) trace.debug("This is debug message") trace.info("This is info message") trace.error("This is info message") From 0fec0e81c4b57945a9df9d376d0190aa6c9c860a Mon Sep 17 00:00:00 2001 From: Korawich Anuttra Date: Sun, 20 Jul 2025 18:34:26 +0700 Subject: [PATCH 08/10] :art: format: add typed-hint for handler args. --- src/ddeutil/workflow/traces.py | 14 ++++++++++++-- src/ddeutil/workflow/workflow.py | 21 +++++++++++---------- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/src/ddeutil/workflow/traces.py b/src/ddeutil/workflow/traces.py index c51c43f8..b7ce2fc5 100644 --- a/src/ddeutil/workflow/traces.py +++ b/src/ddeutil/workflow/traces.py @@ -29,7 +29,16 @@ from pathlib import Path from threading import Lock, get_ident from types import FrameType -from typing import Annotated, Any, ClassVar, Final, Literal, Optional, Union +from typing import ( + Annotated, + Any, + ClassVar, + Final, + Literal, + Optional, + TypeVar, + Union, +) from zoneinfo import ZoneInfo from pydantic import BaseModel, Field, PrivateAttr @@ -1741,6 +1750,7 @@ def find_trace_with_id( return TraceData(stdout="", stderr="") +Handler = TypeVar("Handler", bound=BaseHandler) TraceHandler = Annotated[ Union[ ConsoleHandler, @@ -2022,7 +2032,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): def get_trace( run_id: str, *, - handlers: list[DictData] = None, + handlers: list[Union[DictData, Handler]] = None, parent_run_id: Optional[str] = None, extras: Optional[DictData] = None, auto_pre_process: bool = False, diff --git a/src/ddeutil/workflow/workflow.py b/src/ddeutil/workflow/workflow.py index a232cfe3..22129c1f 100644 --- a/src/ddeutil/workflow/workflow.py +++ b/src/ddeutil/workflow/workflow.py @@ -916,16 +916,17 @@ def rerun( ) -> Result: # pragma: no cov """Re-Execute workflow with passing the error context data. - :param context: A context result that get the failed status. - :param run_id: (Optional[str]) A workflow running ID. - :param event: (Event) An Event manager instance that use to cancel this - execution if it forces stopped by parent execution. - :param timeout: (float) A workflow execution time out in second unit - that use for limit time of execution and waiting job dependency. - This value does not force stop the task that still running more than - this limit time. (Default: 60 * 60 seconds) - :param max_job_parallel: (int) The maximum workers that use for job - execution in `ThreadPoolExecutor` object. (Default: 2 workers) + Args: + context: A context result that get the failed status. + run_id: (Optional[str]) A workflow running ID. + event: (Event) An Event manager instance that use to cancel this + execution if it forces stopped by parent execution. + timeout: (float) A workflow execution time out in second unit + that use for limit time of execution and waiting job dependency. + This value does not force stop the task that still running more + than this limit time. (Default: 60 * 60 seconds) + max_job_parallel: (int) The maximum workers that use for job + execution in `ThreadPoolExecutor` object. (Default: 2 workers) Returns Result: Return Result object that create from execution context with From 0c038146a8a72895fc56ec2f656cf078654e94db Mon Sep 17 00:00:00 2001 From: Korawich Anuttra Date: Sun, 20 Jul 2025 19:16:47 +0700 Subject: [PATCH 09/10] :dart: feat: add skip on workflow release method when audit. --- src/ddeutil/workflow/__init__.py | 20 ++++++++--- src/ddeutil/workflow/audits.py | 52 ++++++++++++++++++++++------ src/ddeutil/workflow/workflow.py | 59 +++++++++++++------------------- tests/test_audits.py | 11 +++--- tests/test_workflow_release.py | 47 +++++++++++++++++++++++-- 5 files changed, 132 insertions(+), 57 deletions(-) diff --git a/src/ddeutil/workflow/__init__.py b/src/ddeutil/workflow/__init__.py index a80b12b0..5102b5ce 100644 --- a/src/ddeutil/workflow/__init__.py +++ b/src/ddeutil/workflow/__init__.py @@ -50,11 +50,25 @@ from .__cron import CronRunner from .__types import DictData, DictStr, Matrix, Re, TupleStr from .audits import ( + EVENT, + FORCE, + NORMAL, + RERUN, Audit, FileAudit, get_audit, ) -from .conf import * +from .conf import ( + PREFIX, + CallerSecret, + Config, + YamlParser, + api_config, + config, + dynamic, + env, + pass_env, +) from .errors import ( BaseError, JobCancelError, @@ -140,10 +154,6 @@ ) from .utils import * from .workflow import ( - EVENT, - FORCE, - NORMAL, - RERUN, ReleaseType, Workflow, ) diff --git a/src/ddeutil/workflow/audits.py b/src/ddeutil/workflow/audits.py index fc5e282a..b0f7f787 100644 --- a/src/ddeutil/workflow/audits.py +++ b/src/ddeutil/workflow/audits.py @@ -49,11 +49,12 @@ from abc import ABC, abstractmethod from collections.abc import Iterator from datetime import datetime, timedelta +from enum import Enum from pathlib import Path from typing import Annotated, Any, ClassVar, Literal, Optional, Union from urllib.parse import ParseResult, urlparse -from pydantic import BaseModel, Field, TypeAdapter +from pydantic import BaseModel, ConfigDict, Field, TypeAdapter from pydantic.functional_validators import field_validator, model_validator from typing_extensions import Self @@ -64,18 +65,49 @@ logger = logging.getLogger("ddeutil.workflow") +class ReleaseType(str, Enum): + """Release type enumeration for workflow execution modes. + + This enum defines the different types of workflow releases that can be + triggered, each with specific behavior and use cases. + + Attributes: + NORMAL: Standard workflow release execution + RERUN: Re-execution of previously failed workflow + EVENT: Event-triggered workflow execution + FORCE: Forced execution bypassing normal conditions + """ + + NORMAL = "normal" + RERUN = "rerun" + EVENT = "event" + FORCE = "force" + + +NORMAL = ReleaseType.NORMAL +RERUN = ReleaseType.RERUN +EVENT = ReleaseType.EVENT +FORCE = ReleaseType.FORCE + + class AuditData(BaseModel): + """Audit Data model.""" + + model_config = ConfigDict(use_enum_values=True) + name: str = Field(description="A workflow name.") release: datetime = Field(description="A release datetime.") - type: str = Field(description="A running type before logging.") + type: ReleaseType = Field( + default=NORMAL, description="A running type before logging." + ) context: DictData = Field( default_factory=dict, description="A context that receive from a workflow execution result.", ) + run_id: str = Field(description="A running ID") parent_run_id: Optional[str] = Field( default=None, description="A parent running ID." ) - run_id: str = Field(description="A running ID") runs_metadata: DictData = Field( default_factory=dict, description="A runs metadata that will use to tracking this audit log.", @@ -122,7 +154,7 @@ def __model_action(self) -> Self: @abstractmethod def is_pointed( self, - data: AuditData, + data: Any, *, extras: Optional[DictData] = None, ) -> bool: @@ -328,21 +360,21 @@ def find_audit_with_release( return AuditData.model_validate(obj=json.load(f)) def is_pointed( - self, data: AuditData, *, extras: Optional[DictData] = None + self, + data: Any, + *, + extras: Optional[DictData] = None, ) -> bool: """Check if the release log already exists at the destination log path. Args: - data: The workflow name. + data (str): extras: Optional extra parameters to override core config. Returns: bool: True if the release log exists, False otherwise. """ - # NOTE: Return False if enable writing log flag does not set. - if not dynamic("enable_write_audit", extras=extras): - return False - return self.pointer(data).exists() + return self.pointer(AuditData.model_validate(data)).exists() def pointer(self, data: AuditData) -> Path: """Return release directory path generated from model data. diff --git a/src/ddeutil/workflow/workflow.py b/src/ddeutil/workflow/workflow.py index 22129c1f..fbb45596 100644 --- a/src/ddeutil/workflow/workflow.py +++ b/src/ddeutil/workflow/workflow.py @@ -30,7 +30,6 @@ as_completed, ) from datetime import datetime -from enum import Enum from pathlib import Path from queue import Queue from textwrap import dedent @@ -42,7 +41,7 @@ from typing_extensions import Self from .__types import DictData -from .audits import Audit, get_audit +from .audits import NORMAL, Audit, ReleaseType, get_audit from .conf import YamlParser, dynamic from .errors import WorkflowCancelError, WorkflowError, WorkflowTimeoutError from .event import Event @@ -70,31 +69,6 @@ ) -class ReleaseType(str, Enum): - """Release type enumeration for workflow execution modes. - - This enum defines the different types of workflow releases that can be - triggered, each with specific behavior and use cases. - - Attributes: - NORMAL: Standard workflow release execution - RERUN: Re-execution of previously failed workflow - EVENT: Event-triggered workflow execution - FORCE: Forced execution bypassing normal conditions - """ - - NORMAL = "normal" - RERUN = "rerun" - EVENT = "event" - FORCE = "force" - - -NORMAL = ReleaseType.NORMAL -RERUN = ReleaseType.RERUN -EVENT = ReleaseType.EVENT -FORCE = ReleaseType.FORCE - - class Workflow(BaseModel): """Main workflow orchestration model for job and schedule management. @@ -464,6 +438,7 @@ def release( method. """ name: str = override_log_name or self.name + audit: Audit = audit or get_audit(extras=self.extras) # NOTE: Generate the parent running ID with not None value. if run_id: @@ -474,6 +449,14 @@ def release( parent_run_id: str = run_id context: DictData = {"status": WAIT} + audit_data: DictData = { + "name": name, + "release": release, + "type": release_type, + "run_id": run_id, + "parent_run_id": parent_run_id, + "extras": self.extras, + } trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) @@ -491,6 +474,17 @@ def release( }, extras=self.extras, ) + + if release_type == NORMAL and audit.is_pointed(data=audit_data): + trace.info("[RELEASE]: Skip this release because it already audit.") + return Result( + run_id=run_id, + parent_run_id=parent_run_id, + status=SKIP, + context=catch(context, status=SKIP), + extras=self.extras, + ) + rs: Result = self.execute( params=values, run_id=parent_run_id, @@ -500,15 +494,10 @@ def release( trace.info(f"[RELEASE]: End {name!r} : {release:%Y-%m-%d %H:%M:%S}") trace.debug(f"[RELEASE]: Writing audit: {name!r}.") ( - (audit or get_audit(extras=self.extras)).save( - data={ - "name": name, - "release": release, - "type": release_type, + audit.save( + data=audit_data + | { "context": context, - "parent_run_id": parent_run_id, - "run_id": run_id, - "extras": self.extras, "runs_metadata": ( (runs_metadata or {}) | rs.info diff --git a/tests/test_audits.py b/tests/test_audits.py index 8d8049b7..8f031a71 100644 --- a/tests/test_audits.py +++ b/tests/test_audits.py @@ -5,6 +5,7 @@ import pytest from ddeutil.workflow.audits import ( + NORMAL, AuditData, BaseAudit, FileAudit, @@ -28,7 +29,7 @@ def test_audit_data(): audit = AuditData.model_validate( { "name": "wf-scheduling", - "type": "manual", + "type": "normal", "release": datetime(2024, 1, 1, 1), "run_id": "558851633820240817184358131811", } @@ -58,8 +59,8 @@ def test_audit_file(): log = FileAudit(path="./audits") audit = AuditData.model_validate( obj={ - "name": "wf-scheduling", - "type": "manual", + "name": "wf-scheduling-not-exists", + "type": NORMAL, "release": datetime(2024, 1, 1, 1), "context": { "params": {"name": "foo"}, @@ -80,7 +81,7 @@ def test_audit_file_do_first(): audit = AuditData.model_validate( { "name": "wf-demo-logging", - "type": "manual", + "type": "normal", "release": datetime(2024, 1, 1, 1), "context": { "params": {"name": "logging"}, @@ -109,7 +110,7 @@ def test_audit_file_find(root_path): audit = AuditData.model_validate( { "name": "wf-scheduling", - "type": "manual", + "type": "normal", "release": datetime(2024, 1, 1, 1), "context": { "params": {"name": "foo"}, diff --git a/tests/test_workflow_release.py b/tests/test_workflow_release.py index 83838a47..c975fd21 100644 --- a/tests/test_workflow_release.py +++ b/tests/test_workflow_release.py @@ -1,9 +1,12 @@ +import shutil from datetime import datetime from zoneinfo import ZoneInfo import pytest from ddeutil.workflow import ( + FORCE, NORMAL, + SKIP, SUCCESS, UTC, Result, @@ -108,7 +111,7 @@ def test_workflow_release(): } -def test_workflow_release_with_datetime(): +def test_workflow_release_with_datetime_force(): workflow: Workflow = Workflow.model_validate( obj={ "name": "wf-scheduling-common", @@ -126,6 +129,7 @@ def test_workflow_release_with_datetime(): dt: datetime = datetime(2025, 1, 18, tzinfo=ZoneInfo("Asia/Bangkok")) rs: Result = workflow.release( release=dt, + release_type=FORCE, params={"asat-dt": datetime(2024, 10, 1)}, ) assert dt == datetime(2025, 1, 18, tzinfo=ZoneInfo("Asia/Bangkok")) @@ -134,7 +138,7 @@ def test_workflow_release_with_datetime(): "status": SUCCESS, "params": {"asat-dt": datetime(2024, 10, 1, 0, 0)}, "release": { - "type": NORMAL, + "type": FORCE, # NOTE: The date that pass to release method will convert to UTC. "logical_date": datetime(2025, 1, 17, 17, tzinfo=UTC), }, @@ -156,6 +160,45 @@ def test_workflow_release_with_datetime(): } +def test_workflow_release_with_datetime(test_path): + test_audit_skip_path = test_path / "tests_workflow_release_audits" + + workflow: Workflow = Workflow.model_validate( + obj={ + "name": "wf-scheduling-common", + "jobs": { + "first-job": { + "stages": [ + {"name": "First Stage", "id": "first-stage"}, + {"name": "Second Stage", "id": "second-stage"}, + ] + } + }, + "extras": { + "audit_conf": { + "type": "file", + "path": str(test_audit_skip_path.absolute()), + } + }, + } + ) + dt: datetime = datetime(2025, 1, 18, tzinfo=ZoneInfo("Asia/Bangkok")) + rs: Result = workflow.release( + release=dt, + params={"asat-dt": datetime(2024, 10, 1)}, + ) + assert rs.status == SUCCESS + + rs: Result = workflow.release( + release=dt, + params={"asat-dt": datetime(2024, 10, 1)}, + ) + assert rs.status == SKIP + assert rs.context == {"status": SKIP} + + shutil.rmtree(test_audit_skip_path) + + def test_workflow_release_with_auto(): workflow: Workflow = Workflow.model_validate( obj={ From 30331f99aa925fb4275745a9550f706a7bbdbcdc Mon Sep 17 00:00:00 2001 From: Korawich Anuttra Date: Sun, 20 Jul 2025 20:54:21 +0700 Subject: [PATCH 10/10] :dart: feat: add raise error when pass rerun mode to release method. --- src/ddeutil/workflow/workflow.py | 10 +++++++++- tests/test_workflow_release.py | 25 +++++++++++++++++++++---- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/src/ddeutil/workflow/workflow.py b/src/ddeutil/workflow/workflow.py index fbb45596..5739c60d 100644 --- a/src/ddeutil/workflow/workflow.py +++ b/src/ddeutil/workflow/workflow.py @@ -41,7 +41,7 @@ from typing_extensions import Self from .__types import DictData -from .audits import NORMAL, Audit, ReleaseType, get_audit +from .audits import NORMAL, RERUN, Audit, ReleaseType, get_audit from .conf import YamlParser, dynamic from .errors import WorkflowCancelError, WorkflowError, WorkflowTimeoutError from .event import Event @@ -485,6 +485,14 @@ def release( extras=self.extras, ) + if release_type == RERUN: + # TODO: It will load previous audit and use this data to run with + # the `rerun` method. + raise NotImplementedError( + "Release does not support for rerun type yet. Please use the " + "`rerun` method instead." + ) + rs: Result = self.execute( params=values, run_id=parent_run_id, diff --git a/tests/test_workflow_release.py b/tests/test_workflow_release.py index c975fd21..de0391c1 100644 --- a/tests/test_workflow_release.py +++ b/tests/test_workflow_release.py @@ -6,6 +6,7 @@ from ddeutil.workflow import ( FORCE, NORMAL, + RERUN, SKIP, SUCCESS, UTC, @@ -214,10 +215,26 @@ def test_workflow_release_with_auto(): "extra": {"enable_write_audit": True}, } ) - rs: Result = workflow.release( - release=datetime.now(), - params={"asat-dt": datetime(2024, 10, 1)}, - ) + rs: Result = workflow.release(release=datetime.now(), params={}) assert rs.status == SUCCESS assert rs.context["release"]["type"] == NORMAL assert rs.context["release"]["logical_date"].tzinfo == UTC + + +def test_workflow_release_rerun(): + workflow: Workflow = Workflow.model_validate( + obj={ + "name": "wf-scheduling-common", + "jobs": { + "first-job": { + "stages": [ + {"name": "First Stage", "id": "first-stage"}, + {"name": "Second Stage", "id": "second-stage"}, + ] + } + }, + "extra": {"enable_write_audit": True}, + } + ) + with pytest.raises(NotImplementedError): + workflow.release(release=datetime.now(), params={}, release_type=RERUN)