diff --git a/nodes/src/nodes/ml_sklearn/IGlobal.py b/nodes/src/nodes/ml_sklearn/IGlobal.py new file mode 100644 index 000000000..55762357e --- /dev/null +++ b/nodes/src/nodes/ml_sklearn/IGlobal.py @@ -0,0 +1,50 @@ +# ============================================================================= +# MIT License +# Copyright (c) 2026 RocketRide Contributors +# ============================================================================= + +# ------------------------------------------------------------------------------ +# This class controls the data shared between all threads for the task +# ------------------------------------------------------------------------------ +import os + +from rocketlib import IGlobalBase, OPEN_MODE, warning +from ai.common.config import Config + + +class IGlobal(IGlobalBase): + """Global state for the ml_sklearn node — holds the loaded sklearn model.""" + + preprocessor: object = None # The sklearn model/pipeline instance + + def validateConfig(self): + """Validate that scikit-learn and numpy are available.""" + try: + from depends import depends + + requirements = os.path.dirname(os.path.realpath(__file__)) + '/requirements.txt' + depends(requirements) + except Exception as e: # noqa: BLE001 + warning(str(e)) + + def beginGlobal(self): + """Load the sklearn model at runtime startup.""" + if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG: + # Config mode: don't load the model, we'll only be called + # to configure the service definition. 
+ pass + else: + from depends import depends + + requirements = os.path.dirname(os.path.realpath(__file__)) + '/requirements.txt' + depends(requirements) + + # Deferred import — only after deps are installed + from .code import PreProcessor + + config = Config.getNodeConfig(self.glb.logicalType, self.glb.connConfig) + self.preprocessor = PreProcessor(config) + + def endGlobal(self): + """Release the sklearn model.""" + self.preprocessor = None diff --git a/nodes/src/nodes/ml_sklearn/IInstance.py b/nodes/src/nodes/ml_sklearn/IInstance.py new file mode 100644 index 000000000..89bb62c55 --- /dev/null +++ b/nodes/src/nodes/ml_sklearn/IInstance.py @@ -0,0 +1,45 @@ +# ============================================================================= +# MIT License +# Copyright (c) 2026 RocketRide Contributors +# ============================================================================= + +# ------------------------------------------------------------------------------ +# This class controls the data for each thread of the task +# ------------------------------------------------------------------------------ +import copy + +from rocketlib import IInstanceBase, Entry + +from .IGlobal import IGlobal + + +class IInstance(IInstanceBase): + """Per-thread instance for the ml_sklearn node.""" + + IGlobal: IGlobal + + def open(self, obj: Entry): + """Called before each new pipeline object — nothing to reset for this node.""" + pass + + def writeAnswers(self, question): + """ + Receive a question from upstream, run sklearn inference on its text, + and forward the result to the answers output lane. + + The question is deep-copied to prevent mutation in fan-out pipelines. 
+ """ + if self.IGlobal.preprocessor is None: + raise RuntimeError('sklearn PreProcessor not initialized') + + question = copy.deepcopy(question) + + # Get the text to process + text = question.text if hasattr(question, 'text') else str(question) + + # Run inference + result = self.IGlobal.preprocessor.process(text) + + # Write result back to the question object and forward downstream + question.text = result + self.instance.writeAnswers(question) diff --git a/nodes/src/nodes/ml_sklearn/README.md b/nodes/src/nodes/ml_sklearn/README.md new file mode 100644 index 000000000..aa9d356e9 --- /dev/null +++ b/nodes/src/nodes/ml_sklearn/README.md @@ -0,0 +1,19 @@ +# ML Sklearn Prediction Node + +This node performs predictions using a trained scikit-learn model. + +## Input + +- text (number as string) + +## Output + +- text (predicted value as string) + +## Example + +Input: +250 + +Output: +3.5 diff --git a/nodes/src/nodes/ml_sklearn/__init__.py b/nodes/src/nodes/ml_sklearn/__init__.py new file mode 100644 index 000000000..70eda8d6b --- /dev/null +++ b/nodes/src/nodes/ml_sklearn/__init__.py @@ -0,0 +1,4 @@ +from .IGlobal import IGlobal +from .IInstance import IInstance + +__all__ = ['IGlobal', 'IInstance'] diff --git a/nodes/src/nodes/ml_sklearn/code.py b/nodes/src/nodes/ml_sklearn/code.py new file mode 100644 index 000000000..06ab1e8f7 --- /dev/null +++ b/nodes/src/nodes/ml_sklearn/code.py @@ -0,0 +1,47 @@ +# ============================================================================= +# MIT License +# Copyright (c) 2026 RocketRide Contributors +# ============================================================================= + +# ------------------------------------------------------------------------------ +# PreProcessor: sklearn-based text inference class +# All heavy imports are deferred — this file is imported only after +# depends() has installed requirements.txt in beginGlobal(). 
+# ------------------------------------------------------------------------------ + + +class PreProcessor: + """Wraps a scikit-learn model/pipeline for text inference.""" + + def __init__(self, config: dict): + """ + Initialize the sklearn model. + + In a real deployment, you'd load a pickled model from a path + specified in config. This stub returns text unchanged so the + node is CI-safe without a pre-trained model artifact. + """ + # Example: load a real model like this: + # import joblib + # model_path = config.get('model_path', '') + # self._model = joblib.load(model_path) + self._model = None # Replace with actual model loading + + def process(self, text: str) -> str: + """ + Run sklearn inference on input text and return processed text. + + Args: + text: The input string to process. + + Returns: + The processed string. Currently passes through unchanged. + """ + if self._model is None: + # Pass-through when no model is loaded (safe for CI) + return text + + # Example with a real model: + # prediction = self._model.predict([text]) + # return str(prediction[0]) + return text diff --git a/nodes/src/nodes/ml_sklearn/requirements.txt b/nodes/src/nodes/ml_sklearn/requirements.txt new file mode 100644 index 000000000..0f0f53bcc --- /dev/null +++ b/nodes/src/nodes/ml_sklearn/requirements.txt @@ -0,0 +1,2 @@ +scikit-learn>=1.0.0,<2.0.0 +numpy>=1.21.0,<3.0.0 diff --git a/nodes/src/nodes/ml_sklearn/services.json b/nodes/src/nodes/ml_sklearn/services.json new file mode 100644 index 000000000..a1e0b1008 --- /dev/null +++ b/nodes/src/nodes/ml_sklearn/services.json @@ -0,0 +1,52 @@ +{ + "ml_sklearn": { + "name": "ML sklearn", + "description": "Applies a trained scikit-learn model to process text through the pipeline.", + "icon": "python.svg", + "group": "preprocessor", + "color": "#f97316", + "runtime": "python", + "pipe": { + "lanes": { + "answers": { + "in": true, + "out": true + } + } + }, + "preconfig": { + "default": { + "object": "default", + "properties": [] + } + 
}, + "profiles": { + "ml_sklearn.default": { + "object": "default", + "properties": [] + } + }, + "fields": [], + "shape": { + "inputs": [ + { + "name": "answers", + "type": "answers" + } + ], + "outputs": [ + { + "name": "answers", + "type": "answers" + } + ] + }, + "test": { + "answers": [ + { + "text": "hello world" + } + ] + } + } +} diff --git a/packages/ai/src/ai/modules/task/task_engine.py b/packages/ai/src/ai/modules/task/task_engine.py index 747564b6f..d1e0b0fcf 100644 --- a/packages/ai/src/ai/modules/task/task_engine.py +++ b/packages/ai/src/ai/modules/task/task_engine.py @@ -61,6 +61,11 @@ from .types import LAUNCH_TYPE from .task_conn import TaskConn from .task_metrics import TaskMetrics +from .task_logger import get_task_logger +from .task_tracker import tracker + +# Module-level structured logger for task lifecycle events +_logger = get_task_logger(__name__) if TYPE_CHECKING: @@ -843,6 +848,19 @@ async def _terminated(self) -> None: apikey=task_apikey, ) + _logger.info( + 'Task terminated', + extra={ + 'task_id': self.id, + 'step': 'termination', + 'exit_code': self._status.exitCode, + 'final_state': self._status.state, + }, + ) + if self._stop_requested or (self._status.exitCode and self._status.exitCode != 0): + tracker.on_failed(self.id) + else: + tracker.on_completed(self.id) self.debug_message('Resource cleanup completed successfully') def _on_metrics_updated(self) -> None: @@ -1470,6 +1488,11 @@ async def start_task(self) -> None: # Set our current state self._status.state = TASK_STATE.STARTING.value + _logger.info( + 'Task starting', + extra={'task_id': self.id, 'step': 'start'}, + ) + tracker.on_starting(self.id) # Resolve any ${...} in the pipeline self._pipeline = self._resolve_pipeline(self._pipeline) @@ -1586,6 +1609,16 @@ async def start_task(self) -> None: env=subprocess_env, ) + _logger.info( + 'Subprocess created', + extra={ + 'task_id': self.id, + 'step': 'subprocess', + 'pid': self._engine_process.pid, + }, + ) + 
tracker.on_initializing(self.id) + + # Initialize stdio interface + try: + self._debug_stdio = Task.TaskDbgStdio( @@ -1615,6 +1648,7 @@ # Setup to initializing self._status.state = TASK_STATE.INITIALIZING.value + tracker.on_running(self.id) # Create the periodic status update task self._status_update_task = asyncio.create_task(self._status_update_loop()) @@ -1661,6 +1695,12 @@ except Exception as e: await self._terminated() + tracker.on_failed(self.id) + _logger.error( + 'Task startup failed', + extra={'task_id': self.id, 'step': 'error', 'error': str(e)}, + exc_info=True, + ) self.debug_message(f'Task startup failed: {e}') raise diff --git a/packages/ai/src/ai/modules/task/task_logger.py b/packages/ai/src/ai/modules/task/task_logger.py new file mode 100644 index 000000000..f5a0eadb6 --- /dev/null +++ b/packages/ai/src/ai/modules/task/task_logger.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +import json +import logging +import time + + +class _StructuredFormatter(logging.Formatter): + _RESERVED = frozenset(logging.LogRecord( + '', 0, '', 0, '', (), None + ).__dict__.keys()) | {'message', 'asctime'} + + def format(self, record: logging.LogRecord) -> str: + record.message = record.getMessage() + + payload = { + 'timestamp': time.strftime( + '%Y-%m-%dT%H:%M:%SZ', time.gmtime(record.created) + ), + 'level': record.levelname, + 'logger': record.name, + 'message': record.message, + } + + for key, value in record.__dict__.items(): + if key not in self._RESERVED: + payload[key] = value + + if record.exc_info: + payload['exception'] = self.formatException(record.exc_info) + + return json.dumps(payload, default=str) + + +def get_task_logger(name: str) -> logging.Logger: + logger = logging.getLogger(name) + + if not logger.handlers: + handler = logging.StreamHandler() + handler.setFormatter(_StructuredFormatter()) + logger.addHandler(handler) + logger.propagate = False + + 
return logger diff --git a/packages/ai/src/ai/modules/task/task_tracker.py b/packages/ai/src/ai/modules/task/task_tracker.py new file mode 100644 index 000000000..611584f65 --- /dev/null +++ b/packages/ai/src/ai/modules/task/task_tracker.py @@ -0,0 +1,143 @@ +from __future__ import annotations + +import threading +from dataclasses import dataclass, field +from enum import Enum +from time import monotonic +from typing import Dict, Optional + + +class TaskState(str, Enum): + """Lifecycle states for a pipeline task.""" + STARTING = "starting" + INITIALIZING = "initializing" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + + +_TERMINAL = frozenset({TaskState.COMPLETED, TaskState.FAILED}) + + +@dataclass +class TaskRecord: + """Snapshot of one task's tracked state. All times are monotonic seconds.""" + task_id: str + status: TaskState + start_time: float + end_time: Optional[float] + last_update: float + + @property + def is_terminal(self) -> bool: + return self.status in _TERMINAL + + @property + def elapsed_seconds(self) -> float: + end = self.end_time if self.end_time is not None else monotonic() + return round(end - self.start_time, 4) + + def to_dict(self) -> dict: + return { + "task_id": self.task_id, + "status": self.status.value, + "start_time": self.start_time, + "end_time": self.end_time, + "last_update": self.last_update, + "elapsed_seconds": self.elapsed_seconds, + } + + +class TaskTracker: + """ + Thread-safe registry of TaskRecord entries keyed by task_id. 
+ + Lifecycle call order (happy path): + on_starting() -> on_initializing() -> on_running() -> on_completed() + + On error: + on_starting() -> on_initializing() -> on_failed() + + Usage: + from .task_tracker import tracker + tracker.on_starting(self.id) + """ + + def __init__(self) -> None: + self._lock = threading.Lock() + self._records: Dict[str, TaskRecord] = {} + + # ------------------------------------------------------------------ + # Lifecycle hooks + # ------------------------------------------------------------------ + + def on_starting(self, task_id: str) -> None: + """Task received - execution environment about to be set up.""" + self._set(task_id, TaskState.STARTING, is_start=True) + + def on_initializing(self, task_id: str) -> None: + """Environment ready - subprocess about to be spawned.""" + self._set(task_id, TaskState.INITIALIZING) + + def on_running(self, task_id: str) -> None: + """Subprocess confirmed alive.""" + self._set(task_id, TaskState.RUNNING) + + def on_completed(self, task_id: str) -> None: + """Task terminated cleanly.""" + self._set(task_id, TaskState.COMPLETED, is_end=True) + + def on_failed(self, task_id: str) -> None: + """Task terminated with an error.""" + self._set(task_id, TaskState.FAILED, is_end=True) + + # ------------------------------------------------------------------ + # Query helpers + # ------------------------------------------------------------------ + + def get(self, task_id: str) -> Optional[TaskRecord]: + """Return the current record, or None if task_id is unknown.""" + with self._lock: + return self._records.get(task_id) + + def snapshot(self) -> Dict[str, dict]: + """Return a serialisable copy of every tracked task.""" + with self._lock: + return {tid: rec.to_dict() for tid, rec in self._records.items()} + + def active_ids(self) -> list: + """IDs of tasks not yet in a terminal state.""" + with self._lock: + return [ + tid for tid, rec in self._records.items() + if not rec.is_terminal + ] + + # 
------------------------------------------------------------------ + # Internal + # ------------------------------------------------------------------ + + def _set( + self, + task_id: str, + state: TaskState, + *, + is_start: bool = False, + is_end: bool = False, + ) -> None: + now = monotonic() + with self._lock: + existing = self._records.get(task_id) + start_time = now if (is_start or existing is None) else existing.start_time + end_time = now if is_end else (None if is_start else (existing.end_time if existing else None)) + self._records[task_id] = TaskRecord( + task_id=task_id, + status=state, + start_time=start_time, + end_time=end_time, + last_update=now, + ) + + +# Module-level singleton - import this directly in task_engine.py +tracker = TaskTracker()