diff --git a/docs/examples/conf/01-data-pipeline-etl.yml b/docs/examples/conf/01-etl-s3.yml similarity index 96% rename from docs/examples/conf/01-data-pipeline-etl.yml rename to docs/examples/conf/01-etl-s3.yml index 958bac7b..54f549f0 100644 --- a/docs/examples/conf/01-data-pipeline-etl.yml +++ b/docs/examples/conf/01-etl-s3.yml @@ -1,4 +1,4 @@ -name: "data-pipeline-etl" +name: "etl-s3" type: "Workflow" desc: | End-to-end ETL pipeline for customer data processing with data quality checks @@ -31,14 +31,12 @@ jobs: data-extraction: id: "data-extraction" desc: "Extract raw data from multiple sources" - runs-on: type: "aws_batch" with: job_queue_arn: "${AWS_BATCH_JOB_QUEUE_ARN}" s3_bucket: "${S3_BUCKET}" region_name: "us-east-1" - stages: - name: "Extract customer data" bash: | @@ -56,11 +54,14 @@ jobs: transaction_files = glob.glob("/tmp/transactions/*.csv") if not customer_files: - raise Exception("No customer data files found") + raise FileNotFoundError("No customer data files found") if not transaction_files: - raise Exception("No transaction data files found") + raise FileNotFoundError("No transaction data files found") - print(f"Found {len(customer_files)} customer files and {len(transaction_files)} transaction files") + print( + f"Found {len(customer_files)} customer files and " + f"{len(transaction_files)} transaction files" + ) - name: "Generate extraction report" echo: "Data extraction completed successfully for ${{ params.processing_date }}" diff --git a/docs/examples/conf/02-ml-model-training.yml b/docs/examples/conf/02-ml-model-training.yml index bbe78c3d..79d1e032 100644 --- a/docs/examples/conf/02-ml-model-training.yml +++ b/docs/examples/conf/02-ml-model-training.yml @@ -1,7 +1,12 @@ name: "ml-model-training" type: "Workflow" -description: "Machine learning model training pipeline with hyperparameter optimization and model evaluation" - +description: | + Machine learning model training pipeline with hyperparameter optimization + and model evaluation +on: + schedule: + - cronjob: "0 0 * * 0" # Weekly on Sunday at midnight + timezone: "UTC" params: dataset_path: type: str @@ -10,7 +15,10 @@ params: model_type: type: choice - options: ["random_forest", "xgboost", "neural_network"] + options: + - "random_forest" + - "xgboost" + - "neural_network" desc: "Type of model to train" experiment_name: @@ -370,8 +378,3 @@ jobs: channel: "#ml-deployments" message: "Model ${{ params.experiment_name }} v1 deployed successfully" color: "good" - -on: - schedule: - - cronjob: "0 0 * * 0" # Weekly on Sunday at midnight - timezone: "UTC" diff --git a/src/ddeutil/workflow/__init__.py b/src/ddeutil/workflow/__init__.py index b07e8864..9b023fa6 100644 --- a/src/ddeutil/workflow/__init__.py +++ b/src/ddeutil/workflow/__init__.py @@ -50,12 +50,12 @@ from .__cron import CronRunner from .__types import DictData, DictStr, Matrix, Re, TupleStr from .audits import ( - EVENT, + DRYRUN, FORCE, NORMAL, RERUN, Audit, - FileAudit, + LocalFileAudit, get_audit, ) from .conf import ( diff --git a/src/ddeutil/workflow/__types.py b/src/ddeutil/workflow/__types.py index 226f0c0d..8f1edb54 100644 --- a/src/ddeutil/workflow/__types.py +++ b/src/ddeutil/workflow/__types.py @@ -16,17 +16,26 @@ Match, Pattern, ) -from typing import Any, Optional, TypedDict, Union +from typing import Any, Optional, TypedDict, Union, cast from typing_extensions import Self StrOrNone = Optional[str] StrOrInt = Union[str, int] TupleStr = tuple[str, ...] +ListStr = list[str] +ListInt = list[int] DictData = dict[str, Any] +DictRange = dict[int, Any] DictStr = dict[str, str] Matrix = dict[str, Union[list[str], list[int]]] + +def cast_dict(value: TypedDict[...]) -> DictData: + """Cast any TypedDict object to DictData type.""" + return cast(DictData, value) + + # Pre-compile regex patterns for better performance _RE_CALLER_PATTERN = r""" \$ # start with $ diff --git a/src/ddeutil/workflow/audits.py b/src/ddeutil/workflow/audits.py index b0f7f787..3b2da0bf 100644 --- a/src/ddeutil/workflow/audits.py +++ b/src/ddeutil/workflow/audits.py @@ -74,31 +74,38 @@ class ReleaseType(str, Enum): Attributes: NORMAL: Standard workflow release execution RERUN: Re-execution of previously failed workflow - EVENT: Event-triggered workflow execution + DRYRUN: Dry-execution workflow FORCE: Forced execution bypassing normal conditions """ NORMAL = "normal" RERUN = "rerun" - EVENT = "event" FORCE = "force" + DRYRUN = "dryrun" NORMAL = ReleaseType.NORMAL RERUN = ReleaseType.RERUN -EVENT = ReleaseType.EVENT +DRYRUN = ReleaseType.DRYRUN FORCE = ReleaseType.FORCE class AuditData(BaseModel): - """Audit Data model.""" + """Audit Data model that use to be the core data for any Audit model manage + logging at the target pointer system or service like file-system, sqlite + database, etc. + """ model_config = ConfigDict(use_enum_values=True) name: str = Field(description="A workflow name.") release: datetime = Field(description="A release datetime.") type: ReleaseType = Field( - default=NORMAL, description="A running type before logging." + default=NORMAL, + description=( + "An execution type that should be value in ('normal', 'rerun', " + "'force', 'dryrun')." + ), ) context: DictData = Field( default_factory=dict, @@ -121,18 +128,17 @@ class BaseAudit(BaseModel, ABC): for logging subclasses like file, sqlite, etc. """ - type: str + type: Literal["base"] = "base" + logging_name: str = "ddeutil.workflow" extras: DictData = Field( default_factory=dict, description="An extras parameter that want to override core config", ) @field_validator("extras", mode="before") - def validate_extras(cls, v: Any) -> DictData: + def __prepare_extras(cls, v: Any) -> Any: """Validate extras field to ensure it's a dictionary.""" - if v is None: - return {} - return v + return {} if v is None else v @model_validator(mode="after") def __model_action(self) -> Self: @@ -148,7 +154,7 @@ def __model_action(self) -> Self: self.do_before() # NOTE: Start setting log config in this line with cache. - set_logging("ddeutil.workflow") + set_logging(self.logging_name) return self @abstractmethod @@ -248,7 +254,7 @@ def save( raise NotImplementedError("Audit should implement `save` method.") -class FileAudit(BaseAudit): +class LocalFileAudit(BaseAudit): """File Audit Pydantic Model for saving log data from workflow execution. This class inherits from BaseAudit and implements file-based storage @@ -256,19 +262,25 @@ class FileAudit(BaseAudit): in a structured directory hierarchy. Attributes: - filename_fmt: Class variable defining the filename format for audit files. + file_fmt: Class variable defining the filename format for audit log. + file_release_fmt: Class variable defining the filename format for audit + release log. """ - filename_fmt: ClassVar[str] = ( - "workflow={name}/release={release:%Y%m%d%H%M%S}" - ) + file_fmt: ClassVar[str] = "workflow={name}" + file_release_fmt: ClassVar[str] = "release={release:%Y%m%d%H%M%S}" type: Literal["file"] = "file" - path: str = Field( - default="./audits", + path: Path = Field( + default=Path("./audits"), description="A file path that use to manage audit logs.", ) + @field_validator("path", mode="before", json_schema_input_type=str) + def __prepare_path(cls, data: Any) -> Any: + """Prepare path that passing with string to Path instance.""" + return Path(data) if isinstance(data, str) else data + def do_before(self) -> None: """Create directory of release before saving log file. @@ -278,7 +290,10 @@ def do_before(self) -> None: Path(self.path).mkdir(parents=True, exist_ok=True) def find_audits( - self, name: str, *, extras: Optional[DictData] = None + self, + name: str, + *, + extras: Optional[DictData] = None, ) -> Iterator[AuditData]: """Generate audit data found from logs path for a specific workflow name. @@ -292,7 +307,7 @@ def find_audits( Raises: FileNotFoundError: If the workflow directory does not exist. """ - pointer: Path = Path(self.path) / f"workflow={name}" + pointer: Path = self.path / self.file_fmt.format(name=name) if not pointer.exists(): raise FileNotFoundError(f"Pointer: {pointer.absolute()}.") @@ -325,7 +340,7 @@ def find_audit_with_release( ValueError: If no releases found when release is None. """ if release is None: - pointer: Path = Path(self.path) / f"workflow={name}" + pointer: Path = self.path / self.file_fmt.format(name=name) if not pointer.exists(): raise FileNotFoundError(f"Pointer: {pointer.absolute()}.") @@ -382,8 +397,10 @@ def pointer(self, data: AuditData) -> Path: Returns: Path: The directory path for the current workflow and release. """ - return Path(self.path) / self.filename_fmt.format( - name=data.name, release=data.release + return ( + self.path + / self.file_fmt.format(**data.model_dump(by_alias=True)) + / self.file_release_fmt.format(**data.model_dump(by_alias=True)) ) def save(self, data: Any, excluded: Optional[list[str]] = None) -> Self: @@ -459,7 +476,7 @@ def cleanup(self, max_age_days: int = 180) -> int: # pragma: no cov return cleaned_count -class SQLiteAudit(BaseAudit): # pragma: no cov +class LocalSQLiteAudit(BaseAudit): # pragma: no cov """SQLite Audit model for database-based audit storage. This class inherits from BaseAudit and implements SQLite database storage @@ -467,11 +484,11 @@ class SQLiteAudit(BaseAudit): # pragma: no cov Attributes: table_name: Class variable defining the database table name. - schemas: Class variable defining the database schema. + ddl: Class variable defining the database schema. """ table_name: ClassVar[str] = "audits" - schemas: ClassVar[ + ddl: ClassVar[ str ] = """ CREATE TABLE IF NOT EXISTS audits ( @@ -489,22 +506,21 @@ class SQLiteAudit(BaseAudit): # pragma: no cov """ type: Literal["sqlite"] = "sqlite" - path: str + path: Path = Field( + default=Path("./audits.db"), + description="A SQLite filepath.", + ) - def _ensure_table_exists(self) -> None: + def do_before(self) -> None: """Ensure the audit table exists in the database.""" - audit_url = dynamic("audit_url", extras=self.extras) - if audit_url is None or not audit_url.path: + if self.path.is_dir(): raise ValueError( - "SQLite audit_url must specify a database file path" + "SQLite path must specify a database file path not dir." ) - audit_url_parse: ParseResult = urlparse(audit_url) - db_path = Path(audit_url_parse.path) - db_path.parent.mkdir(parents=True, exist_ok=True) - - with sqlite3.connect(db_path) as conn: - conn.execute(self.schemas) + self.path.parent.mkdir(parents=True, exist_ok=True) + with sqlite3.connect(self.path) as conn: + conn.execute(self.ddl) conn.commit() def is_pointed( @@ -771,24 +787,31 @@ def cleanup(self, max_age_days: int = 180) -> int: return cursor.rowcount +class PostgresAudit(BaseAudit, ABC): ... # pragma: no cov + + Audit = Annotated[ Union[ - FileAudit, - SQLiteAudit, + LocalFileAudit, + LocalSQLiteAudit, ], Field(discriminator="type"), ] -def get_audit(extras: Optional[DictData] = None) -> Audit: # pragma: no cov +def get_audit( + audit_conf: Optional[DictData] = None, + extras: Optional[DictData] = None, +) -> Audit: # pragma: no cov """Get an audit model dynamically based on the config audit path value. Args: + audit_conf (DictData): extras: Optional extra parameters to override the core config. Returns: Audit: The appropriate audit model class based on configuration. """ - audit_conf = dynamic("audit_conf", extras=extras) + audit_conf = dynamic("audit_conf", f=audit_conf, extras=extras) model = TypeAdapter(Audit).validate_python(audit_conf | {"extras": extras}) return model diff --git a/src/ddeutil/workflow/event.py b/src/ddeutil/workflow/event.py index 7fac7f7e..9341726c 100644 --- a/src/ddeutil/workflow/event.py +++ b/src/ddeutil/workflow/event.py @@ -16,13 +16,9 @@ Interval: Type alias for scheduling intervals ('daily', 'weekly', 'monthly') Classes: + CrontabValue: Crontab: Main cron-based event scheduler. CrontabYear: Enhanced cron scheduler with year constraints. - ReleaseEvent: Release-based event triggers. - FileEvent: File system monitoring triggers. - WebhookEvent: API/webhook-based triggers. - DatabaseEvent: Database change monitoring triggers. - SensorEvent: Sensor-based event monitoring. Example: >>> from ddeutil.workflow.event import Crontab @@ -395,19 +391,16 @@ class Event(BaseModel): ) @field_validator("schedule", mode="after") - def __on_no_dup_and_reach_limit__( - cls, - value: list[Crontab], - ) -> list[Crontab]: + def __prepare_schedule__(cls, value: list[Crontab]) -> list[Crontab]: """Validate the on fields should not contain duplicate values and if it contains the every minute value more than one value, it will remove to only one value. Args: - value: A list of on object. + value (list[Crontab]): A list of on object. Returns: - list[CronJobYear | Crontab]: The validated list of Crontab objects. + list[Crontab]: The validated list of Crontab objects. Raises: ValueError: If it has some duplicate value. diff --git a/src/ddeutil/workflow/job.py b/src/ddeutil/workflow/job.py index 757b9885..738b214b 100644 --- a/src/ddeutil/workflow/job.py +++ b/src/ddeutil/workflow/job.py @@ -73,7 +73,7 @@ from .reusables import has_template, param2template from .stages import Stage from .traces import Trace, get_trace -from .utils import cross_product, filter_func, gen_id +from .utils import cross_product, extract_id, filter_func, gen_id MatrixFilter = list[dict[str, Union[str, int]]] @@ -890,8 +890,9 @@ def execute( Result: Return Result object that create from execution context. """ ts: float = time.monotonic() - parent_run_id: str = run_id - run_id: str = gen_id((self.id or "EMPTY"), unique=True) + parent_run_id, run_id = extract_id( + (self.id or "EMPTY"), run_id=run_id, extras=self.extras + ) trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) @@ -1272,7 +1273,6 @@ def local_execute( errors: DictData = {} statuses: list[Status] = [WAIT] * len_strategy - fail_fast: bool = False if not job.strategy.fail_fast: done: Iterator[Future] = as_completed(futures) @@ -1297,7 +1297,6 @@ def local_execute( ) trace.debug(f"[JOB]: ... Job was set Fail-Fast{nd}") done: Iterator[Future] = as_completed(futures) - fail_fast: bool = True for i, future in enumerate(done, start=0): try: @@ -1312,19 +1311,10 @@ def local_execute( pass status: Status = validate_statuses(statuses) - - # NOTE: Prepare status because it does not cancel from parent event but - # cancel from failed item execution. - if fail_fast and status == CANCEL: - status = FAILED - - return Result( - run_id=run_id, - parent_run_id=parent_run_id, + return Result.from_trace(trace).catch( status=status, context=catch(context, status=status, updated=errors), info={"execution_time": time.monotonic() - ts}, - extras=job.extras, ) diff --git a/src/ddeutil/workflow/result.py b/src/ddeutil/workflow/result.py index e6efb317..5fd41818 100644 --- a/src/ddeutil/workflow/result.py +++ b/src/ddeutil/workflow/result.py @@ -8,20 +8,12 @@ This module provides the core result and status management functionality for workflow execution tracking. It includes the Status enumeration for execution states and the Result dataclass for context transfer between workflow components. - -Classes: - Status: Enumeration for execution status tracking - Result: Dataclass for execution context and result management - -Functions: - validate_statuses: Determine final status from multiple status values - get_status_from_error: Convert exception types to appropriate status """ from __future__ import annotations from dataclasses import field from enum import Enum -from typing import Optional, TypedDict, Union +from typing import Any, Optional, TypedDict, Union from pydantic import ConfigDict from pydantic.dataclasses import dataclass @@ -126,10 +118,10 @@ def validate_statuses(statuses: list[Status]) -> Status: >>> validate_statuses([SUCCESS, SUCCESS, SUCCESS]) >>> # Returns: SUCCESS """ - if any(s == CANCEL for s in statuses): - return CANCEL - elif any(s == FAILED for s in statuses): + if any(s == FAILED for s in statuses): return FAILED + elif any(s == CANCEL for s in statuses): + return CANCEL elif any(s == WAIT for s in statuses): return WAIT for status in (SUCCESS, SKIP): @@ -313,3 +305,40 @@ class Context(TypedDict): context: NotRequired[DictData] errors: NotRequired[Union[list[ErrorData], ErrorData]] info: NotRequired[DictData] + + +class Layer(str, Enum): + WORKFLOW = "workflow" + JOB = "job" + STRATEGY = "strategy" + STAGE = "stage" + + +def get_context_by_layer( + context: DictData, + key: str, + layer: Layer, + context_key: str, + *, + default: Optional[Any] = None, +) -> Any: # pragma: no cov + if layer == Layer.WORKFLOW: + return context.get("jobs", {}).get(key, {}).get(context_key, default) + elif layer == Layer.JOB: + return context.get("stages", {}).get(key, {}).get(context_key, default) + elif layer == Layer.STRATEGY: + return ( + context.get("strategies", {}).get(key, {}).get(context_key, default) + ) + return context.get(key, {}).get(context_key, default) + + +def get_status( + context: DictData, + key: str, + layer: Layer, +) -> Status: # pragma: no cov + """Get status from context by a specific key and context layer.""" + return get_context_by_layer( + context, key, layer, context_key="status", default=WAIT + ) diff --git a/src/ddeutil/workflow/stages.py b/src/ddeutil/workflow/stages.py index 34fe4233..f0d80d26 100644 --- a/src/ddeutil/workflow/stages.py +++ b/src/ddeutil/workflow/stages.py @@ -71,7 +71,9 @@ Annotated, Any, Callable, + ClassVar, Optional, + TypedDict, TypeVar, Union, get_type_hints, @@ -80,10 +82,10 @@ from ddeutil.core import str2list from pydantic import BaseModel, Field, ValidationError from pydantic.functional_validators import field_validator, model_validator -from typing_extensions import Self +from typing_extensions import NotRequired, Self from .__about__ import __python_version__ -from .__types import DictData, DictStr, StrOrInt, StrOrNone, TupleStr +from .__types import DictData, DictStr, StrOrInt, StrOrNone, TupleStr, cast_dict from .conf import dynamic, pass_env from .errors import ( StageCancelError, @@ -117,6 +119,7 @@ from .utils import ( delay, dump_all, + extract_id, filter_func, gen_id, make_exec, @@ -162,6 +165,7 @@ class BaseStage(BaseModel, ABC): ``` """ + action_stage: ClassVar[bool] = False extras: DictData = Field( default_factory=dict, description="An extra parameter that override core config values.", @@ -230,6 +234,19 @@ def __prepare_running_id(self) -> Self: ) return self + def pass_template(self, value: Any, params: DictData) -> Any: + """Pass template and environment variable to any value that can + templating. + + Args: + value (Any): An any value. + params (DictData): + + Returns: + Any: A templated value. + """ + return pass_env(param2template(value, params, extras=self.extras)) + @abstractmethod def process( self, @@ -244,10 +261,10 @@ def process( This is important method that make this class is able to be the stage. Args: - params: A parameter data that want to use in this + params (DictData): A parameter data that want to use in this execution. - run_id: A running stage ID. - context: A context data. + run_id (str): A running stage ID. + context (DictData): A context data. parent_run_id: A parent running ID. (Default is None) event: An event manager that use to track parent process was not force stopped. @@ -300,8 +317,9 @@ def execute( Result: The execution result with updated status and context. """ ts: float = time.monotonic() - parent_run_id: str = run_id - run_id: str = gen_id(self.iden, unique=True, extras=self.extras) + parent_run_id, run_id = extract_id( + self.iden, run_id=run_id, extras=self.extras + ) context: DictData = {"status": WAIT} trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras @@ -353,15 +371,12 @@ def execute( StageError, ) as e: # pragma: no cov if isinstance(e, StageNestedError): - trace.info(f"[STAGE]: Handler: {e}") + trace.info(f"[STAGE]: Nested: {e}") + elif isinstance(e, (StageSkipError, StageNestedSkipError)): + trace.info(f"[STAGE]: ⏭️ Skip: {e}") else: - emoji: str = ( - "⏭️" - if isinstance(e, (StageSkipError, StageNestedSkipError)) - else "🚨" - ) trace.info( - f"[STAGE]: Handler:||{emoji} {traceback.format_exc()}" + f"[STAGE]: Stage Failed:||🚨 {traceback.format_exc()}||" ) st: Status = get_status_from_error(e) return Result( @@ -381,7 +396,9 @@ def execute( extras=self.extras, ) except Exception as e: - trace.error(f"[STAGE]: Error Handler:||🚨 {traceback.format_exc()}") + trace.error( + f"[STAGE]: Error Failed:||🚨 {traceback.format_exc()}||" + ) return Result( run_id=run_id, parent_run_id=parent_run_id, @@ -492,10 +509,12 @@ def get_outputs(self, output: DictData) -> DictData: """Get the outputs from stages data. It will get this stage ID from the stage outputs mapping. - :param output: (DictData) A stage output context that want to get this - stage ID `outputs` key. + Args: + output (DictData): A stage output context that want to get this + stage ID `outputs` key. - :rtype: DictData + Returns: + DictData: An output value that have get with its identity. """ if self.id is None and not dynamic( "stage_default_id", extras=self.extras @@ -568,13 +587,53 @@ def is_nested(self) -> bool: """ return False - def docs(self) -> str: # pragma: no cov + def detail(self) -> DictData: # pragma: no cov + """Return the detail of this stage for generate markdown. + + Returns: + DictData: A dict that was dumped from this model with alias mode. + """ + return self.model_dump(by_alias=True) + + def md(self) -> str: # pragma: no cov """Return generated document that will be the interface of this stage. :rtype: str """ return self.desc + def dryrun( + self, + params: DictData, + run_id: str, + context: DictData, + *, + parent_run_id: Optional[str] = None, + event: Optional[Event] = None, + ) -> Optional[Result]: # pragma: no cov + """Pre-process method that will use to run with dry-run mode, and it + should be used before process method. + """ + + def to_empty(self, sleep: int = 0.35) -> EmptyStage: # pragma: no cov + """Convert the current Stage model to the EmptyStage model for dry-run + mode if the `action_stage` class attribute has set. + + Returns: + EmptyStage: An EmptyStage model that passing itself model data to + message. + """ + return EmptyStage.model_validate( + { + "name": self.name, + "id": self.id, + "desc": self.desc, + "if": self.condition, + "echo": f"Convert from {self.__class__.__name__}", + "sleep": sleep, + } + ) + class BaseAsyncStage(BaseStage, ABC): """Base Async Stage model to make any stage model allow async execution for @@ -686,15 +745,12 @@ async def axecute( StageError, ) as e: # pragma: no cov if isinstance(e, StageNestedError): - await trace.ainfo(f"[STAGE]: Handler: {e}") + await trace.ainfo(f"[STAGE]: Nested: {e}") + elif isinstance(e, (StageSkipError, StageNestedSkipError)): + await trace.ainfo(f"[STAGE]: ⏭️ Skip: {e}") else: - emoji: str = ( - "⏭️" - if isinstance(e, (StageSkipError, StageNestedSkipError)) - else "🚨" - ) await trace.ainfo( - f"[STAGE]:Handler:||{emoji} {traceback.format_exc()}" + f"[STAGE]: Stage Failed:||🚨 {traceback.format_exc()}||" ) st: Status = get_status_from_error(e) return Result( @@ -706,8 +762,8 @@ async def axecute( status=st, updated=( None - if isinstance(e, StageSkipError) - else {"status": st, "errors": e.to_dict()} + if isinstance(e, (StageSkipError, StageNestedSkipError)) + else {"errors": e.to_dict()} ), ), info={"execution_time": time.monotonic() - ts}, @@ -715,7 +771,7 @@ async def axecute( ) except Exception as e: await trace.aerror( - f"[STAGE]: Error Handler:||🚨 {traceback.format_exc()}" + f"[STAGE]: Error Failed:||🚨 {traceback.format_exc()}||" ) return Result( run_id=run_id, @@ -827,6 +883,14 @@ def _execute( parent_run_id=parent_run_id, event=event, ) + except ( + StageSkipError, + StageNestedSkipError, + StageCancelError, + StageNestedCancelError, + ): + trace.debug("[STAGE]: process raise skip or cancel error.") + raise except Exception as e: current_retry += 1 trace.warning( @@ -901,6 +965,16 @@ async def _axecute( parent_run_id=parent_run_id, event=event, ) + except ( + StageSkipError, + StageNestedSkipError, + StageCancelError, + StageNestedCancelError, + ): + await trace.adebug( + "[STAGE]: process raise skip or cancel error." + ) + raise except Exception as e: current_retry += 1 await trace.awarning( @@ -1004,9 +1078,7 @@ def process( ) if event and event.is_set(): - raise StageCancelError( - "Execution was canceled from the event before start parallel." - ) + raise StageCancelError("Cancel before start empty process.") trace.info(f"[STAGE]: Message: ( {message} )") if self.sleep > 0: @@ -1057,9 +1129,7 @@ async def async_process( ) if event and event.is_set(): - raise StageCancelError( - "Execution was canceled from the event before start parallel." - ) + raise StageCancelError("Cancel before start empty process.") trace.info(f"[STAGE]: Message: ( {message} )") if self.sleep > 0: @@ -1697,9 +1767,7 @@ def process( args.pop("extras") if event and event.is_set(): - raise StageCancelError( - "Execution was canceled from the event before start parallel." - ) + raise StageCancelError("Cancel before start call process.") args: DictData = self.validate_model_args( call_func, args, run_id, parent_run_id, extras=self.extras @@ -1817,9 +1885,7 @@ async def async_process( args.pop("extras") if event and event.is_set(): - raise StageCancelError( - "Execution was canceled from the event before start parallel." - ) + raise StageCancelError("Cancel before start call process.") args: DictData = self.validate_model_args( call_func, args, run_id, parent_run_id, extras=self.extras @@ -1932,8 +1998,9 @@ def mark_errors(context: DictData, error: StageError) -> None: """Make the errors context result with the refs value depends on the nested execute func. - :param context: (DictData) A context data. - :param error: (StageError) A stage exception object. + Args: + context: (DictData) A context data. + error: (StageError) A stage exception object. """ if "errors" in context: context["errors"][error.refs] = error.to_dict() @@ -2026,18 +2093,9 @@ def process( run_id, parent_run_id=parent_run_id, extras=self.extras ) _trigger: str = param2template(self.trigger, params, extras=self.extras) - # if _trigger in self.extras.get("stop_circle_workflow_name", []): - # raise StageError( - # "[NESTED]: Circle execution via trigger itself workflow name." - # ) + if _trigger == self.extras.get("__sys_break_circle_exec", "NOTSET"): + raise StageError("Circle execute via trigger itself workflow name.") trace.info(f"[NESTED]: Load Workflow Config: {_trigger!r}") - - # # NOTE: add noted key for cancel circle execution. - # if "stop_circle_workflow_name" in self.extras: - # self.extras["stop_circle_workflow_name"].append(_trigger) - # else: - # self.extras.update({"stop_circle_workflow_name": [_trigger]}) - result: Result = Workflow.from_conf( name=pass_env(_trigger), extras=self.extras, @@ -2053,14 +2111,43 @@ def process( if (msg := result.context.get("errors", {}).get("message")) else "." ) - raise StageNestedError(f"Trigger workflow was failed{err_msg}") + return result.catch( + status=FAILED, + context={ + "status": FAILED, + "errors": StageError( + f"Trigger workflow was failed{err_msg}" + ).to_dict(), + }, + ) elif result.status == CANCEL: - raise StageNestedCancelError("Trigger workflow was cancel.") + return result.catch( + status=CANCEL, + context={ + "status": CANCEL, + "errors": StageCancelError( + "Trigger workflow was cancel." + ).to_dict(), + }, + ) elif result.status == SKIP: - raise StageNestedSkipError("Trigger workflow was skipped.") + return result.catch( + status=SKIP, + context={ + "status": SKIP, + "errors": StageSkipError( + "Trigger workflow was skipped." + ).to_dict(), + }, + ) return result +class ParallelContext(TypedDict): + branch: str + stages: NotRequired[dict[str, Any]] + + class ParallelStage(BaseNestedStage): """Parallel stage executor that execute branch stages with multithreading. This stage let you set the fix branches for running child stage inside it on @@ -2098,10 +2185,8 @@ class ParallelStage(BaseNestedStage): parallel: dict[str, list[Stage]] = Field( description="A mapping of branch name and its stages.", ) - max_workers: int = Field( + max_workers: Union[int, str] = Field( default=2, - ge=1, - lt=20, description=( "The maximum multi-thread pool worker size for execution parallel. " "This value should be gather or equal than 1, and less than 20." @@ -2109,14 +2194,20 @@ class ParallelStage(BaseNestedStage): alias="max-workers", ) - def _process_branch( + @field_validator("max_workers") + def __validate_max_workers(cls, value: Union[int, str]) -> Union[int, str]: + """Validate `max_workers` field that should has value between 1 and 19.""" + if isinstance(value, int) and (value < 1 or value >= 20): + raise ValueError("A max-workers value should between 1 and 19.") + return value + + def _process_nested( self, branch: str, params: DictData, - run_id: str, + trace: Trace, context: DictData, *, - parent_run_id: Optional[str] = None, event: Optional[Event] = None, ) -> tuple[Status, DictData]: """Execute branch that will execute all nested-stage that was set in @@ -2125,15 +2216,14 @@ def _process_branch( Args: branch (str): A branch ID. params (DictData): A parameter data. - run_id (str): A running ID. + trace (Trace): A Trace model. context (DictData): - parent_run_id (str | None, default None): A parent running ID. event: (Event) An Event manager instance that use to cancel this execution if it forces stopped by parent execution. (Default is None) Raises: - StageCancelError: If event was set. + StageCancelError: If event was set before start stage execution. StageCancelError: If result from a nested-stage return canceled status. StageError: If result from a nested-stage return failed status. @@ -2141,15 +2231,10 @@ def _process_branch( Returns: tuple[Status, DictData]: A pair of status and result context data. """ - trace: Trace = get_trace( - run_id, parent_run_id=parent_run_id, extras=self.extras - ) - trace.debug(f"[NESTED]: Execute Branch: {branch!r}") - - # NOTE: Create nested-context + trace.info(f"[NESTED]: Execute Branch: {branch!r}") current_context: DictData = copy.deepcopy(params) current_context.update({"branch": branch}) - nestet_context: DictData = {"branch": branch, "stages": {}} + nestet_context: ParallelContext = {"branch": branch, "stages": {}} total_stage: int = len(self.parallel[branch]) skips: list[bool] = [False] * total_stage @@ -2160,8 +2245,7 @@ def _process_branch( if event and event.is_set(): error_msg: str = ( - "Branch execution was canceled from the event before " - "start branch execution." + f"Cancel branch: {branch!r} before start nested process." ) catch( context=context, @@ -2181,12 +2265,12 @@ def _process_branch( rs: Result = stage.execute( params=current_context, - run_id=parent_run_id, + run_id=trace.parent_run_id, event=event, ) - stage.set_outputs(rs.context, to=nestet_context) + stage.set_outputs(rs.context, to=cast_dict(nestet_context)) stage.set_outputs( - stage.get_outputs(nestet_context), to=current_context + stage.get_outputs(cast_dict(nestet_context)), to=current_context ) if rs.status == SKIP: @@ -2195,7 +2279,7 @@ def _process_branch( elif rs.status == FAILED: # pragma: no cov error_msg: str = ( - f"Branch execution was break because its nested-stage, " + f"Break branch: {branch!r} because nested stage: " f"{stage.iden!r}, failed." ) catch( @@ -2216,8 +2300,7 @@ def _process_branch( elif rs.status == CANCEL: error_msg: str = ( - "Branch execution was canceled from the event after " - "end branch execution." + f"Cancel branch: {branch!r} after end nested process." ) catch( context=context, @@ -2257,7 +2340,9 @@ def process( parent_run_id: Optional[str] = None, event: Optional[Event] = None, ) -> Result: - """Execute parallel each branch via multi-threading pool. + """Execute parallel each branch via multi-threading pool. The parallel + process will use all-completed strategy to handle result from each + branch. Args: params: A parameter data that want to use in this @@ -2268,6 +2353,9 @@ def process( event: An event manager that use to track parent process was not force stopped. + Raises: + StageCancelError: If event was set before start parallel process. + Returns: Result: The execution result with status and context data. """ @@ -2275,27 +2363,36 @@ def process( run_id, parent_run_id=parent_run_id, extras=self.extras ) event: Event = event or Event() - trace.info(f"[NESTED]: Parallel with {self.max_workers} workers.") + + # NOTE: Start prepare max_workers field if it is string type. + if isinstance(self.max_workers, str): + max_workers: int = self.__validate_max_workers( + pass_env( + param2template( + self.max_workers, params=params, extras=self.extras + ) + ) + ) + else: + max_workers: int = self.max_workers + trace.info(f"[NESTED]: Parallel with {max_workers} workers.") catch( context=context, status=WAIT, - updated={"workers": self.max_workers, "parallel": {}}, + updated={"workers": max_workers, "parallel": {}}, ) len_parallel: int = len(self.parallel) if event and event.is_set(): - raise StageCancelError( - "Execution was canceled from the event before start parallel." - ) + raise StageCancelError("Cancel before start parallel process.") - with ThreadPoolExecutor(self.max_workers, "stp") as executor: + with ThreadPoolExecutor(max_workers, "stp") as executor: futures: list[Future] = [ executor.submit( - self._process_branch, + self._process_nested, branch=branch, params=params, - run_id=run_id, + trace=trace, context=context, - parent_run_id=parent_run_id, event=event, ) for branch in self.parallel @@ -2310,15 +2407,21 @@ def process( self.mark_errors(errors, e) st: Status = validate_statuses(statuses) - return Result( - run_id=run_id, - parent_run_id=parent_run_id, + return Result.from_trace(trace).catch( status=st, context=catch(context, status=st, updated=errors), - extras=self.extras, ) +EachType = Union[ + list[str], + list[int], + str, + dict[str, Any], + dict[int, Any], +] + + class ForEachStage(BaseNestedStage): """For-Each stage executor that execute all stages with each item in the foreach list. @@ -2339,13 +2442,7 @@ class ForEachStage(BaseNestedStage): ... } """ - foreach: Union[ - list[str], - list[int], - str, - dict[str, Any], - dict[int, Any], - ] = Field( + foreach: EachType = Field( description=( "A items for passing to stages via ${{ item }} template parameter." ), @@ -2374,15 +2471,14 @@ class ForEachStage(BaseNestedStage): ), ) - def _process_item( + def _process_nested( self, index: int, item: StrOrInt, params: DictData, - run_id: str, + trace: Trace, context: DictData, *, - parent_run_id: Optional[str] = None, event: Optional[Event] = None, ) -> tuple[Status, DictData]: """Execute item that will execute all nested-stage that was set in this @@ -2391,32 +2487,29 @@ def _process_item( This method will create the nested-context from an input context data and use it instead the context data. - :param index: (int) An index value of foreach loop. - :param item: (str | int) An item that want to execution. - :param params: (DictData) A parameter data. - :param run_id: (str) - :param context: (DictData) - :param parent_run_id: (str | None) - :param event: (Event) An Event manager instance that use to cancel this - execution if it forces stopped by parent execution. - (Default is None) + Args: + index: (int) An index value of foreach loop. + item: (str | int) An item that want to execution. + params: (DictData) A parameter data. + trace (Trace): A Trace model. + context: (DictData) + event: (Event) An Event manager instance that use to cancel this + execution if it forces stopped by parent execution. + (Default is None) This method should raise error when it wants to stop the foreach loop such as cancel event or getting the failed status. - :raise StageCancelError: If event was set. - :raise StageError: If the stage execution raise any Exception error. - :raise StageError: If the result from execution has `FAILED` status. + Raises: + StageCancelError: If event was set. + StageError: If the stage execution raise any Exception error. + StageError: If the result from execution has `FAILED` status. - :rtype: tuple[Status, Result] + Returns: + tuple[Status, DictData] """ - trace: Trace = get_trace( - run_id, parent_run_id=parent_run_id, extras=self.extras - ) - trace.debug(f"[NESTED]: Execute Item: {item!r}") + trace.info(f"[NESTED]: Execute Item: {item!r}") key: StrOrInt = index if self.use_index_as_key else item - - # NOTE: Create nested-context data from the passing context. current_context: DictData = copy.deepcopy(params) current_context.update({"item": item, "loop": index}) nestet_context: DictData = {"item": item, "stages": {}} @@ -2430,8 +2523,7 @@ def _process_item( if event and event.is_set(): error_msg: str = ( - "Item execution was canceled from the event before start " - "item execution." + f"Cancel item: {key!r} before start nested process." ) catch( context=context, @@ -2449,10 +2541,9 @@ def _process_item( ) raise StageCancelError(error_msg, refs=key) - # NOTE: Nested-stage execute will pass only params and context only. rs: Result = stage.execute( params=current_context, - run_id=parent_run_id, + run_id=trace.parent_run_id, event=event, ) stage.set_outputs(rs.context, to=nestet_context) @@ -2466,7 +2557,7 @@ def _process_item( elif rs.status == FAILED: # pragma: no cov error_msg: str = ( - f"Item execution was break because its nested-stage, " + f"Break item: {key!r} because nested stage: " f"{stage.iden!r}, failed." ) trace.warning(f"[NESTED]: {error_msg}") @@ -2488,8 +2579,7 @@ def _process_item( elif rs.status == CANCEL: error_msg: str = ( - "Item execution was canceled from the event after " - "end item execution." + f"Cancel item: {key!r} after end nested process." ) catch( context=context, @@ -2520,6 +2610,42 @@ def _process_item( }, ) + def validate_foreach(self, value: Any) -> list[Any]: + """Validate foreach value that already passed to this model. + + Args: + value: + + Raises: + TypeError: If value can not try-convert to list type. + ValueError: + + Returns: + list[Any]: list of item. + """ + if isinstance(value, str): + try: + value: list[Any] = str2list(value) + except ValueError as e: + raise TypeError( + f"Does not support string foreach: {value!r} that can " + f"not convert to list." + ) from e + # [VALIDATE]: Type of the foreach should be `list` type. + elif isinstance(value, dict): + raise TypeError( + f"Does not support dict foreach: {value!r} ({type(value)}) " + f"yet." + ) + # [Validate]: Value in the foreach item should not be duplicate when the + # `use_index_as_key` field did not set. + elif len(set(value)) != len(value) and not self.use_index_as_key: + raise ValueError( + "Foreach item should not duplicate. If this stage must to pass " + "duplicate item, it should set `use_index_as_key: true`." + ) + return value + def process( self, params: DictData, @@ -2551,34 +2677,8 @@ def process( run_id, parent_run_id=parent_run_id, extras=self.extras ) event: Event = event or Event() - foreach: Union[list[str], list[int], str] = pass_env( - param2template(self.foreach, params, extras=self.extras) - ) - - # [NOTE]: Force convert str to list. - if isinstance(foreach, str): - try: - foreach: list[Any] = str2list(foreach) - except ValueError as e: - raise TypeError( - f"Does not support string foreach: {foreach!r} that can " - f"not convert to list." - ) from e - - # [VALIDATE]: Type of the foreach should be `list` type. - elif isinstance(foreach, dict): - raise TypeError( - f"Does not support dict foreach: {foreach!r} ({type(foreach)}) " - f"yet." - ) - # [Validate]: Value in the foreach item should not be duplicate when the - # `use_index_as_key` field did not set. - elif len(set(foreach)) != len(foreach) and not self.use_index_as_key: - raise ValueError( - "Foreach item should not duplicate. If this stage must to pass " - "duplicate item, it should set `use_index_as_key: true`." - ) - + foreach: EachType = self.pass_template(self.foreach, params=params) + foreach: list[Any] = self.validate_foreach(foreach) trace.info(f"[NESTED]: Foreach: {foreach!r}.") catch( context=context, @@ -2587,28 +2687,24 @@ def process( ) len_foreach: int = len(foreach) if event and event.is_set(): - raise StageCancelError( - "Execution was canceled from the event before start foreach." - ) + raise StageCancelError("Cancel before start foreach process.") with ThreadPoolExecutor(self.concurrent, "stf") as executor: futures: list[Future] = [ executor.submit( - self._process_item, - index=i, + self._process_nested, + index=index, item=item, params=params, - run_id=run_id, + trace=trace, context=context, - parent_run_id=parent_run_id, event=event, ) - for i, item in enumerate(foreach, start=0) + for index, item in enumerate(foreach, start=0) ] errors: DictData = {} statuses: list[Status] = [WAIT] * len_foreach - fail_fast: bool = False done, not_done = wait(futures, return_when=FIRST_EXCEPTION) if len(list(done)) != len(futures): @@ -2619,7 +2715,7 @@ def process( for future in not_done: future.cancel() - time.sleep(0.01) # Reduced from 0.025 for better responsiveness + time.sleep(0.025) nd: str = ( ( f", {len(not_done)} item" @@ -2630,7 +2726,6 @@ def process( ) trace.debug(f"[NESTED]: ... Foreach-Stage set failed event{nd}") done: Iterator[Future] = as_completed(futures) - fail_fast = True for i, future in enumerate(done, start=0): try: @@ -2640,21 +2735,13 @@ def process( statuses[i] = get_status_from_error(e) self.mark_errors(errors, e) except CancelledError: + statuses[i] = CANCEL pass status: Status = validate_statuses(statuses) - - # NOTE: Prepare status because it does not cancel from parent event but - # cancel from failed item execution. - if fail_fast and status == CANCEL: - status = FAILED - - return Result( - run_id=run_id, - parent_run_id=parent_run_id, + return Result.from_trace(trace).catch( status=status, context=catch(context, status=status, updated=errors), - extras=self.extras, ) @@ -2689,7 +2776,7 @@ class UntilStage(BaseNestedStage): ), ) until: str = Field(description="A until condition for stop the while loop.") - stages: list[Stage] = Field( + stages: list[NestedStage] = Field( default_factory=list, description=( "A list of stage that will run with each item in until loop." @@ -2706,38 +2793,33 @@ class UntilStage(BaseNestedStage): alias="max-loop", ) - def _process_loop( + def _process_nested( self, item: T, loop: int, params: DictData, - run_id: str, + trace: Trace, context: DictData, *, - parent_run_id: Optional[str] = None, event: Optional[Event] = None, ) -> tuple[Status, DictData, T]: """Execute loop that will execute all nested-stage that was set in this stage with specific loop and item. - :param item: (T) An item that want to execution. - :param loop: (int) A number of loop. - :param params: (DictData) A parameter data. - :param run_id: (str) - :param context: (DictData) - :param parent_run_id: (str | None) - :param event: (Event) An Event manager instance that use to cancel this - execution if it forces stopped by parent execution. + Args: + item: (T) An item that want to execution. + loop: (int) A number of loop. + params: (DictData) A parameter data. + trace: (Trace) + context: (DictData) + event: (Event) An Event manager instance that use to cancel this + execution if it forces stopped by parent execution. - :rtype: tuple[Status, DictData, T] - :return: Return a pair of Result and changed item. + Returns: + tuple[Status, DictData, T]: Return a pair of Result and changed + item. """ - trace: Trace = get_trace( - run_id, parent_run_id=parent_run_id, extras=self.extras - ) trace.debug(f"[NESTED]: Execute Loop: {loop} (Item {item!r})") - - # NOTE: Create nested-context current_context: DictData = copy.deepcopy(params) current_context.update({"item": item, "loop": loop}) nestet_context: DictData = {"loop": loop, "item": item, "stages": {}} @@ -2752,8 +2834,7 @@ def _process_loop( if event and event.is_set(): error_msg: str = ( - "Loop execution was canceled from the event before start " - "loop execution." + f"Cancel loop: {i!r} before start nested process." ) catch( context=context, @@ -2774,7 +2855,7 @@ def _process_loop( rs: Result = stage.execute( params=current_context, - run_id=parent_run_id, + run_id=trace.parent_run_id, event=event, ) stage.set_outputs(rs.context, to=nestet_context) @@ -2790,8 +2871,8 @@ def _process_loop( elif rs.status == FAILED: error_msg: str = ( - f"Loop execution was break because its nested-stage, " - f"{stage.iden!r}, failed." + f"Break loop: {i!r} because nested stage: {stage.iden!r}, " + f"failed." ) catch( context=context, @@ -2811,10 +2892,7 @@ def _process_loop( raise StageNestedError(error_msg, refs=loop) elif rs.status == CANCEL: - error_msg: str = ( - "Loop execution was canceled from the event after " - "end loop execution." - ) + error_msg: str = f"Cancel loop: {i!r} after end nested process." catch( context=context, status=CANCEL, @@ -2881,35 +2959,33 @@ def process( ) event: Event = event or Event() trace.info(f"[NESTED]: Until: {self.until!r}") - item: Union[str, int, bool] = pass_env( - param2template(self.item, params, extras=self.extras) - ) + item: Union[str, int, bool] = self.pass_template(self.item, params) loop: int = 1 until_rs: bool = True exceed_loop: bool = False catch(context=context, status=WAIT, updated={"until": {}}) statuses: list[Status] = [] + while until_rs and not (exceed_loop := (loop > self.max_loop)): if event and event.is_set(): raise StageCancelError( - "Execution was canceled from the event before start loop." + f"Cancel before start loop process, (loop: {loop})." ) - status, context, item = self._process_loop( + status, context, item = self._process_nested( item=item, loop=loop, params=params, - run_id=run_id, + trace=trace, context=context, - parent_run_id=parent_run_id, event=event, ) loop += 1 if item is None: item: int = loop - trace.warning( + trace.debug( f"[NESTED]: Return loop not set the item. It uses loop: " f"{loop} by default." ) @@ -2960,6 +3036,13 @@ class Match(BaseModel): ) +class Else(BaseModel): + other: list[Stage] = Field( + description="A list of stage that does not match any case.", + alias="else", + ) + + class CaseStage(BaseNestedStage): """Case stage executor that execute all stages if the condition was matched. @@ -2989,10 +3072,34 @@ class CaseStage(BaseNestedStage): ... ], ... } + >>> stage = { + ... "name": "If stage execution.", + ... "case": "${{ param.test }}", + ... "match": [ + ... { + ... "case": "1", + ... "stages": [ + ... { + ... "name": "Stage case 1", + ... "eche": "Hello case 1", + ... }, + ... ], + ... }, + ... { + ... "else": [ + ... { + ... "name": "Stage else", + ... "eche": "Hello case else", + ... }, + ... ], + ... }, + ... ], + ... } + """ case: str = Field(description="A case condition for routing.") - match: list[Match] = Field( + match: list[Union[Match, Else]] = Field( description="A list of Match model that should not be an empty list.", ) skip_not_match: bool = Field( @@ -3004,46 +3111,117 @@ class CaseStage(BaseNestedStage): alias="skip-not-match", ) - def _process_case( + @field_validator("match", mode="after") + def __validate_match( + cls, match: list[Union[Match, Else]] + ) -> list[Union[Match, Else]]: + """Validate the match field should contain only one Else model.""" + c_else_case: int = 0 + c_else_model: int = 0 + for m in match: + if isinstance(m, Else): + if c_else_model: + raise ValueError( + "Match field should contain only one `Else` model." + ) + c_else_model += 1 + continue + if isinstance(m, Match) and m.case == "_": + if c_else_case: + raise ValueError( + "Match field should contain only one else, '_', case." + ) + c_else_case += 1 + continue + return match + + def extract_stages_from_case( + self, case: StrOrNone, params: DictData + ) -> tuple[StrOrNone, list[Stage]]: + """Extract stage from case. + + Args: + case (StrOrNone): + params (DictData): + + Returns: + tuple[StrOrNone, list[Stage]]: A pair of case and stages. + """ + _else_stages: Optional[list[Stage]] = None + stages: Optional[list[Stage]] = None + + # NOTE: Start check the condition of each stage match with this case. + for match in self.match: + + if isinstance(match, Else): + _else_stages: list[Stage] = match.other + continue + + # NOTE: Store the else case. + if (c := match.case) == "_": + _else_stages: list[Stage] = match.stages + continue + + _condition: str = param2template(c, params, extras=self.extras) + if pass_env(case) == pass_env(_condition): + stages: list[Stage] = match.stages + break + + if stages is not None: + return case, stages + + if _else_stages is None: + if not self.skip_not_match: + raise StageError( + "This stage does not set else for support not match " + "any case." + ) + raise StageSkipError( + "Execution was skipped because it does not match any " + "case and the else condition does not set too." + ) + + # NOTE: Force to use the else when it does not match any case. + return "_", _else_stages + + def _process_nested( self, case: str, stages: list[Stage], params: DictData, - run_id: str, + trace: Trace, context: DictData, *, - parent_run_id: Optional[str] = None, event: Optional[Event] = None, ) -> tuple[Status, DictData]: """Execute case. - :param case: (str) A case that want to execution. - :param stages: (list[Stage]) A list of stage. - :param params: (DictData) A parameter data. - :param run_id: (str) - :param context: (DictData) - :param parent_run_id: (str | None) - :param event: (Event) An Event manager instance that use to cancel this - execution if it forces stopped by parent execution. + Args: + case: (str) A case that want to execution. + stages: (list[Stage]) A list of stage. + params: (DictData) A parameter data. + trace: (Trace) + context: (DictData) + event: (Event) An Event manager instance that use to cancel this + execution if it forces stopped by parent execution. - :rtype: DictData + Returns: + DictData """ - trace: Trace = get_trace( - run_id, parent_run_id=parent_run_id, extras=self.extras - ) - trace.debug(f"[NESTED]: Execute Case: {case!r}") + trace.info(f"[NESTED]: Case: {case!r}") current_context: DictData = copy.deepcopy(params) current_context.update({"case": case}) output: DictData = {"case": case, "stages": {}} - for stage in stages: + total_stage: int = len(stages) + skips: list[bool] = [False] * total_stage + for i, stage in enumerate(stages, start=0): if self.extras: stage.extras = self.extras if event and event.is_set(): error_msg: str = ( - "Case-Stage was canceled from event that had set before " - "stage case execution." + f"Cancel case: {case!r} before start nested process." ) return CANCEL, catch( context=context, @@ -3057,16 +3235,20 @@ def _process_case( rs: Result = stage.execute( params=current_context, - run_id=parent_run_id, + run_id=trace.parent_run_id, event=event, ) stage.set_outputs(rs.context, to=output) stage.set_outputs(stage.get_outputs(output), to=current_context) - if rs.status == FAILED: + if rs.status == SKIP: + skips[i] = True + continue + + elif rs.status == FAILED: error_msg: str = ( - f"Case-Stage was break because it has a sub stage, " - f"{stage.iden}, failed without raise error." + f"Break case: {case!r} because nested stage: {stage.iden}, " + f"failed." ) return FAILED, catch( context=context, @@ -3077,9 +3259,25 @@ def _process_case( "errors": StageError(error_msg).to_dict(), }, ) - return SUCCESS, catch( + + elif rs.status == CANCEL: + error_msg: str = ( + f"Cancel case {case!r} after end nested process." + ) + return CANCEL, catch( + context=context, + status=CANCEL, + updated={ + "case": case, + "stages": filter_func(output.pop("stages", {})), + "errors": StageCancelError(error_msg).to_dict(), + }, + ) + + status: Status = SKIP if sum(skips) == total_stage else SUCCESS + return status, catch( context=context, - status=SUCCESS, + status=status, updated={ "case": case, "stages": filter_func(output.pop("stages", {})), @@ -3113,52 +3311,17 @@ def process( run_id, parent_run_id=parent_run_id, extras=self.extras ) - _case: StrOrNone = param2template(self.case, params, extras=self.extras) - trace.info(f"[NESTED]: Get Case: {_case!r}.") - - _else: Optional[Match] = None - stages: Optional[list[Stage]] = None - - # NOTE: Start check the condition of each stage match with this case. - for match in self.match: - # NOTE: Store the else case. - if (c := match.case) == "_": - _else: Match = match - continue - - _condition: str = param2template(c, params, extras=self.extras) - if pass_env(_case) == pass_env(_condition): - stages: list[Stage] = match.stages - break - - if stages is None: - if _else is None: - if not self.skip_not_match: - raise StageError( - "This stage does not set else for support not match " - "any case." - ) - raise StageSkipError( - "Execution was skipped because it does not match any " - "case and the else condition does not set too." - ) - - # NOTE: Force to use the else when it does not match any case. - _case: str = "_" - stages: list[Stage] = _else.stages - + case: StrOrNone = param2template(self.case, params, extras=self.extras) + trace.info(f"[NESTED]: Get Case: {case!r}.") + case, stages = self.extract_stages_from_case(case, params=params) if event and event.is_set(): - raise StageCancelError( - "Execution was canceled from the event before start " - "case execution." - ) - status, context = self._process_case( - case=_case, + raise StageCancelError("Cancel before start case process.") + status, context = self._process_nested( + case=case, stages=stages, params=params, - run_id=run_id, + trace=trace, context=context, - parent_run_id=parent_run_id, event=event, ) return Result( @@ -3584,6 +3747,59 @@ def process( extras=self.extras, ) + async def async_process( + self, + params: DictData, + run_id: str, + context: DictData, + *, + parent_run_id: Optional[str] = None, + event: Optional[Event] = None, + ) -> Result: + raise NotImplementedError( + "Async process of Virtual Python stage does not implement yet." + ) + + +NestedStage = Annotated[ + Union[ + BashStage, + CallStage, + PyStage, + VirtualPyStage, + RaiseStage, + DockerStage, + TriggerStage, + EmptyStage, + CaseStage, + ForEachStage, + UntilStage, + ], + Field( + union_mode="smart", + description="A nested-stage allow list", + ), +] # pragma: no cov + + +ActionStage = Annotated[ + Union[ + BashStage, + CallStage, + VirtualPyStage, + PyStage, + RaiseStage, + DockerStage, + EmptyStage, + ], + Field( + union_mode="smart", + description=( + "An action stage model that allow to use with nested-stage model." + ), + ), +] # pragma: no cov + # NOTE: # An order of parsing stage model on the Job model with `stages` field. @@ -3592,18 +3808,14 @@ def process( # Stage = Annotated[ Union[ - DockerStage, - BashStage, - CallStage, - TriggerStage, + # NOTE: Nested Stage. ForEachStage, UntilStage, ParallelStage, CaseStage, - VirtualPyStage, - PyStage, - RaiseStage, - EmptyStage, + TriggerStage, + # NOTE: Union with the action stage. + ActionStage, ], Field( union_mode="smart", diff --git a/src/ddeutil/workflow/traces.py b/src/ddeutil/workflow/traces.py index b7ce2fc5..1cb3a592 100644 --- a/src/ddeutil/workflow/traces.py +++ b/src/ddeutil/workflow/traces.py @@ -51,6 +51,8 @@ logger = logging.getLogger("ddeutil.workflow") Level = Literal["debug", "info", "warning", "error", "exception"] +EMJ_ALERT: str = "🚨" +EMJ_SKIP: str = "⏭️" @lru_cache @@ -239,7 +241,7 @@ class Metadata(BaseModel): # pragma: no cov default=None, description="Environment (dev, staging, prod)." ) - # System context + # NOTE: System context hostname: Optional[str] = Field( default=None, description="Hostname where workflow is running." ) @@ -253,7 +255,7 @@ class Metadata(BaseModel): # pragma: no cov default=None, description="Workflow package version." ) - # Custom metadata + # NOTE: Custom metadata tags: Optional[list[str]] = Field( default_factory=list, description="Custom tags for categorization." ) @@ -320,6 +322,8 @@ def make( import socket import sys + from .__about__ import __version__ + frame: Optional[FrameType] = currentframe() if frame is None: raise ValueError("Cannot get current frame") @@ -384,7 +388,7 @@ def make( hostname=hostname, ip_address=ip_address, python_version=python_version, - package_version=extras_data.get("package_version"), + package_version=__version__, # NOTE: Custom metadata tags=extras_data.get("tags", []), metadata=extras_data.get("metadata", {}), @@ -2046,7 +2050,7 @@ def get_trace( Args: run_id (str): A running ID. parent_run_id (str | None, default None): A parent running ID. - handlers: + handlers (list): extras: An extra parameter that want to override the core config values. auto_pre_process (bool, default False) @@ -2057,7 +2061,7 @@ def get_trace( handlers: list[DictData] = dynamic( "trace_handlers", f=handlers, extras=extras ) - trace = Trace.model_validate( + trace: Trace = Trace.model_validate( { "run_id": run_id, "parent_run_id": parent_run_id, diff --git a/src/ddeutil/workflow/utils.py b/src/ddeutil/workflow/utils.py index f20901a7..f7d3ebf1 100644 --- a/src/ddeutil/workflow/utils.py +++ b/src/ddeutil/workflow/utils.py @@ -8,26 +8,6 @@ This module provides essential utility functions used throughout the workflow system for ID generation, datetime handling, string processing, template operations, and other common tasks. - -Functions: - to_train: Convert camel case strings to train case format - prepare_newline: Format messages with multiple newlines - replace_sec: Replace seconds and microseconds in datetime objects - clear_tz: Clear timezone info from datetime objects - get_dt_now: Get current datetime with timezone - get_d_now: Get current date - get_diff_sec: Calculate time difference in seconds - reach_next_minute: Check if datetime reaches next minute - wait_until_next_minute: Wait until next minute - delay: Add random delay to execution - gen_id: Generate unique identifiers for workflow components - default_gen_id: Generate default running ID - make_exec: Make files executable - filter_func: Filter function objects from data structures - cross_product: Generate cross product of matrix values - cut_id: Cut running ID to specified length - dump_all: Serialize nested BaseModel objects to dictionaries - obj_name: Get object name or class name """ from __future__ import annotations @@ -253,6 +233,33 @@ def gen_id( ).hexdigest() +def extract_id( + name: str, + run_id: Optional[str] = None, + extras: Optional[DictData] = None, +) -> tuple[str, str]: + """Extract the parent ID and running ID. If the `run_id` parameter was + passed, it will replace the parent_run_id with this value and re-generate + new running ID for it instead. + + Args: + name (str): A name for generate hashing value for the `gen_id` function. + run_id (str | None, default None): + extras: + + Returns: + tuple[str, str]: A pair of parent running ID and running ID. + """ + generated = gen_id(name, unique=True, extras=extras) + if run_id: + parent_run_id: str = run_id + run_id: str = generated + else: + run_id: str = generated + parent_run_id: str = run_id + return parent_run_id, run_id + + def default_gen_id() -> str: """Return running ID for making default ID for the Result model. @@ -327,6 +334,8 @@ def cut_id(run_id: str, *, num: int = 8) -> str: Example: >>> cut_id(run_id='20240101081330000000T1354680202') '202401010813680202' + >>> cut_id(run_id='20240101081330000000T1354680202') + '54680202' Args: run_id: A running ID to cut. @@ -394,3 +403,8 @@ def obj_name(obj: Optional[Union[str, object]] = None) -> Optional[str]: else: obj_type: str = obj.__class__.__name__ return obj_type + + +def remove_sys_extras(extras: DictData) -> DictData: + """Remove key that starts with `__sys_` from the extra dict parameter.""" + return {k: extras[k] for k in extras if not k.startswith("__sys_")} diff --git a/src/ddeutil/workflow/workflow.py b/src/ddeutil/workflow/workflow.py index ca197e68..6efdb5a4 100644 --- a/src/ddeutil/workflow/workflow.py +++ b/src/ddeutil/workflow/workflow.py @@ -37,6 +37,7 @@ from typing import Any, Literal, Optional, Union from pydantic import BaseModel, Field +from pydantic.functional_serializers import field_serializer from pydantic.functional_validators import field_validator, model_validator from typing_extensions import Self @@ -62,8 +63,10 @@ from .reusables import has_template, param2template from .traces import Trace, get_trace from .utils import ( + extract_id, gen_id, get_dt_now, + remove_sys_extras, ) @@ -241,8 +244,15 @@ def __validate_jobs_need__(self) -> Self: f"{self.name!r}." ) + # NOTE: Force update internal extras for handler circle execution. + self.extras.update({"__sys_break_circle_exec": self.name}) + return self + @field_serializer("extras") + def __serialize_extras(self, extras: DictData) -> DictData: + return remove_sys_extras(extras) + def detail(self) -> DictData: # pragma: no cov """Return the detail of this workflow for generate markdown.""" return self.model_dump(by_alias=True) @@ -255,7 +265,8 @@ def md(self, author: Optional[str] = None) -> str: # pragma: no cov """ def align_newline(value: str) -> str: - return value.rstrip("\n").replace("\n", "\n ") + space: str = " " * 16 + return value.rstrip("\n").replace("\n", f"\n{space}") info: str = ( f"| Author: {author or 'nobody'} " @@ -282,8 +293,7 @@ def align_newline(value: str) -> str: {align_newline(self.desc)}\n ## Parameters\n | name | type | default | description | - | --- | --- | --- | : --- : | - + | --- | --- | --- | : --- : |\n\n ## Jobs\n {align_newline(jobs)} """.lstrip( @@ -312,8 +322,7 @@ def job(self, name: str) -> Job: f"{self.name!r}" ) job: Job = self.jobs[name] - if self.extras: - job.extras = self.extras + job.extras = self.extras return job def parameterize(self, params: DictData) -> DictData: @@ -332,8 +341,8 @@ def parameterize(self, params: DictData) -> DictData: execute method. Returns: - DictData: The parameter value that validate with its parameter fields and - adding jobs key to this parameter. + DictData: The parameter value that validate with its parameter fields + and adding jobs key to this parameter. Raises: WorkflowError: If parameter value that want to validate does @@ -393,11 +402,12 @@ def release( Args: release (datetime): A release datetime. - params: A workflow parameter that pass to execute method. - release_type: + params (DictData): A workflow parameter that pass to execute method. + release_type (ReleaseType): A release type that want to execute. run_id: (str) A workflow running ID. runs_metadata: (DictData) - audit: An audit class that want to save the execution result. + audit (Audit): An audit model that use to manage release log of this + execution. override_log_name: (str) An override logging name that use instead the workflow name. timeout: (int) A workflow execution time out in second unit. @@ -412,13 +422,9 @@ def release( audit: Audit = audit or get_audit(extras=self.extras) # NOTE: Generate the parent running ID with not None value. - if run_id: - parent_run_id: str = run_id - run_id: str = gen_id(name, unique=True) - else: - run_id: str = gen_id(name, unique=True) - parent_run_id: str = run_id - + parent_run_id, run_id = extract_id( + name, run_id=run_id, extras=self.extras + ) context: DictData = {"status": WAIT} audit_data: DictData = { "name": name, @@ -507,7 +513,7 @@ def release( **(context["errors"] if "errors" in context else {}), }, ), - extras=self.extras, + extras=remove_sys_extras(self.extras), ) def execute_job( @@ -655,8 +661,9 @@ def execute( :rtype: Result """ ts: float = time.monotonic() - parent_run_id: Optional[str] = run_id - run_id: str = gen_id(self.name, unique=True, extras=self.extras) + parent_run_id, run_id = extract_id( + self.name, run_id=run_id, extras=self.extras + ) trace: Trace = get_trace( run_id, parent_run_id=parent_run_id, extras=self.extras ) @@ -884,6 +891,10 @@ def rerun( ) -> Result: # pragma: no cov """Re-Execute workflow with passing the error context data. + Warnings: + This rerun method allow to rerun job execution level only. That mean + it does not support rerun only stage. + Args: context: A context result that get the failed status. run_id: (Optional[str]) A workflow running ID. diff --git a/tests/conf/context/jobs_failed.json b/tests/conf/context/jobs_failed.json new file mode 100644 index 00000000..ee56b45e --- /dev/null +++ b/tests/conf/context/jobs_failed.json @@ -0,0 +1,21 @@ +{ + "stages": { + "first-stage": { + "outputs": { + "foo": "bar" + }, + "status": "failed", + "errors": { + "name": "StageError", + "message": "This is the example if it has some error raise from stage." + } + } + }, + "status": "failed", + "error": { + "first-stage": { + "name": "JobError", + "message": "Job got the stage error after exec." + } + } +} diff --git a/tests/conf/context/jobs_strategies_failed.json b/tests/conf/context/jobs_strategies_failed.json new file mode 100644 index 00000000..861ecafd --- /dev/null +++ b/tests/conf/context/jobs_strategies_failed.json @@ -0,0 +1,32 @@ +{ + "strategies": { + "1001001": { + "stages": { + "first-stage": { + "outputs": { + "foo": "bar" + }, + "status": "failed", + "errors": { + "name": "StageError", + "message": "This is the example if it has some error raise from stage." + } + } + }, + "status": "failed", + "error": { + "first-stage": { + "name": "JobError", + "message": "Job got the stage error after exec." + } + } + } + }, + "status": "failed", + "error": { + "1001001": { + "name": "JobError", + "message": "Job strategy failed", + } + } +} diff --git a/tests/conf/context/stages_failed.json b/tests/conf/context/stages_failed.json new file mode 100644 index 00000000..558882c8 --- /dev/null +++ b/tests/conf/context/stages_failed.json @@ -0,0 +1,10 @@ +{ + "outputs": { + "foo": "bar" + }, + "status": "failed", + "errors": { + "name": "StageError", + "message": "This is the example if it has some error raise from stage." + } +} diff --git a/tests/conf/context/workflow_failed.json b/tests/conf/context/workflow_failed.json new file mode 100644 index 00000000..22346c81 --- /dev/null +++ b/tests/conf/context/workflow_failed.json @@ -0,0 +1,32 @@ +{ + "jobs": { + "first-job": { + "stages": { + "first-stage": { + "outputs": { + "foo": "bar" + }, + "status": "failed", + "errors": { + "name": "StageError", + "message": "This is the example if it has some error raise from stage." + } + } + }, + "status": "failed", + "error": { + "first-stage": { + "name": "JobError", + "message": "Job got the stage error after exec." + } + } + }, + "status": "failed", + "error": { + "first-job": { + "name": "WorkflowError", + "message": "Workflow got the job error after exec." + } + } + } +} diff --git a/tests/conf/demo/01_99_wf_test_wf_exec_circle.yml b/tests/conf/demo/01_99_wf_test_wf_exec_circle.yml deleted file mode 100644 index d68262b5..00000000 --- a/tests/conf/demo/01_99_wf_test_wf_exec_circle.yml +++ /dev/null @@ -1,7 +0,0 @@ -wf-circle: - type: Workflow - jobs: - first-job: - stages: - - name: "Trigger itself" - trigger: wf-circle diff --git a/tests/context/__init__.py b/tests/context/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/context/conftest.py b/tests/context/conftest.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/context/core.py b/tests/context/core.py new file mode 100644 index 00000000..e16739a9 --- /dev/null +++ b/tests/context/core.py @@ -0,0 +1,52 @@ +import time +from typing import Any, TypedDict, cast + +import pytest +from ddeutil.workflow import SUCCESS, WAIT, Result, Status +from typing_extensions import NotRequired + + +class Context(TypedDict): + params: dict[str, Any] + status: Status + jobs: dict[str, Any] + errors: NotRequired[dict[str, Any]] + + +class StageContext(TypedDict): + status: Status + context: NotRequired[dict[str, Any]] + + +def stage_exec(params: Context, run_id: str) -> Result: + stage_id = "stage-01" + context: StageContext = {"status": WAIT} + print(context) + time.sleep(0.25) + parent_run_id = run_id + run_id = f"1001{parent_run_id}" + params["jobs"].update({stage_id: {"outputs": {"records": 10}}}) + return Result( + run_id=run_id, + parent_run_id=parent_run_id, + status=SUCCESS, + context=cast(dict, params), + ) + + +@pytest.fixture(scope="function") +def params() -> Context: + return { + "params": {"foo": "bar"}, + "jobs": {}, + "status": WAIT, + } + + +def test_stage_context(params): + print() + rs = stage_exec(params=params, run_id="A") + print(rs) + + params["jobs"].update({"update": "baz"}) + print(rs) diff --git a/tests/example/test_stage_foreach.py b/tests/example/test_stage_foreach.py index 86d7896f..a011057a 100644 --- a/tests/example/test_stage_foreach.py +++ b/tests/example/test_stage_foreach.py @@ -140,9 +140,30 @@ def test_example_foreach_stage_exec_with_trigger_raise(test_path): "item": 1, "stages": { "2827845371": { - "outputs": {}, + "outputs": { + "params": {"item": 1}, + "jobs": { + "first-job": { + "status": FAILED, + "stages": { + "raise-stage": { + "outputs": {}, + "errors": { + "name": "StageError", + "message": "Raise trigger with item: 1", + }, + "status": FAILED, + } + }, + "errors": { + "name": "JobError", + "message": "Strategy execution was break because its nested-stage, 'raise-stage', failed.", + }, + } + }, + }, "errors": { - "name": "StageNestedError", + "name": "StageError", "message": "Trigger workflow was failed with:\nJob execution, 'first-job', was failed.", }, "status": FAILED, @@ -150,7 +171,7 @@ def test_example_foreach_stage_exec_with_trigger_raise(test_path): }, "errors": { "name": "StageError", - "message": "Item execution was break because its nested-stage, 'Stage trigger for raise', failed.", + "message": "Break item: 1 because nested stage: 'Stage trigger for raise', failed.", }, }, 2: { @@ -158,9 +179,9 @@ def test_example_foreach_stage_exec_with_trigger_raise(test_path): "item": 2, "stages": { "2827845371": { - "outputs": {}, + "outputs": {"params": {"item": 2}, "jobs": {}}, "errors": { - "name": "StageNestedCancelError", + "name": "StageCancelError", "message": "Trigger workflow was cancel.", }, "status": CANCEL, @@ -168,18 +189,18 @@ def test_example_foreach_stage_exec_with_trigger_raise(test_path): }, "errors": { "name": "StageCancelError", - "message": "Item execution was canceled from the event after end item execution.", + "message": "Cancel item: 2 after end nested process.", }, }, }, "errors": { - 1: { - "name": "StageError", - "message": "Item execution was break because its nested-stage, 'Stage trigger for raise', failed.", - }, 2: { "name": "StageCancelError", - "message": "Item execution was canceled from the event after end item execution.", + "message": "Cancel item: 2 after end nested process.", + }, + 1: { + "name": "StageError", + "message": "Break item: 1 because nested stage: 'Stage trigger for raise', failed.", }, }, } @@ -197,9 +218,30 @@ def test_example_foreach_stage_exec_with_trigger_raise(test_path): "item": 1, "stages": { "2827845371": { - "outputs": {}, + "outputs": { + "params": {"item": 1}, + "jobs": { + "first-job": { + "status": FAILED, + "stages": { + "raise-stage": { + "outputs": {}, + "errors": { + "name": "StageError", + "message": "Raise trigger with item: 1", + }, + "status": FAILED, + } + }, + "errors": { + "name": "JobError", + "message": "Strategy execution was break because its nested-stage, 'raise-stage', failed.", + }, + } + }, + }, "errors": { - "name": "StageNestedError", + "name": "StageError", "message": "Trigger workflow was failed with:\nJob execution, 'first-job', was failed.", }, "status": FAILED, @@ -207,7 +249,7 @@ def test_example_foreach_stage_exec_with_trigger_raise(test_path): }, "errors": { "name": "StageError", - "message": "Item execution was break because its nested-stage, 'Stage trigger for raise', failed.", + "message": "Break item: 1 because nested stage: 'Stage trigger for raise', failed.", }, }, 2: { @@ -216,18 +258,18 @@ def test_example_foreach_stage_exec_with_trigger_raise(test_path): "stages": {}, "errors": { "name": "StageCancelError", - "message": "Item execution was canceled from the event before start item execution.", + "message": "Cancel item: 2 before start nested process.", }, }, }, "errors": { 2: { "name": "StageCancelError", - "message": "Item execution was canceled from the event before start item execution.", + "message": "Cancel item: 2 before start nested process.", }, 1: { "name": "StageError", - "message": "Item execution was break because its nested-stage, 'Stage trigger for raise', failed.", + "message": "Break item: 1 because nested stage: 'Stage trigger for raise', failed.", }, }, } diff --git a/tests/example/test_stage_parallel.py b/tests/example/test_stage_parallel.py index 5f1db2ab..f8f89d32 100644 --- a/tests/example/test_stage_parallel.py +++ b/tests/example/test_stage_parallel.py @@ -127,7 +127,7 @@ def test_example_parallel_stage_exec_with_trigger_raise(test_path): }, "errors": { "name": "StageError", - "message": "Branch execution was break because its nested-stage, 'Raise Stage', failed.", + "message": "Break branch: 'branch02' because nested stage: 'Raise Stage', failed.", }, }, "branch01": { @@ -135,9 +135,30 @@ def test_example_parallel_stage_exec_with_trigger_raise(test_path): "branch": "branch01", "stages": { "8713259197": { - "outputs": {}, + "outputs": { + "params": {"branch": "branch01"}, + "jobs": { + "first-job": { + "status": FAILED, + "stages": { + "raise-stage": { + "outputs": {}, + "errors": { + "name": "StageError", + "message": "Raise trigger with branch: branch01", + }, + "status": FAILED, + } + }, + "errors": { + "name": "JobError", + "message": "Strategy execution was break because its nested-stage, 'raise-stage', failed.", + }, + } + }, + }, "errors": { - "name": "StageNestedError", + "name": "StageError", "message": "Trigger workflow was failed with:\nJob execution, 'first-job', was failed.", }, "status": FAILED, @@ -145,18 +166,18 @@ def test_example_parallel_stage_exec_with_trigger_raise(test_path): }, "errors": { "name": "StageError", - "message": "Branch execution was break because its nested-stage, 'Stage trigger', failed.", + "message": "Break branch: 'branch01' because nested stage: 'Stage trigger', failed.", }, }, }, "errors": { "branch02": { "name": "StageError", - "message": "Branch execution was break because its nested-stage, 'Raise Stage', failed.", + "message": "Break branch: 'branch02' because nested stage: 'Raise Stage', failed.", }, "branch01": { "name": "StageError", - "message": "Branch execution was break because its nested-stage, 'Stage trigger', failed.", + "message": "Break branch: 'branch01' because nested stage: 'Stage trigger', failed.", }, }, } @@ -211,9 +232,30 @@ def test_example_parallel_stage_exec_with_trigger_raise_bug(test_path): "branch": "branch01", "stages": { "2579951921": { - "outputs": {}, + "outputs": { + "params": {"branch": "branch01"}, + "jobs": { + "first-job": { + "status": FAILED, + "stages": { + "raise-stage": { + "outputs": {}, + "errors": { + "name": "StageError", + "message": "Raise trigger with branch: branch01", + }, + "status": FAILED, + } + }, + "errors": { + "name": "JobError", + "message": "Strategy execution was break because its nested-stage, 'raise-stage', failed.", + }, + } + }, + }, "errors": { - "name": "StageNestedError", + "name": "StageError", "message": "Trigger workflow was failed with:\nJob execution, 'first-job', was failed.", }, "status": FAILED, @@ -221,7 +263,7 @@ def test_example_parallel_stage_exec_with_trigger_raise_bug(test_path): }, "errors": { "name": "StageError", - "message": "Branch execution was break because its nested-stage, 'Stage trigger 1', failed.", + "message": "Break branch: 'branch01' because nested stage: 'Stage trigger 1', failed.", }, }, "branch02": { @@ -229,9 +271,30 @@ def test_example_parallel_stage_exec_with_trigger_raise_bug(test_path): "branch": "branch02", "stages": { "4773288548": { - "outputs": {}, + "outputs": { + "params": {"branch": "branch02"}, + "jobs": { + "first-job": { + "status": FAILED, + "stages": { + "raise-stage": { + "outputs": {}, + "errors": { + "name": "StageError", + "message": "Raise trigger with branch: branch02", + }, + "status": FAILED, + } + }, + "errors": { + "name": "JobError", + "message": "Strategy execution was break because its nested-stage, 'raise-stage', failed.", + }, + } + }, + }, "errors": { - "name": "StageNestedError", + "name": "StageError", "message": "Trigger workflow was failed with:\nJob execution, 'first-job', was failed.", }, "status": FAILED, @@ -239,18 +302,18 @@ def test_example_parallel_stage_exec_with_trigger_raise_bug(test_path): }, "errors": { "name": "StageError", - "message": "Branch execution was break because its nested-stage, 'Stage trigger 2', failed.", + "message": "Break branch: 'branch02' because nested stage: 'Stage trigger 2', failed.", }, }, }, "errors": { "branch01": { "name": "StageError", - "message": "Branch execution was break because its nested-stage, 'Stage trigger 1', failed.", + "message": "Break branch: 'branch01' because nested stage: 'Stage trigger 1', failed.", }, "branch02": { "name": "StageError", - "message": "Branch execution was break because its nested-stage, 'Stage trigger 2', failed.", + "message": "Break branch: 'branch02' because nested stage: 'Stage trigger 2', failed.", }, }, } diff --git a/tests/stages/test_stage_call.py b/tests/stages/test_stage_call.py index b446d4a0..15d49e9e 100644 --- a/tests/stages/test_stage_call.py +++ b/tests/stages/test_stage_call.py @@ -240,7 +240,7 @@ def test_call_stage_exec_cancel(): "status": CANCEL, "errors": { "name": "StageCancelError", - "message": "Execution was canceled from the event before start parallel.", + "message": "Cancel before start call process.", }, } @@ -396,7 +396,7 @@ async def test_call_stage_axec_cancel(): "status": CANCEL, "errors": { "name": "StageCancelError", - "message": "Execution was canceled from the event before start parallel.", + "message": "Cancel before start call process.", }, } diff --git a/tests/stages/test_stage_case.py b/tests/stages/test_stage_case.py index 710b46ae..c7ca25ea 100644 --- a/tests/stages/test_stage_case.py +++ b/tests/stages/test_stage_case.py @@ -1,10 +1,72 @@ +import pytest from ddeutil.workflow import CANCEL, FAILED, SKIP, SUCCESS, Result from ddeutil.workflow.stages import CaseStage, Stage +from pydantic import ValidationError from ..utils import MockEvent -def test_case_stage_exec(test_path): +def test_case_stage_raise(): + # NOTE: Raise because it contains else case more than 1. + with pytest.raises(ValidationError): + CaseStage.model_validate( + { + "name": "Start run case-match stage", + "id": "case-stage", + "case": "${{ params.name }}", + "match": [ + { + "else": [ + { + "name": "Else stage", + "echo": "Not match any case.", + }, + ], + }, + { + "else": [ + { + "name": "Else stage", + "echo": "Not match any case.", + }, + ], + }, + ], + } + ) + + # NOTE: Raise because it contains else case more than 1. + with pytest.raises(ValidationError): + CaseStage.model_validate( + { + "name": "Start run case-match stage", + "id": "case-stage", + "case": "${{ params.name }}", + "match": [ + { + "case": "_", + "stages": [ + { + "name": "Else stage", + "echo": "Not match any case.", + }, + ], + }, + { + "case": "_", + "stages": [ + { + "name": "Else stage", + "echo": "Not match any case.", + }, + ], + }, + ], + } + ) + + +def test_case_stage_exec(): stage: Stage = CaseStage.model_validate( { "name": "Start run case-match stage", @@ -93,6 +155,39 @@ def test_case_stage_exec(test_path): } +def test_case_stage_exec_else(): + stage: Stage = CaseStage.model_validate( + { + "name": "Start run case-match stage", + "id": "case-stage", + "case": "${{ params.name }}", + "match": [ + { + "case": "bar", + "stages": [ + { + "name": "Match name with Bar", + "echo": "Hello ${{ params.name }}", + }, + ], + }, + { + "else": [ + {"name": "Else stage", "echo": "Not match any case."}, + ], + }, + ], + }, + ) + rs: Result = stage.execute({"params": {"name": "test"}}, run_id="03") + assert rs.status == SUCCESS + assert rs.context == { + "status": SUCCESS, + "case": "_", + "stages": {"5883888894": {"outputs": {}, "status": SUCCESS}}, + } + + def test_case_stage_exec_raise(): stage: Stage = CaseStage.model_validate( { @@ -159,7 +254,7 @@ def test_case_stage_exec_raise(): }, "errors": { "name": "StageError", - "message": "Case-Stage was break because it has a sub stage, Raise stage, failed without raise error.", + "message": "Break case: 'bar' because nested stage: Raise stage, failed.", }, } @@ -192,9 +287,7 @@ def test_case_stage_exec_cancel(): "status": CANCEL, "errors": { "name": "StageCancelError", - "message": ( - "Execution was canceled from the event before start case execution." - ), + "message": "Cancel before start case process.", }, } @@ -209,7 +302,31 @@ def test_case_stage_exec_cancel(): "stages": {}, "errors": { "name": "StageError", - "message": "Case-Stage was canceled from event that had set before stage case execution.", + "message": "Cancel case: 'bar' before start nested process.", + }, + } + + event = MockEvent(n=2) + rs: Result = stage.execute( + {"params": {"name": "bar"}}, event=event, run_id="04" + ) + assert rs.status == CANCEL + assert rs.context == { + "status": CANCEL, + "case": "bar", + "stages": { + "3616274431": { + "outputs": {}, + "errors": { + "name": "StageCancelError", + "message": "Cancel before start empty process.", + }, + "status": CANCEL, + } + }, + "errors": { + "name": "StageCancelError", + "message": "Cancel case 'bar' after end nested process.", }, } @@ -237,3 +354,30 @@ def test_case_stage_exec_skipped(): rs: Result = stage.execute({"params": {"name": "test"}}, run_id="01") assert rs.status == SKIP assert rs.context == {"status": SKIP} + + stage: Stage = CaseStage.model_validate( + { + "name": "Stage skip not has else condition", + "id": "not-else", + "case": "${{ params.name }}", + "match": [ + { + "case": "bar", + "stages": [ + { + "name": "Match name with Bar", + "if": "'${{ params.name }}' != 'bar'", + "echo": "Hello ${{ params.name }}", + } + ], + } + ], + } + ) + rs: Result = stage.execute({"params": {"name": "bar"}}, run_id="02") + assert rs.status == SKIP + assert rs.context == { + "status": SKIP, + "case": "bar", + "stages": {"3616274431": {"outputs": {}, "status": SKIP}}, + } diff --git a/tests/stages/test_stage_empty.py b/tests/stages/test_stage_empty.py index 0168cce8..c59577f4 100644 --- a/tests/stages/test_stage_empty.py +++ b/tests/stages/test_stage_empty.py @@ -177,7 +177,7 @@ async def test_empty_stage_axec_cancel(): "status": CANCEL, "errors": { "name": "StageCancelError", - "message": "Execution was canceled from the event before start parallel.", + "message": "Cancel before start empty process.", }, } diff --git a/tests/stages/test_stage_foreach.py b/tests/stages/test_stage_foreach.py index 6f855c87..5d814084 100644 --- a/tests/stages/test_stage_foreach.py +++ b/tests/stages/test_stage_foreach.py @@ -230,7 +230,7 @@ def test_foreach_stage_exec_cancel(): "foreach": {}, "errors": { "name": "StageCancelError", - "message": "Execution was canceled from the event before start foreach.", + "message": "Cancel before start foreach process.", }, } @@ -253,14 +253,14 @@ def test_foreach_stage_exec_cancel(): "stages": {}, "errors": { "name": "StageCancelError", - "message": "Item execution was canceled from the event before start item execution.", + "message": "Cancel item: 1 before start nested process.", }, } }, "errors": { 1: { "name": "StageCancelError", - "message": "Item execution was canceled from the event before start item execution.", + "message": "Cancel item: 1 before start nested process.", } }, } @@ -286,21 +286,21 @@ def test_foreach_stage_exec_cancel(): "outputs": {}, "errors": { "name": "StageCancelError", - "message": "Execution was canceled from the event before start parallel.", + "message": "Cancel before start empty process.", }, "status": CANCEL, }, }, "errors": { "name": "StageCancelError", - "message": "Item execution was canceled from the event after end item execution.", + "message": "Cancel item: 1 after end nested process.", }, } }, "errors": { 1: { "name": "StageCancelError", - "message": "Item execution was canceled from the event after end item execution.", + "message": "Cancel item: 1 after end nested process.", }, }, } @@ -405,6 +405,7 @@ def test_foreach_stage_exec_raise_full(): ) rs: Result = stage.execute(params={}) assert rs.status == FAILED + possible = [] try: assert rs.context == { "status": FAILED, @@ -426,7 +427,7 @@ def test_foreach_stage_exec_raise_full(): }, "errors": { "name": "StageError", - "message": "Item execution was break because its nested-stage, 'Final Echo', failed.", + "message": "Break item: 2 because nested stage: 'Final Echo', failed.", }, }, 1: { @@ -439,139 +440,86 @@ def test_foreach_stage_exec_raise_full(): }, "errors": { "name": "StageCancelError", - "message": "Item execution was canceled from the event before start item execution.", + "message": "Cancel item: 1 before start nested process.", }, }, }, "errors": { 2: { "name": "StageError", - "message": "Item execution was break because its nested-stage, 'Final Echo', failed.", + "message": "Break item: 2 because nested stage: 'Final Echo', failed.", }, 1: { "name": "StageCancelError", - "message": "Item execution was canceled from the event before start item execution.", + "message": "Cancel item: 1 before start nested process.", }, }, } + possible.append(True) except AssertionError: - try: - assert rs.context == { - "status": FAILED, - "items": [1, 2], - "foreach": { - 2: { - "status": FAILED, - "item": 2, - "stages": { - "2709471980": {"outputs": {}, "status": SUCCESS}, - "9263488742": { - "outputs": {}, - "errors": { - "name": "StageError", - "message": "Raise for item equal 2", - }, - "status": FAILED, - }, - }, - "errors": { - "name": "StageError", - "message": "Item execution was break because its nested-stage, 'Final Echo', failed.", - }, - }, - 1: { - "status": CANCEL, - "item": 1, - "stages": { - "2709471980": {"outputs": {}, "status": SUCCESS}, - "9263488742": {"outputs": {}, "status": SKIP}, - "2238460182": { - "errors": { - "message": "Execution was canceled from the event before start parallel.", - "name": "StageCancelError", - }, - "outputs": {}, - "status": CANCEL, + possible.append(False) + try: + assert rs.context == { + "status": FAILED, + "items": [1, 2], + "foreach": { + 2: { + "status": FAILED, + "item": 2, + "stages": { + "2709471980": {"outputs": {}, "status": SUCCESS}, + "9263488742": { + "outputs": {}, + "errors": { + "name": "StageError", + "message": "Raise for item equal 2", }, + "status": FAILED, }, - "errors": { - "name": "StageCancelError", - "message": "Item execution was canceled from the event after end item execution.", - }, - }, - }, - "errors": { - 2: { - "name": "StageError", - "message": "Item execution was break because its nested-stage, 'Final Echo', failed.", - }, - 1: { - "name": "StageCancelError", - "message": "Item execution was canceled from the event after end item execution.", - }, - }, - } - except AssertionError: - assert rs.context == { - "errors": { - 1: { - "message": "Item execution was canceled from the event before start item " - "execution.", - "name": "StageCancelError", }, - 2: { - "message": "Item execution was break because its nested-stage, 'Final Echo', " - "failed.", + "errors": { "name": "StageError", + "message": "Break item: 2 because nested stage: 'Final Echo', failed.", }, }, - "foreach": { - 1: { - "errors": { - "message": "Item execution was canceled from the event before start item " - "execution.", - "name": "StageCancelError", - }, - "item": 1, - "stages": { - "2709471980": { - "outputs": {}, - "status": SUCCESS, - }, - "9263488742": { - "outputs": {}, - "status": SKIP, + 1: { + "status": CANCEL, + "item": 1, + "stages": { + "2709471980": {"outputs": {}, "status": SUCCESS}, + "9263488742": {"outputs": {}, "status": SKIP}, + "2238460182": { + "outputs": {}, + "errors": { + "name": "StageCancelError", + "message": "Cancel before start empty process.", }, + "status": CANCEL, }, - "status": CANCEL, }, - 2: { - "errors": { - "message": "Item execution was break because its nested-stage, 'Final " - "Echo', failed.", - "name": "StageError", - }, - "item": 2, - "stages": { - "2709471980": { - "outputs": {}, - "status": SUCCESS, - }, - "9263488742": { - "errors": { - "message": "Raise for item equal 2", - "name": "StageError", - }, - "outputs": {}, - "status": FAILED, - }, - }, - "status": FAILED, + "errors": { + "name": "StageCancelError", + "message": "Cancel item: 1 after end nested process.", }, }, - "items": [1, 2], - "status": FAILED, - } + }, + "errors": { + 2: { + "name": "StageError", + "message": "Break item: 2 because nested stage: 'Final Echo', failed.", + }, + 1: { + "name": "StageCancelError", + "message": "Cancel item: 1 after end nested process.", + }, + }, + } + possible.append(True) + except AssertionError: + possible.append(False) + if not any(possible): + print(rs.context) + raise AssertionError("checking context does not match any case.") def test_foreach_stage_exec_concurrent(test_path): @@ -745,7 +693,7 @@ def test_foreach_stage_exec_concurrent_with_raise(): "stages": {}, "errors": { "name": "StageError", - "message": "Item execution was break because its nested-stage, 'Raise with PyStage', failed.", + "message": "Break item: 2 because nested stage: 'Raise with PyStage', failed.", }, }, 1: {"status": SUCCESS, "item": 1, "stages": {}}, @@ -754,7 +702,7 @@ def test_foreach_stage_exec_concurrent_with_raise(): "errors": { 2: { "name": "StageError", - "message": "Item execution was break because its nested-stage, 'Raise with PyStage', failed.", + "message": "Break item: 2 because nested stage: 'Raise with PyStage', failed.", } }, } @@ -769,7 +717,7 @@ def test_foreach_stage_exec_concurrent_with_raise(): "stages": {}, "errors": { "name": "StageError", - "message": "Item execution was break because its nested-stage, 'Raise with PyStage', failed.", + "message": "Break item: 2 because nested stage: 'Raise with PyStage', failed.", }, }, 1: {"status": SUCCESS, "item": 1, "stages": {}}, @@ -779,18 +727,18 @@ def test_foreach_stage_exec_concurrent_with_raise(): "stages": {}, "errors": { "name": "StageCancelError", - "message": "Item execution was canceled from the event before start item execution.", + "message": "Cancel item: 3 before start nested process.", }, }, }, "errors": { 2: { "name": "StageError", - "message": "Item execution was break because its nested-stage, 'Raise with PyStage', failed.", + "message": "Break item: 2 because nested stage: 'Raise with PyStage', failed.", }, 3: { "name": "StageCancelError", - "message": "Item execution was canceled from the event before start item execution.", + "message": "Cancel item: 3 before start nested process.", }, }, } @@ -841,7 +789,7 @@ def test_foreach_stage_exec_concurrent_raise(): }, "errors": { "name": "StageError", - "message": "Item execution was break because its nested-stage, 'raise-error', failed.", + "message": "Break item: 1 because nested stage: 'raise-error', failed.", }, }, 2: { @@ -856,7 +804,7 @@ def test_foreach_stage_exec_concurrent_raise(): "errors": { 1: { "name": "StageError", - "message": "Item execution was break because its nested-stage, 'raise-error', failed.", + "message": "Break item: 1 because nested stage: 'raise-error', failed.", } }, }, @@ -879,7 +827,7 @@ def test_foreach_stage_exec_concurrent_raise(): }, "errors": { "name": "StageError", - "message": "Item execution was break because its nested-stage, 'raise-error', failed.", + "message": "Break item: 1 because nested stage: 'raise-error', failed.", }, }, 2: { @@ -888,18 +836,18 @@ def test_foreach_stage_exec_concurrent_raise(): "stages": {"raise-error": {"outputs": {}, "status": SKIP}}, "errors": { "name": "StageCancelError", - "message": "Item execution was canceled from the event before start item execution.", + "message": "Cancel item: 2 before start nested process.", }, }, }, "errors": { 2: { "name": "StageCancelError", - "message": "Item execution was canceled from the event before start item execution.", + "message": "Cancel item: 2 before start nested process.", }, 1: { "name": "StageError", - "message": "Item execution was break because its nested-stage, 'raise-error', failed.", + "message": "Break item: 1 because nested stage: 'raise-error', failed.", }, }, }, @@ -922,7 +870,7 @@ def test_foreach_stage_exec_concurrent_raise(): }, "errors": { "name": "StageError", - "message": "Item execution was break because its nested-stage, 'raise-error', failed.", + "message": "Break item: 1 because nested stage: 'raise-error', failed.", }, }, 2: { @@ -934,25 +882,25 @@ def test_foreach_stage_exec_concurrent_raise(): "outputs": {}, "errors": { "name": "StageCancelError", - "message": "Execution was canceled from the event before start parallel.", + "message": "Cancel before start empty process.", }, "status": CANCEL, }, }, "errors": { "name": "StageCancelError", - "message": "Item execution was canceled from the event after end item execution.", + "message": "Cancel item: 2 after end nested process.", }, }, }, "errors": { 2: { "name": "StageCancelError", - "message": "Item execution was canceled from the event after end item execution.", + "message": "Cancel item: 2 after end nested process.", }, 1: { "name": "StageError", - "message": "Item execution was break because its nested-stage, 'raise-error', failed.", + "message": "Break item: 1 because nested stage: 'raise-error', failed.", }, }, }, @@ -971,7 +919,7 @@ def test_foreach_stage_exec_concurrent_raise(): "foreach": { 1: { "errors": { - "message": "Item execution was break because its nested-stage, " + "message": "Break item: 1 because nested stage: " "'raise-error', failed.", "name": "StageError", }, @@ -993,19 +941,19 @@ def test_foreach_stage_exec_concurrent_raise(): { "errors": { 1: { - "message": "Item execution was break because its nested-stage, 'raise-error', " + "message": "Break item: 1 because nested stage: 'raise-error', " "failed.", "name": "StageError", }, 2: { - "message": "Item execution was canceled from the event before start item execution.", + "message": "Cancel item: 2 before start nested process.", "name": "StageCancelError", }, }, "foreach": { 1: { "errors": { - "message": "Item execution was break because its nested-stage, " + "message": "Break item: 1 because nested stage: " "'raise-error', failed.", "name": "StageError", }, @@ -1024,7 +972,7 @@ def test_foreach_stage_exec_concurrent_raise(): }, 2: { "errors": { - "message": "Item execution was canceled from the event before start item execution.", + "message": "Cancel item: 2 before start nested process.", "name": "StageCancelError", }, "item": 2, diff --git a/tests/stages/test_stage_parallel.py b/tests/stages/test_stage_parallel.py index b1a97d6f..7bd34973 100644 --- a/tests/stages/test_stage_parallel.py +++ b/tests/stages/test_stage_parallel.py @@ -61,6 +61,68 @@ def test_parallel_stage_exec(): } +def test_parallel_stage_exec_max_workers(): + stage: Stage = ParallelStage.model_validate( + { + "id": "parallel-stage", + "name": "Start run parallel stage", + "max-workers": "${{ max-workers }}", + "parallel": { + "branch01": [ + { + "name": "Echo branch01 stage", + "echo": "Start run with branch 1\n", + "sleep": 1.0, + }, + { + "id": "skip-stage", + "name": "Skip Stage", + "if": "${{ branch | rstr }} == 'branch02'", + }, + ], + "branch02": [ + { + "name": "Echo branch02 stage", + "echo": "Start run with branch 2\n", + } + ], + }, + "extras": {"foo": "bar"}, + }, + ) + rs: Result = stage.execute(params={"max-workers": 1}) + assert rs.status == SUCCESS + assert rs.context == { + "status": SUCCESS, + "workers": 1, + "parallel": { + "branch02": { + "status": SUCCESS, + "branch": "branch02", + "stages": {"4967824305": {"outputs": {}, "status": SUCCESS}}, + }, + "branch01": { + "status": SUCCESS, + "branch": "branch01", + "stages": { + "0573477600": {"outputs": {}, "status": SUCCESS}, + "skip-stage": {"outputs": {}, "status": SKIP}, + }, + }, + }, + } + + rs: Result = stage.execute(params={"max-workers": 100}) + assert rs.status == FAILED + assert rs.context == { + "status": FAILED, + "errors": { + "name": "ValueError", + "message": "A max-workers value should between 1 and 19.", + }, + } + + def test_parallel_stage_exec_cancel_from_stage(): stage: Stage = ParallelStage.model_validate( { @@ -100,14 +162,14 @@ def test_parallel_stage_exec_cancel_from_stage(): "stages": {}, "errors": { "name": "StageCancelError", - "message": "Branch execution was canceled from the event before start branch execution.", + "message": "Cancel branch: 'branch02' before start nested process.", }, }, }, "errors": { "branch02": { "name": "StageCancelError", - "message": "Branch execution was canceled from the event before start branch execution.", + "message": "Cancel branch: 'branch02' before start nested process.", } }, } @@ -144,101 +206,111 @@ def test_parallel_stage_exec_cancel(): "parallel": {}, "errors": { "name": "StageCancelError", - "message": "Execution was canceled from the event before start parallel.", + "message": "Cancel before start parallel process.", }, } event = MockEvent(n=2) rs: Result = stage.execute({}, event=event) assert rs.status == CANCEL + possible = [] try: assert rs.context == { "status": CANCEL, "workers": 2, "parallel": { - "branch02": { + "branch01": { "status": CANCEL, - "branch": "branch02", + "branch": "branch01", "stages": {}, "errors": { "name": "StageCancelError", - "message": "Branch execution was canceled from the event before start branch execution.", + "message": "Cancel branch: 'branch01' before start nested process.", }, }, - "branch01": { + "branch02": { "status": CANCEL, - "branch": "branch01", + "branch": "branch02", "stages": { - "0573477600": { + "4967824305": { "outputs": {}, "errors": { "name": "StageCancelError", - "message": "Execution was canceled from the event before start parallel.", + "message": "Cancel before start empty process.", }, "status": CANCEL, } }, "errors": { "name": "StageCancelError", - "message": "Branch execution was canceled from the event after end branch execution.", + "message": "Cancel branch: 'branch02' after end nested process.", }, }, }, "errors": { - "branch02": { + "branch01": { "name": "StageCancelError", - "message": "Branch execution was canceled from the event before start branch execution.", + "message": "Cancel branch: 'branch01' before start nested process.", }, - "branch01": { + "branch02": { "name": "StageCancelError", - "message": "Branch execution was canceled from the event after end branch execution.", + "message": "Cancel branch: 'branch02' after end nested process.", }, }, } + possible.append(True) except AssertionError: + possible.append(False) + try: assert rs.context == { "status": CANCEL, "workers": 2, "parallel": { - "branch01": { + "branch02": { "status": CANCEL, - "branch": "branch01", + "branch": "branch02", "stages": {}, "errors": { "name": "StageCancelError", - "message": "Branch execution was canceled from the event before start branch execution.", + "message": "Cancel branch: 'branch02' before start nested process.", }, }, - "branch02": { + "branch01": { "status": CANCEL, - "branch": "branch02", + "branch": "branch01", "stages": { - "4967824305": { + "0573477600": { "outputs": {}, "errors": { "name": "StageCancelError", - "message": "Execution was canceled from the event before start parallel.", + "message": "Cancel before start empty process.", }, "status": CANCEL, } }, "errors": { "name": "StageCancelError", - "message": "Branch execution was canceled from the event after end branch execution.", + "message": "Cancel branch: 'branch01' after end nested process.", }, }, }, "errors": { - "branch01": { + "branch02": { "name": "StageCancelError", - "message": "Branch execution was canceled from the event before start branch execution.", + "message": "Cancel branch: 'branch02' before start nested process.", }, - "branch02": { + "branch01": { "name": "StageCancelError", - "message": "Branch execution was canceled from the event after end branch execution.", + "message": "Cancel branch: 'branch01' after end nested process.", }, }, } + possible.append(True) + except AssertionError: + possible.append(False) + if not any(possible): + print(rs.context) + raise AssertionError("checking context does not match any case.") def test_parallel_stage_exec_raise(): @@ -274,14 +346,14 @@ def test_parallel_stage_exec_raise(): }, "errors": { "name": "StageError", - "message": "Branch execution was break because its nested-stage, 'Raise Stage', failed.", + "message": "Break branch: 'branch01' because nested stage: 'Raise Stage', failed.", }, } }, "errors": { "branch01": { "name": "StageError", - "message": "Branch execution was break because its nested-stage, 'Raise Stage', failed.", + "message": "Break branch: 'branch01' because nested stage: 'Raise Stage', failed.", } }, } diff --git a/tests/stages/test_stage_trigger.py b/tests/stages/test_stage_trigger.py index a409a9a9..eef63eff 100644 --- a/tests/stages/test_stage_trigger.py +++ b/tests/stages/test_stage_trigger.py @@ -102,13 +102,34 @@ def test_trigger_stage_exec_raise(test_path): rs: Result = stage.execute(params={}, run_id="02") assert rs.status == FAILED assert rs.context == { + "params": {}, + "jobs": { + "second-job": { + "status": SUCCESS, + "stages": {"1772094681": {"outputs": {}, "status": SUCCESS}}, + }, + "first-job": { + "status": FAILED, + "stages": { + "raise-error": { + "outputs": {}, + "errors": { + "name": "ValueError", + "message": "Testing raise error inside PyStage!!!", + }, + "status": FAILED, + } + }, + "errors": { + "name": "JobError", + "message": "Strategy execution was break because its nested-stage, 'raise-error', failed.", + }, + }, + }, "status": FAILED, "errors": { - "name": "StageNestedError", - "message": ( - "Trigger workflow was failed with:\n" - "Job execution, 'first-job', was failed." - ), + "name": "StageError", + "message": "Trigger workflow was failed with:\nJob execution, 'first-job', was failed.", }, } @@ -125,9 +146,11 @@ def test_trigger_stage_exec_cancel(): rs: Result = stage.execute(params={}, event=event) assert rs.status == CANCEL assert rs.context == { + "params": {}, + "jobs": {}, "status": CANCEL, "errors": { - "name": "StageNestedCancelError", + "name": "StageCancelError", "message": "Trigger workflow was cancel.", }, } @@ -143,4 +166,17 @@ def test_trigger_stage_exec_skip(): ) rs: Result = stage.execute(params={}) assert rs.status == SKIP - assert rs.context == {"status": SKIP} + assert rs.context == { + "params": {}, + "jobs": { + "first-job": { + "status": SKIP, + "stages": {"2644213676": {"outputs": {}, "status": SKIP}}, + } + }, + "status": SKIP, + "errors": { + "name": "StageSkipError", + "message": "Trigger workflow was skipped.", + }, + } diff --git a/tests/stages/test_stage_until.py b/tests/stages/test_stage_until.py index 209effda..0b3ce90f 100644 --- a/tests/stages/test_stage_until.py +++ b/tests/stages/test_stage_until.py @@ -85,13 +85,13 @@ def test_until_stage_raise(): }, "errors": { "name": "StageNestedError", - "message": "Loop execution was break because its nested-stage, 'Raise stage nested', failed.", + "message": "Break loop: 0 because nested stage: 'Raise stage nested', failed.", }, } }, "errors": { "name": "StageNestedError", - "message": "Loop execution was break because its nested-stage, 'Raise stage nested', failed.", + "message": "Break loop: 0 because nested stage: 'Raise stage nested', failed.", }, } @@ -185,7 +185,7 @@ def test_until_stage_cancel(): "until": {}, "errors": { "name": "StageCancelError", - "message": "Execution was canceled from the event before start loop.", + "message": "Cancel before start loop process, (loop: 1).", }, } @@ -202,13 +202,13 @@ def test_until_stage_cancel(): "stages": {}, "errors": { "name": "StageCancelError", - "message": "Loop execution was canceled from the event before start loop execution.", + "message": "Cancel loop: 0 before start nested process.", }, } }, "errors": { "name": "StageCancelError", - "message": "Loop execution was canceled from the event before start loop execution.", + "message": "Cancel loop: 0 before start nested process.", }, } @@ -227,20 +227,20 @@ def test_until_stage_cancel(): "outputs": {}, "errors": { "name": "StageCancelError", - "message": "Execution was canceled from the event before start parallel.", + "message": "Cancel before start empty process.", }, "status": CANCEL, } }, "errors": { "name": "StageNestedCancelError", - "message": "Loop execution was canceled from the event after end loop execution.", + "message": "Cancel loop: 0 after end nested process.", }, } }, "errors": { "name": "StageNestedCancelError", - "message": "Loop execution was canceled from the event after end loop execution.", + "message": "Cancel loop: 0 after end nested process.", }, } diff --git a/tests/test_audits.py b/tests/test_audits.py index 8f031a71..a781f2fd 100644 --- a/tests/test_audits.py +++ b/tests/test_audits.py @@ -8,8 +8,8 @@ NORMAL, AuditData, BaseAudit, - FileAudit, - SQLiteAudit, + LocalFileAudit, + LocalSQLiteAudit, get_audit, ) from ddeutil.workflow.conf import Config @@ -17,12 +17,14 @@ def test_get_audit_model(): model = get_audit() - assert isinstance(model, FileAudit) + assert isinstance(model, LocalFileAudit) model = get_audit( - extras={"audit_conf": {"type": "sqlite", "path": "./audit.db"}} + extras={"audit_conf": {"type": "sqlite", "path": Path("./audit.db")}} ) - assert isinstance(model, SQLiteAudit) + assert isinstance(model, LocalSQLiteAudit) + + Path("./audit.db").unlink(missing_ok=True) def test_audit_data(): @@ -41,7 +43,7 @@ def test_audit_data(): def test_base_audit(): log = BaseAudit.model_validate( { - "type": "test", + "type": "base", "extras": { "foo": "bar", "datetime": datetime(2024, 1, 1, 1, 15), @@ -49,14 +51,15 @@ def test_base_audit(): } ) assert log.model_dump() == { - "type": "test", + "type": "base", + "logging_name": "ddeutil.workflow", "extras": {"foo": "bar", "datetime": datetime(2024, 1, 1, 1, 15)}, } @mock.patch.object(Config, "enable_write_audit", False) def test_audit_file(): - log = FileAudit(path="./audits") + log = LocalFileAudit(path=Path("./audits")) audit = AuditData.model_validate( obj={ "name": "wf-scheduling-not-exists", @@ -77,7 +80,7 @@ def test_audit_file(): @mock.patch.object(Config, "enable_write_audit", True) def test_audit_file_do_first(): - log = FileAudit(path="./audits") + log = LocalFileAudit(path="./audits") audit = AuditData.model_validate( { "name": "wf-demo-logging", @@ -95,7 +98,7 @@ def test_audit_file_do_first(): pointer = log.pointer(audit) assert pointer.exists() # - # log = FileAudit.find_audit_with_release( + # log = LocalFileAudit.find_audit_with_release( # name="wf-demo-logging", # release=datetime(2024, 1, 1, 1), # ) @@ -105,8 +108,8 @@ def test_audit_file_do_first(): @mock.patch.object(Config, "enable_write_audit", True) -def test_audit_file_find(root_path): - log = FileAudit(path="./audits") +def test_audit_file_find(root_path: Path): + log = LocalFileAudit(path=Path("./audits")) audit = AuditData.model_validate( { "name": "wf-scheduling", @@ -145,7 +148,7 @@ def test_audit_file_find(root_path): def test_audit_file_find_empty(): wf_log_path = Path("./audits/workflow=wf-no-release-log/") wf_log_path.mkdir(exist_ok=True) - log = FileAudit() + log = LocalFileAudit() assert list(log.find_audits(name="wf-no-release-log")) == [] with pytest.raises(FileNotFoundError): @@ -162,13 +165,13 @@ def test_audit_file_find_empty(): def test_audit_file_find_raise(): - log = FileAudit() + log = LocalFileAudit() with pytest.raises(FileNotFoundError): next(log.find_audits(name="wf-file-not-found")) def test_audit_file_find_with_release(): - log = FileAudit() + log = LocalFileAudit() with pytest.raises(FileNotFoundError): log.find_audit_with_release( name="wf-file-not-found", diff --git a/tests/test_job.py b/tests/test_job.py index 62cdface..f3b56a07 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -1,5 +1,5 @@ import pytest -from ddeutil.workflow.errors import JobError +from ddeutil.workflow import EmptyStage, JobError from ddeutil.workflow.job import ( Job, OnDocker, @@ -48,6 +48,12 @@ def test_job(): with pytest.raises(ValidationError): Job.model_validate({"runs-on": "docker"}) + job = Job( + stages=[EmptyStage(name="Echo Some", echo="Hello World", id="echo")] + ) + stage = job.stage("echo") + assert stage.extras == {} + def test_job_check_needs(): job = Job(id="final-job", needs=["job-before"]) diff --git a/tests/test_job_exec.py b/tests/test_job_exec.py index 9374a4f0..91b2a1b7 100644 --- a/tests/test_job_exec.py +++ b/tests/test_job_exec.py @@ -569,7 +569,7 @@ def test_job_exec_cancel(): "outputs": {}, "errors": { "name": "StageCancelError", - "message": "Execution was canceled from the event before start parallel.", + "message": "Cancel before start empty process.", }, "status": CANCEL, }, diff --git a/tests/test_result.py b/tests/test_result.py index c363f44f..26b5d3f4 100644 --- a/tests/test_result.py +++ b/tests/test_result.py @@ -96,7 +96,7 @@ def change_context(result: Result) -> Result: # pragma: no cov def test_validate_statuses(): assert validate_statuses([SUCCESS, SUCCESS]) == SUCCESS assert validate_statuses([CANCEL, SUCCESS]) == CANCEL - assert validate_statuses([CANCEL, SUCCESS, FAILED]) == CANCEL + assert validate_statuses([CANCEL, SUCCESS, FAILED]) == FAILED assert validate_statuses([FAILED, SUCCESS]) == FAILED assert validate_statuses([FAILED, WAIT]) == FAILED assert validate_statuses([SUCCESS, WAIT]) == WAIT diff --git a/tests/test_workflow.py b/tests/test_workflow.py index f6d260cf..a7e96191 100644 --- a/tests/test_workflow.py +++ b/tests/test_workflow.py @@ -47,6 +47,7 @@ def test_workflow(): set_job_id = job.model_copy() set_job_id.id = "demo-run" + set_job_id.extras.update({"__sys_break_circle_exec": "manual-workflow"}) assert workflow.job("demo-run") == set_job_id # NOTE: Raise ValueError when get a job with ID that does not exist. @@ -100,14 +101,21 @@ def test_workflow_bypass_extras(): assert workflow.jobs["first-job"].extras == {} # NOTE: Bypass extras to job model. - assert workflow.job("first-job").extras == {"registries": ["foo", "bar"]} - assert workflow.job("second-job").extras == {"registries": ["foo", "bar"]} + assert workflow.job("first-job").extras == { + "registries": ["foo", "bar"], + "__sys_break_circle_exec": "manual-workflow", + } + assert workflow.job("second-job").extras == { + "registries": ["foo", "bar"], + "__sys_break_circle_exec": "manual-workflow", + } assert workflow.job("first-job").stages[0].extras == {} # NOTE: Bypass extras to stage model. assert workflow.job("first-job").stage("echo").extras == { - "registries": ["foo", "bar"] + "registries": ["foo", "bar"], + "__sys_break_circle_exec": "manual-workflow", } @@ -229,7 +237,10 @@ def test_workflow_from_conf_override(test_path): name="tmp-wf-override-conf-trigger", extras={"conf_path": conf_path} ) stage = workflow.job(name="trigger-job").stage("trigger-stage") - assert stage.extras == {"conf_path": conf_path} + assert stage.extras == { + "conf_path": conf_path, + "__sys_break_circle_exec": "tmp-wf-override-conf-trigger", + } rs: Result = workflow.execute(params={"name": "bar"}) assert rs.status == SUCCESS diff --git a/tests/test_workflow_exec.py b/tests/test_workflow_exec.py index 7d42b713..d907fbc3 100644 --- a/tests/test_workflow_exec.py +++ b/tests/test_workflow_exec.py @@ -3,7 +3,6 @@ from textwrap import dedent from unittest.mock import patch -import pytest from ddeutil.core import getdot from ddeutil.workflow import ( CANCEL, @@ -1171,7 +1170,6 @@ def test_workflow_exec_raise_job_trigger(test_path): } -@pytest.mark.skip def test_workflow_exec_circle_trigger(test_path): with dump_yaml_context( test_path / "conf/demo/01_99_wf_test_wf_exec_circle.yml", @@ -1198,7 +1196,7 @@ def test_workflow_exec_circle_trigger(test_path): "outputs": {}, "errors": { "name": "StageError", - "message": "Trigger workflow was failed with:\nJob execution, 'first-job', was failed.", + "message": "Circle execute via trigger itself workflow name.", }, "status": FAILED, } @@ -1243,7 +1241,7 @@ def test_workflow_exec_circle_trigger(test_path): "outputs": {}, "errors": { "name": "StageError", - "message": "Trigger workflow was failed with:\nJob execution, 'first-job', was failed.", + "message": "Circle execute via trigger itself workflow name.", }, "status": FAILED, } @@ -1297,7 +1295,28 @@ def test_workflow_exec_circle_trigger(test_path): "status": FAILED, "stages": { "1099837090": { - "outputs": {}, + "outputs": { + "params": {"name": "wf-circle-runtime-nested"}, + "jobs": { + "first-job": { + "status": FAILED, + "stages": { + "1099837090": { + "outputs": {}, + "errors": { + "name": "StageError", + "message": "Circle execute via trigger itself workflow name.", + }, + "status": FAILED, + } + }, + "errors": { + "name": "JobError", + "message": "Strategy execution was break because its nested-stage, 'Trigger itself', failed.", + }, + } + }, + }, "errors": { "name": "StageError", "message": "Trigger workflow was failed with:\nJob execution, 'first-job', was failed.", diff --git a/tests/utils.py b/tests/utils.py index 9294f5bb..2f515b5e 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -6,6 +6,8 @@ from __future__ import annotations import logging +import os +import shutil from collections.abc import Iterator from contextlib import contextmanager from datetime import datetime @@ -16,6 +18,7 @@ from zoneinfo import ZoneInfo import yaml +from ddeutil.core import str2bool from dotenv import load_dotenv OUTSIDE_PATH: Path = Path(__file__).parent.parent @@ -42,6 +45,7 @@ def dotenv_setting() -> None: # pragma: no cov WORKFLOW_LOG_AUDIT_CONF='{{"type": "file", "path": "./audits"}}' WORKFLOW_LOG_AUDIT_ENABLE_WRITE=true WORKFLOW_LOG_TRACE_HANDLERS='[{{"type": "console"}}]' + WORKFLOW_TEST_CLEAN_UP=true """ ).strip() env_path.write_text(env_str) @@ -149,3 +153,8 @@ def exclude_keys(value: Any, keys: list[str]) -> Any: # pragma: no cov def exclude_created_and_updated(value: Any) -> Any: # pragma: no cov return exclude_keys(value, keys=["created_at", "updated_at"]) + + +def clean_up(path: Union[str, Path]) -> None: # pragma: no cov + if str2bool(os.getenv("WORKFLOW_TEST_CLEAN_UP", "true")): + shutil.rmtree(path)