Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions docs/api/traces.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ The traces system provides:

## Core Components

### `TraceManager`
### `Trace`

The main trace manager that coordinates multiple handlers and provides a unified logging interface.

Expand Down Expand Up @@ -64,10 +64,10 @@ Basic console logging implementation that outputs to stdout/stderr.
!!! example "Console Handler"

```python
from ddeutil.workflow.traces import ConsoleHandler, TraceManager
from ddeutil.workflow.traces import ConsoleHandler, Trace

handler = ConsoleHandler(type="console")
trace = TraceManager(
trace = Trace(
run_id="workflow-123",
handlers=[handler]
)
Expand All @@ -81,7 +81,7 @@ File-based trace implementation that persists logs to the local filesystem with
!!! example "File Handler Usage"

```python
from ddeutil.workflow.traces import FileHandler, TraceManager
from ddeutil.workflow.traces import FileHandler, Trace

# Create file handler
handler = FileHandler(
Expand All @@ -90,7 +90,7 @@ File-based trace implementation that persists logs to the local filesystem with
format="{datetime} ({process:5d}, {thread:5d}) ({cut_id}) {message:120s} ({filename}:{lineno})"
)

trace = TraceManager(
trace = Trace(
run_id="workflow-123",
parent_run_id="parent-456",
handlers=[handler]
Expand Down Expand Up @@ -137,15 +137,15 @@ SQLite-based trace implementation for scalable logging with structured metadata
!!! example "SQLite Handler"

```python
from ddeutil.workflow.traces import SQLiteHandler, TraceManager
from ddeutil.workflow.traces import SQLiteHandler, Trace

handler = SQLiteHandler(
type="sqlite",
path="./logs/workflow_traces.db",
table_name="traces"
)

trace = TraceManager(
trace = Trace(
run_id="workflow-789",
handlers=[handler]
)
Expand Down Expand Up @@ -206,7 +206,7 @@ REST API integration for external logging services.
!!! example "REST API Handler"

```python
from ddeutil.workflow.traces import RestAPIHandler, TraceManager
from ddeutil.workflow.traces import RestAPIHandler, Trace

# Datadog integration
handler = RestAPIHandler(
Expand All @@ -218,7 +218,7 @@ REST API integration for external logging services.
max_retries=3
)

trace = TraceManager(
trace = Trace(
run_id="workflow-123",
handlers=[handler]
)
Expand All @@ -238,7 +238,7 @@ High-performance Elasticsearch logging with bulk indexing and search capabilitie
!!! example "Elasticsearch Handler"

```python
from ddeutil.workflow.traces import ElasticHandler, TraceManager
from ddeutil.workflow.traces import ElasticHandler, Trace

handler = ElasticHandler(
type="elastic",
Expand All @@ -250,7 +250,7 @@ High-performance Elasticsearch logging with bulk indexing and search capabilitie
max_retries=3
)

trace = TraceManager(
trace = Trace(
run_id="workflow-123",
handlers=[handler]
)
Expand Down Expand Up @@ -320,7 +320,7 @@ All handlers support asynchronous logging for non-blocking operations:

## Buffer Support

The `TraceManager` supports buffered logging for high-performance scenarios:
The `Trace` supports buffered logging for high-performance scenarios:

!!! example "Buffered Logging"

Expand All @@ -342,7 +342,7 @@ The `TraceManager` supports buffered logging for high-performance scenarios:

### `get_trace`

Factory function that returns a `TraceManager` instance with handlers configured from the core configuration.
Factory function that returns a `Trace` instance with handlers configured from the core configuration.

!!! example "Dynamic Trace Creation"

Expand Down
14 changes: 14 additions & 0 deletions docs/logs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Logs

For logging of this package will split to 2 parts:

1. Trace log
2. Audit log

## Trace Log

This part will show all process logs that return with the trace model.

## Audit Log

This part will use to tracking workflow release log.
1 change: 1 addition & 0 deletions src/ddeutil/workflow/__about__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
__version__: str = "0.0.81"
__python_version__: str = "3.9"
25 changes: 19 additions & 6 deletions src/ddeutil/workflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,25 @@
from .__cron import CronRunner
from .__types import DictData, DictStr, Matrix, Re, TupleStr
from .audits import (
EVENT,
FORCE,
NORMAL,
RERUN,
Audit,
FileAudit,
get_audit,
)
from .conf import *
from .conf import (
PREFIX,
CallerSecret,
Config,
YamlParser,
api_config,
config,
dynamic,
env,
pass_env,
)
from .errors import (
BaseError,
JobCancelError,
Expand All @@ -63,6 +77,9 @@
ResultError,
StageCancelError,
StageError,
StageNestedCancelError,
StageNestedError,
StageNestedSkipError,
StageSkipError,
UtilError,
WorkflowCancelError,
Expand Down Expand Up @@ -132,15 +149,11 @@
VirtualPyStage,
)
from .traces import (
TraceManager,
Trace,
get_trace,
)
from .utils import *
from .workflow import (
EVENT,
FORCE,
NORMAL,
RERUN,
ReleaseType,
Workflow,
)
4 changes: 2 additions & 2 deletions src/ddeutil/workflow/api/routes/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from ...__types import DictData
from ...errors import JobError
from ...job import Job
from ...traces import TraceManager, get_trace
from ...traces import Trace, get_trace
from ...utils import gen_id

logger = logging.getLogger("uvicorn.error")
Expand All @@ -41,7 +41,7 @@ async def job_execute(
if extras:
job.extras = extras

trace: TraceManager = get_trace(
trace: Trace = get_trace(
run_id, parent_run_id=parent_run_id, extras=job.extras
)

Expand Down
58 changes: 45 additions & 13 deletions src/ddeutil/workflow/audits.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,33 +49,65 @@
from abc import ABC, abstractmethod
from collections.abc import Iterator
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Annotated, Any, ClassVar, Literal, Optional, Union
from urllib.parse import ParseResult, urlparse

from pydantic import BaseModel, Field, TypeAdapter
from pydantic import BaseModel, ConfigDict, Field, TypeAdapter
from pydantic.functional_validators import field_validator, model_validator
from typing_extensions import Self

from .__types import DictData
from .conf import dynamic
from .traces import TraceManager, get_trace, set_logging
from .traces import Trace, get_trace, set_logging

logger = logging.getLogger("ddeutil.workflow")


class ReleaseType(str, Enum):
"""Release type enumeration for workflow execution modes.

This enum defines the different types of workflow releases that can be
triggered, each with specific behavior and use cases.

Attributes:
NORMAL: Standard workflow release execution
RERUN: Re-execution of previously failed workflow
EVENT: Event-triggered workflow execution
FORCE: Forced execution bypassing normal conditions
"""

NORMAL = "normal"
RERUN = "rerun"
EVENT = "event"
FORCE = "force"


NORMAL = ReleaseType.NORMAL
RERUN = ReleaseType.RERUN
EVENT = ReleaseType.EVENT
FORCE = ReleaseType.FORCE


class AuditData(BaseModel):
"""Audit Data model."""

model_config = ConfigDict(use_enum_values=True)

name: str = Field(description="A workflow name.")
release: datetime = Field(description="A release datetime.")
type: str = Field(description="A running type before logging.")
type: ReleaseType = Field(
default=NORMAL, description="A running type before logging."
)
context: DictData = Field(
default_factory=dict,
description="A context that receive from a workflow execution result.",
)
run_id: str = Field(description="A running ID")
parent_run_id: Optional[str] = Field(
default=None, description="A parent running ID."
)
run_id: str = Field(description="A running ID")
runs_metadata: DictData = Field(
default_factory=dict,
description="A runs metadata that will use to tracking this audit log.",
Expand Down Expand Up @@ -122,7 +154,7 @@ def __model_action(self) -> Self:
@abstractmethod
def is_pointed(
self,
data: AuditData,
data: Any,
*,
extras: Optional[DictData] = None,
) -> bool:
Expand Down Expand Up @@ -328,21 +360,21 @@ def find_audit_with_release(
return AuditData.model_validate(obj=json.load(f))

def is_pointed(
self, data: AuditData, *, extras: Optional[DictData] = None
self,
data: Any,
*,
extras: Optional[DictData] = None,
) -> bool:
"""Check if the release log already exists at the destination log path.

Args:
data: The workflow name.
data (str):
extras: Optional extra parameters to override core config.

Returns:
bool: True if the release log exists, False otherwise.
"""
# NOTE: Return False if enable writing log flag does not set.
if not dynamic("enable_write_audit", extras=extras):
return False
return self.pointer(data).exists()
return self.pointer(AuditData.model_validate(data)).exists()

def pointer(self, data: AuditData) -> Path:
"""Return release directory path generated from model data.
Expand All @@ -365,7 +397,7 @@ def save(self, data: Any, excluded: Optional[list[str]] = None) -> Self:
Self: The audit instance after saving.
"""
audit = AuditData.model_validate(data)
trace: TraceManager = get_trace(
trace: Trace = get_trace(
audit.run_id,
parent_run_id=audit.parent_run_id,
extras=self.extras,
Expand Down Expand Up @@ -655,7 +687,7 @@ def save(self, data: Any, excluded: Optional[list[str]] = None) -> Self:
ValueError: If SQLite database is not properly configured.
"""
audit = AuditData.model_validate(data)
trace: TraceManager = get_trace(
trace: Trace = get_trace(
audit.run_id,
parent_run_id=audit.parent_run_id,
extras=self.extras,
Expand Down
9 changes: 9 additions & 0 deletions src/ddeutil/workflow/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,15 @@ class StageCancelError(StageError): ...
class StageSkipError(StageError): ...


class StageNestedError(StageError): ...


class StageNestedCancelError(StageNestedError): ...


class StageNestedSkipError(StageNestedError): ...


class JobError(BaseError): ...


Expand Down
Loading