Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: "data-pipeline-etl"
name: "etl-s3"
type: "Workflow"
desc: |
End-to-end ETL pipeline for customer data processing with data quality checks
Expand Down Expand Up @@ -31,14 +31,12 @@ jobs:
data-extraction:
id: "data-extraction"
desc: "Extract raw data from multiple sources"

runs-on:
type: "aws_batch"
with:
job_queue_arn: "${AWS_BATCH_JOB_QUEUE_ARN}"
s3_bucket: "${S3_BUCKET}"
region_name: "us-east-1"

stages:
- name: "Extract customer data"
bash: |
Expand All @@ -56,11 +54,14 @@ jobs:
transaction_files = glob.glob("/tmp/transactions/*.csv")

if not customer_files:
raise Exception("No customer data files found")
raise FileNotFoundError("No customer data files found")
if not transaction_files:
raise Exception("No transaction data files found")
raise FileNotFoundError("No transaction data files found")

print(f"Found {len(customer_files)} customer files and {len(transaction_files)} transaction files")
print(
f"Found {len(customer_files)} customer files and "
f"{len(transaction_files)} transaction files"
)

- name: "Generate extraction report"
echo: "Data extraction completed successfully for ${{ params.processing_date }}"
Expand Down
19 changes: 11 additions & 8 deletions docs/examples/conf/02-ml-model-training.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
name: "ml-model-training"
type: "Workflow"
description: "Machine learning model training pipeline with hyperparameter optimization and model evaluation"

description: |
Machine learning model training pipeline with hyperparameter optimization
and model evaluation
on:
schedule:
- cronjob: "0 0 * * 0" # Weekly on Sunday at midnight
timezone: "UTC"
params:
dataset_path:
type: str
Expand All @@ -10,7 +15,10 @@ params:

model_type:
type: choice
options: ["random_forest", "xgboost", "neural_network"]
options:
- "random_forest"
- "xgboost"
- "neural_network"
desc: "Type of model to train"

experiment_name:
Expand Down Expand Up @@ -370,8 +378,3 @@ jobs:
channel: "#ml-deployments"
message: "Model ${{ params.experiment_name }} v1 deployed successfully"
color: "good"

on:
schedule:
- cronjob: "0 0 * * 0" # Weekly on Sunday at midnight
timezone: "UTC"
4 changes: 2 additions & 2 deletions src/ddeutil/workflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@
from .__cron import CronRunner
from .__types import DictData, DictStr, Matrix, Re, TupleStr
from .audits import (
EVENT,
DRYRUN,
FORCE,
NORMAL,
RERUN,
Audit,
FileAudit,
LocalFileAudit,
get_audit,
)
from .conf import (
Expand Down
11 changes: 10 additions & 1 deletion src/ddeutil/workflow/__types.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,26 @@
Match,
Pattern,
)
from typing import Any, Optional, TypedDict, Union
from typing import Any, Optional, TypedDict, Union, cast

from typing_extensions import Self

StrOrNone = Optional[str]
StrOrInt = Union[str, int]
TupleStr = tuple[str, ...]
ListStr = list[str]
ListInt = list[int]
DictData = dict[str, Any]
DictRange = dict[int, Any]
DictStr = dict[str, str]
Matrix = dict[str, Union[list[str], list[int]]]


def cast_dict(value: TypedDict[...]) -> DictData:
"""Cast any TypedDict object to DictData type."""
return cast(DictData, value)


# Pre-compile regex patterns for better performance
_RE_CALLER_PATTERN = r"""
\$ # start with $
Expand Down
105 changes: 64 additions & 41 deletions src/ddeutil/workflow/audits.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,31 +74,38 @@ class ReleaseType(str, Enum):
Attributes:
NORMAL: Standard workflow release execution
RERUN: Re-execution of previously failed workflow
EVENT: Event-triggered workflow execution
DRYRUN: Dry-execution workflow
FORCE: Forced execution bypassing normal conditions
"""

NORMAL = "normal"
RERUN = "rerun"
EVENT = "event"
FORCE = "force"
DRYRUN = "dryrun"


NORMAL = ReleaseType.NORMAL
RERUN = ReleaseType.RERUN
EVENT = ReleaseType.EVENT
DRYRUN = ReleaseType.DRYRUN
FORCE = ReleaseType.FORCE


class AuditData(BaseModel):
"""Audit Data model."""
"""Audit Data model that use to be the core data for any Audit model manage
logging at the target pointer system or service like file-system, sqlite
database, etc.
"""

model_config = ConfigDict(use_enum_values=True)

name: str = Field(description="A workflow name.")
release: datetime = Field(description="A release datetime.")
type: ReleaseType = Field(
default=NORMAL, description="A running type before logging."
default=NORMAL,
description=(
"An execution type that should be value in ('normal', 'rerun', "
"'force', 'dryrun')."
),
)
context: DictData = Field(
default_factory=dict,
Expand All @@ -121,18 +128,17 @@ class BaseAudit(BaseModel, ABC):
for logging subclasses like file, sqlite, etc.
"""

type: str
type: Literal["base"] = "base"
logging_name: str = "ddeutil.workflow"
extras: DictData = Field(
default_factory=dict,
description="An extras parameter that want to override core config",
)

@field_validator("extras", mode="before")
def validate_extras(cls, v: Any) -> DictData:
def __prepare_extras(cls, v: Any) -> Any:
"""Validate extras field to ensure it's a dictionary."""
if v is None:
return {}
return v
return {} if v is None else v

@model_validator(mode="after")
def __model_action(self) -> Self:
Expand All @@ -148,7 +154,7 @@ def __model_action(self) -> Self:
self.do_before()

# NOTE: Start setting log config in this line with cache.
set_logging("ddeutil.workflow")
set_logging(self.logging_name)
return self

@abstractmethod
Expand Down Expand Up @@ -248,27 +254,33 @@ def save(
raise NotImplementedError("Audit should implement `save` method.")


class FileAudit(BaseAudit):
class LocalFileAudit(BaseAudit):
"""File Audit Pydantic Model for saving log data from workflow execution.

This class inherits from BaseAudit and implements file-based storage
for audit logs. It saves workflow execution results to JSON files
in a structured directory hierarchy.

Attributes:
filename_fmt: Class variable defining the filename format for audit files.
file_fmt: Class variable defining the filename format for audit log.
file_release_fmt: Class variable defining the filename format for audit
release log.
"""

filename_fmt: ClassVar[str] = (
"workflow={name}/release={release:%Y%m%d%H%M%S}"
)
file_fmt: ClassVar[str] = "workflow={name}"
file_release_fmt: ClassVar[str] = "release={release:%Y%m%d%H%M%S}"

type: Literal["file"] = "file"
path: str = Field(
default="./audits",
path: Path = Field(
default=Path("./audits"),
description="A file path that use to manage audit logs.",
)

@field_validator("path", mode="before", json_schema_input_type=str)
def __prepare_path(cls, data: Any) -> Any:
"""Prepare path that passing with string to Path instance."""
return Path(data) if isinstance(data, str) else data

def do_before(self) -> None:
"""Create directory of release before saving log file.

Expand All @@ -278,7 +290,10 @@ def do_before(self) -> None:
Path(self.path).mkdir(parents=True, exist_ok=True)

def find_audits(
self, name: str, *, extras: Optional[DictData] = None
self,
name: str,
*,
extras: Optional[DictData] = None,
) -> Iterator[AuditData]:
"""Generate audit data found from logs path for a specific workflow name.

Expand All @@ -292,7 +307,7 @@ def find_audits(
Raises:
FileNotFoundError: If the workflow directory does not exist.
"""
pointer: Path = Path(self.path) / f"workflow={name}"
pointer: Path = self.path / self.file_fmt.format(name=name)
if not pointer.exists():
raise FileNotFoundError(f"Pointer: {pointer.absolute()}.")

Expand Down Expand Up @@ -325,7 +340,7 @@ def find_audit_with_release(
ValueError: If no releases found when release is None.
"""
if release is None:
pointer: Path = Path(self.path) / f"workflow={name}"
pointer: Path = self.path / self.file_fmt.format(name=name)
if not pointer.exists():
raise FileNotFoundError(f"Pointer: {pointer.absolute()}.")

Expand Down Expand Up @@ -382,8 +397,10 @@ def pointer(self, data: AuditData) -> Path:
Returns:
Path: The directory path for the current workflow and release.
"""
return Path(self.path) / self.filename_fmt.format(
name=data.name, release=data.release
return (
self.path
/ self.file_fmt.format(**data.model_dump(by_alias=True))
/ self.file_release_fmt.format(**data.model_dump(by_alias=True))
)

def save(self, data: Any, excluded: Optional[list[str]] = None) -> Self:
Expand Down Expand Up @@ -459,19 +476,19 @@ def cleanup(self, max_age_days: int = 180) -> int: # pragma: no cov
return cleaned_count


class SQLiteAudit(BaseAudit): # pragma: no cov
class LocalSQLiteAudit(BaseAudit): # pragma: no cov
"""SQLite Audit model for database-based audit storage.

This class inherits from BaseAudit and implements SQLite database storage
for audit logs with compression support.

Attributes:
table_name: Class variable defining the database table name.
schemas: Class variable defining the database schema.
ddl: Class variable defining the database schema.
"""

table_name: ClassVar[str] = "audits"
schemas: ClassVar[
ddl: ClassVar[
str
] = """
CREATE TABLE IF NOT EXISTS audits (
Expand All @@ -489,22 +506,21 @@ class SQLiteAudit(BaseAudit): # pragma: no cov
"""

type: Literal["sqlite"] = "sqlite"
path: str
path: Path = Field(
default=Path("./audits.db"),
description="A SQLite filepath.",
)

def _ensure_table_exists(self) -> None:
def do_before(self) -> None:
"""Ensure the audit table exists in the database."""
audit_url = dynamic("audit_url", extras=self.extras)
if audit_url is None or not audit_url.path:
if self.path.is_dir():
raise ValueError(
"SQLite audit_url must specify a database file path"
"SQLite path must specify a database file path not dir."
)

audit_url_parse: ParseResult = urlparse(audit_url)
db_path = Path(audit_url_parse.path)
db_path.parent.mkdir(parents=True, exist_ok=True)

with sqlite3.connect(db_path) as conn:
conn.execute(self.schemas)
self.path.parent.mkdir(parents=True, exist_ok=True)
with sqlite3.connect(self.path) as conn:
conn.execute(self.ddl)
conn.commit()

def is_pointed(
Expand Down Expand Up @@ -771,24 +787,31 @@ def cleanup(self, max_age_days: int = 180) -> int:
return cursor.rowcount


class PostgresAudit(BaseAudit, ABC): ... # pragma: no cov


Audit = Annotated[
Union[
FileAudit,
SQLiteAudit,
LocalFileAudit,
LocalSQLiteAudit,
],
Field(discriminator="type"),
]


def get_audit(extras: Optional[DictData] = None) -> Audit: # pragma: no cov
def get_audit(
audit_conf: Optional[DictData] = None,
extras: Optional[DictData] = None,
) -> Audit: # pragma: no cov
"""Get an audit model dynamically based on the config audit path value.

Args:
audit_conf (DictData):
extras: Optional extra parameters to override the core config.

Returns:
Audit: The appropriate audit model class based on configuration.
"""
audit_conf = dynamic("audit_conf", extras=extras)
audit_conf = dynamic("audit_conf", f=audit_conf, extras=extras)
model = TypeAdapter(Audit).validate_python(audit_conf | {"extras": extras})
return model
Loading