-
Notifications
You must be signed in to change notification settings - Fork 509
feat(ai): add structured logging to task engine lifecycle #745
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
f4f7aa5
b22e844
319fca1
7ce10e3
90fac4d
b0598a2
ca2b870
5862bd8
b33eff4
be13299
4d88f21
48a784f
7e6963d
5952843
c667348
12c00e2
c587ada
bb482f0
8e676e5
50a0bd4
96cf781
92593be
fe1f418
89504f9
dc7f679
4c9d406
c39113d
47ad4f9
3e8e46b
5399865
43c725d
5602c71
9eaba7e
efa13f6
66e54c3
cc8840e
1f33c2f
4d29752
9ad5458
282a102
718b5ce
46d6428
83abe32
3d91916
4fe5fe9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,50 @@ | ||
| # ============================================================================= | ||
| # MIT License | ||
| # Copyright (c) 2026 RocketRide Contributors | ||
| # ============================================================================= | ||
|
|
||
| # ------------------------------------------------------------------------------ | ||
| # This class controls the data shared between all threads for the task | ||
| # ------------------------------------------------------------------------------ | ||
| import os | ||
|
|
||
| from rocketlib import IGlobalBase, OPEN_MODE, warning | ||
| from ai.common.config import Config | ||
|
|
||
|
|
||
class IGlobal(IGlobalBase):
    """Global state for the ml_sklearn node — holds the loaded sklearn model."""

    # The sklearn model/pipeline instance; None until beginGlobal() runs.
    preprocessor: object = None

    # Single source of truth for the requirements file location — previously
    # this expression was duplicated in validateConfig() and beginGlobal().
    _REQUIREMENTS: str = os.path.join(
        os.path.dirname(os.path.realpath(__file__)), 'requirements.txt'
    )

    def validateConfig(self):
        """Validate that scikit-learn and numpy are available.

        Best-effort: any failure (missing `depends` helper, unresolvable
        requirements) is reported as a warning rather than raised, because
        config validation must not abort service definition.
        """
        try:
            # Deferred import: `depends` may not exist until the runtime
            # environment is fully provisioned.
            from depends import depends

            depends(self._REQUIREMENTS)
        except Exception as e:  # noqa: BLE001
            warning(str(e))

    def beginGlobal(self):
        """Load the sklearn model at runtime startup.

        In CONFIG open mode the model is intentionally not loaded — the node
        is only being asked to describe its service definition.
        """
        if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG:
            # Config mode: don't load the model, we'll only be called
            # to configure the service definition.
            pass
        else:
            from depends import depends

            # Install runtime requirements before importing anything that
            # needs them.
            depends(self._REQUIREMENTS)

            # Deferred import — only after deps are installed
            from .code import PreProcessor

            config = Config.getNodeConfig(self.glb.logicalType, self.glb.connConfig)
            self.preprocessor = PreProcessor(config)

    def endGlobal(self):
        """Release the sklearn model."""
        self.preprocessor = None
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| # ============================================================================= | ||
| # MIT License | ||
| # Copyright (c) 2026 RocketRide Contributors | ||
| # ============================================================================= | ||
|
|
||
| # ------------------------------------------------------------------------------ | ||
| # This class controls the data for each thread of the task | ||
| # ------------------------------------------------------------------------------ | ||
| import copy | ||
|
|
||
| from rocketlib import IInstanceBase, Entry | ||
|
|
||
| from .IGlobal import IGlobal | ||
|
|
||
|
|
||
class IInstance(IInstanceBase):
    """Per-thread instance for the ml_sklearn node."""

    # Shared global state (holds the loaded sklearn model).
    IGlobal: IGlobal

    def open(self, obj: Entry):
        """Called before each new pipeline object — nothing to reset for this node."""
        pass

    def writeAnswers(self, question):
        """
        Receive a question from upstream, run sklearn inference on its text,
        and forward the result to the answers output lane.

        The question is deep-copied to prevent mutation in fan-out pipelines.
        """
        model = self.IGlobal.preprocessor
        if model is None:
            raise RuntimeError('sklearn PreProcessor not initialized')

        # Work on a private copy so fan-out consumers never observe our mutation.
        outgoing = copy.deepcopy(question)

        # Prefer the structured .text attribute; fall back to the raw payload.
        payload = outgoing.text if hasattr(outgoing, 'text') else str(outgoing)

        # Run inference, write the result back, and forward downstream.
        outgoing.text = model.process(payload)
        self.instance.writeAnswers(outgoing)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| # ML Sklearn Prediction Node | ||
|
|
||
| This node performs predictions using a trained scikit-learn model. | ||
|
|
||
| ## Input | ||
|
|
||
| - text (number as string) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Test fixture in `services.json` uses the text "hello world", but the README specifies the input as a number formatted as a string (e.g. "250") — the fixture should exercise a numeric string. Also applies to: 44-49 🤖 Prompt for AI Agents |
||
|
|
||
| ## Output | ||
|
|
||
| - text (predicted value as string) | ||
|
|
||
| ## Example | ||
|
|
||
| Input: | ||
| 250 | ||
|
|
||
| Output: | ||
| 3.5 | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| from .IGlobal import IGlobal | ||
| from .IInstance import IInstance | ||
|
|
||
| __all__ = ['IGlobal', 'IInstance'] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| # ============================================================================= | ||
| # MIT License | ||
| # Copyright (c) 2026 RocketRide Contributors | ||
| # ============================================================================= | ||
|
|
||
| # ------------------------------------------------------------------------------ | ||
| # PreProcessor: sklearn-based text inference class | ||
| # All heavy imports are deferred — this file is imported only after | ||
| # depends() has installed requirements.txt in beginGlobal(). | ||
| # ------------------------------------------------------------------------------ | ||
|
|
||
|
|
||
class PreProcessor:
    """Wraps a scikit-learn model/pipeline for text inference."""

    def __init__(self, config: dict) -> None:
        """
        Initialize the sklearn model.

        In a real deployment, you'd load a pickled model from a path
        specified in config. This stub returns text unchanged so the
        node is CI-safe without a pre-trained model artifact.

        Args:
            config: Node configuration dict. Reserved key: 'model_path',
                the path to a serialized (joblib/pickle) model artifact.
        """
        # Retain the config so real model loading can consult it later;
        # previously the parameter was accepted and silently dropped.
        self._config = config
        # Example: load a real model like this:
        #   import joblib
        #   model_path = self._config.get('model_path', '')
        #   self._model = joblib.load(model_path)
        self._model = None  # Replace with actual model loading

    def process(self, text: str) -> str:
        """
        Run sklearn inference on input text and return processed text.

        Args:
            text: The input string to process.

        Returns:
            The processed string. Currently passes through unchanged.
        """
        if self._model is None:
            # Pass-through when no model is loaded (safe for CI)
            return text

        # Example with a real model:
        #   prediction = self._model.predict([text])
        #   return str(prediction[0])
        return text
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| scikit-learn>=1.0.0,<2.0.0 | ||
| numpy>=1.21.0,<3.0.0 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
| { | ||
| "ml_sklearn": { | ||
| "name": "ML sklearn", | ||
| "description": "Applies a trained scikit-learn model to process text through the pipeline.", | ||
| "icon": "python.svg", | ||
| "group": "preprocessor", | ||
| "color": "#f97316", | ||
| "runtime": "python", | ||
| "pipe": { | ||
| "lanes": { | ||
| "answers": { | ||
| "in": true, | ||
| "out": true | ||
| } | ||
| } | ||
| }, | ||
|
Comment on lines
+9
to
+16
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
# Description: Check what lane schema is used by other existing nodes to confirm
# whether ml_sklearn's pipe.lanes schema matches or deviates from the pattern.
rg -l '"pipe"' nodes/src/nodes --include='services.json' -n
echo "--- agent_langchain services.json ---"
fd 'services.json' nodes/src/nodes/agent_langchain --exec cat {}
echo "--- search_exa services.json ---"
fd 'services.json' nodes/src/nodes/search_exa --exec cat {}
echo "--- ml_sklearn services.json ---"
fd 'services.json' nodes/src/nodes/ml_sklearn --exec cat {}Repository: rocketride-org/rocketride-server Length of output: 7550 🏁 Script executed: # Check if any other nodes use the "pipe" pattern
echo "=== Checking for 'pipe' usage in all services.json files ==="
find nodes/src/nodes -name 'services.json' -type f | while read file; do
if grep -q '"pipe"' "$file"; then
echo "Found 'pipe' in: $file"
fi
done
echo ""
echo "=== Total services.json files in codebase ==="
find nodes/src/nodes -name 'services.json' -type f | wc -l
echo ""
echo "=== List all services.json file locations ==="
find nodes/src/nodes -name 'services.json' -type fRepository: rocketride-org/rocketride-server Length of output: 3314 🏁 Script executed: # Search for service registry or parser code that validates/processes lanes
echo "=== Searching for service registry or parser related code ==="
fd -t f '(registry|parser|service)' --type f | grep -E '\.(ts|js|py)$' | head -20
echo ""
echo "=== Searching in src directory for registry/parser patterns ==="
find src -type f \( -name '*registry*' -o -name '*parser*' -o -name '*service*' \) 2>/dev/null | head -20Repository: rocketride-org/rocketride-server Length of output: 796 🏁 Script executed: # Examine pipelineParser.ts to understand how it processes lanes
echo "=== Checking pipelineParser.ts ==="
wc -l apps/vscode/src/shared/util/pipelineParser.ts
# Check first portion to understand structure
head -100 apps/vscode/src/shared/util/pipelineParser.tsRepository: rocketride-org/rocketride-server Length of output: 3312 🏁 Script executed: # Check registry.js for schema handling
echo "=== Checking registry.js ==="
cat scripts/lib/registry.js | head -200Repository: rocketride-org/rocketride-server Length of output: 4861 🏁 Script executed: # Sample a standard node to confirm pattern
echo "=== Standard node: accessibility_describe ==="
cat nodes/src/nodes/accessibility_describe/services.json | head -50Repository: rocketride-org/rocketride-server Length of output: 1884 🏁 Script executed: # Search for Python code that loads/parses services.json
echo "=== Searching for services.json loading in Python ==="
rg 'services\.json' packages/client-python packages/common --type py -B 2 -A 2 | head -100
echo ""
echo "=== Searching for lane/pipe parsing logic ==="
rg '(lane|pipe)' packages/ai/src/ai/modules/services/ --type py -B 2 -A 2 | head -150Repository: rocketride-org/rocketride-server Length of output: 839 🏁 Script executed: # Broader search for services.json loading
echo "=== Searching for services.json file operations ==="
rg 'services\.json' --type py --type ts --type js -B 3 -A 3 | head -200
echo ""
echo "=== Searching for 'lanes' key handling ==="
rg '"lanes"' --type py --type ts --type js -B 2 -A 2 | head -150Repository: rocketride-org/rocketride-server Length of output: 12607 🏁 Script executed: # Check the actual service loader in packages/ai
echo "=== Checking services.py in packages/ai ==="
find packages/ai -name 'services.py' -o -name '*service*.py' | head -10Repository: rocketride-org/rocketride-server Length of output: 168 🏁 Script executed: # Examine the actual services module that loads and processes lanes
echo "=== Checking packages/ai/src/ai/modules/services/services.py ==="
wc -l packages/ai/src/ai/modules/services/services.py
# Check first 150 lines for import and lane handling
head -150 packages/ai/src/ai/modules/services/services.py
echo ""
echo "=== Look for 'lanes' key access ==="
grep -n 'lanes' packages/ai/src/ai/modules/services/services.py | head -20Repository: rocketride-org/rocketride-server Length of output: 1432 🏁 Script executed: # Search for ml_sklearn usage in tests or code
echo "=== Searching for ml_sklearn references ==="
rg 'ml_sklearn' --type py --type ts --type js -B 2 -A 2 | head -100
echo ""
echo "=== Checking if there's lane schema validation ==="
rg '(lane.*schema|schema.*lane|validate.*lane)' --type py --type ts --type js -i | head -50
echo ""
echo "=== Look for any documentation on lane format ==="
find . -name '*.md' -type f -exec grep -l 'lane' {} \; | head -10Repository: rocketride-org/rocketride-server Length of output: 3505 🏁 Script executed: # Check if ml_sklearn has any test or implementation files
echo "=== ml_sklearn implementation files ==="
find nodes/src/nodes/ml_sklearn -type f
echo ""
echo "=== Check test files for ml_sklearn ==="
find . -name '*test*' -type f | xargs grep -l 'ml_sklearn' 2>/dev/null | head -5Repository: rocketride-org/rocketride-server Length of output: 431 🏁 Script executed: # Examine ml_sklearn's implementation to understand how it expects to receive input
echo "=== ml_sklearn code.py ==="
cat nodes/src/nodes/ml_sklearn/code.py
echo ""
echo "=== ml_sklearn README ==="
cat nodes/src/nodes/ml_sklearn/README.mdRepository: rocketride-org/rocketride-server Length of output: 2089 🏁 Script executed: # Check documentation on lane format and expected schema
echo "=== Checking ROCKETRIDE_COMPONENT_REFERENCE for lane schema ==="
grep -A 20 -B 5 'lane' docs/agents/ROCKETRIDE_COMPONENT_REFERENCE.md | head -100Repository: rocketride-org/rocketride-server Length of output: 3931 Lanes schema must follow the established pattern — replace nested The ml_sklearn node is the only one (out of 71) that nests lanes under "lanes": {
"answers": ["output_lane"]
}Additionally, all other nodes include an Diff showing required changes"pipe": {
"lanes": {
"answers": {
"in": true,
"out": true
}
}
},Should be: "lanes": {
"answers": ["answers"]
},
"input": [
{
"lane": "answers",
"output": [{ "lane": "answers" }]
}
],🤖 Prompt for AI Agents |
||
| "preconfig": { | ||
| "default": { | ||
| "object": "default", | ||
| "properties": [] | ||
| } | ||
| }, | ||
| "profiles": { | ||
| "ml_sklearn.default": { | ||
| "object": "default", | ||
| "properties": [] | ||
| } | ||
| }, | ||
| "fields": [], | ||
| "shape": { | ||
| "inputs": [ | ||
| { | ||
| "name": "answers", | ||
| "type": "answers" | ||
| } | ||
| ], | ||
| "outputs": [ | ||
| { | ||
| "name": "answers", | ||
| "type": "answers" | ||
| } | ||
| ] | ||
| }, | ||
| "test": { | ||
| "answers": [ | ||
| { | ||
| "text": "hello world" | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -61,6 +61,10 @@ | |
| from .types import LAUNCH_TYPE | ||
| from .task_conn import TaskConn | ||
| from .task_metrics import TaskMetrics | ||
| from .task_logger import get_task_logger | ||
|
|
||
| # Module-level structured logger for task lifecycle events | ||
| _logger = get_task_logger(__name__) | ||
|
|
||
|
|
||
| if TYPE_CHECKING: | ||
|
|
@@ -843,6 +847,15 @@ async def _terminated(self) -> None: | |
| apikey=task_apikey, | ||
| ) | ||
|
|
||
| _logger.info( | ||
| 'Task terminated', | ||
| extra={ | ||
| 'task_id': self.id, | ||
| 'step': 'termination', | ||
| 'exit_code': self._status.exitCode, | ||
| 'final_state': self._status.state, | ||
| }, | ||
| ) | ||
| self.debug_message('Resource cleanup completed successfully') | ||
|
|
||
| def _on_metrics_updated(self) -> None: | ||
|
|
@@ -1470,6 +1483,10 @@ async def start_task(self) -> None: | |
|
|
||
| # Set our current state | ||
| self._status.state = TASK_STATE.STARTING.value | ||
| _logger.info( | ||
| 'Task starting', | ||
| extra={'task_id': self.id, 'step': 'start'}, | ||
| ) | ||
|
|
||
| # Resolve any ${...} in the pipeline | ||
| self._pipeline = self._resolve_pipeline(self._pipeline) | ||
|
|
@@ -1586,6 +1603,15 @@ async def start_task(self) -> None: | |
| env=subprocess_env, | ||
| ) | ||
|
|
||
| _logger.info( | ||
| 'Subprocess created', | ||
| extra={ | ||
| 'task_id': self.id, | ||
| 'step': 'subprocess', | ||
| 'pid': self._engine_process.pid, | ||
| }, | ||
| ) | ||
|
|
||
| # Initialize stdio interface | ||
| try: | ||
| self._debug_stdio = Task.TaskDbgStdio( | ||
|
|
@@ -1661,6 +1687,11 @@ async def start_task(self) -> None: | |
|
|
||
| except Exception as e: | ||
| await self._terminated() | ||
| _logger.error( | ||
| 'Task startup failed', | ||
| extra={'task_id': self.id, 'step': 'error', 'error': str(e)}, | ||
| exc_info=True, | ||
| ) | ||
| self.debug_message(f'Task startup failed: {e}') | ||
| raise | ||
|
Comment on lines
1688
to
1696
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Error log fires after Inside A developer querying for the root cause of a startup failure will see the termination message first and have to work backwards. Moving the error log before the Additionally, Ruff G201 flags the 🐛 Proposed fix — log the error first, then clean up, use exception() except Exception as e:
- await self._terminated()
- _logger.error(
+ _logger.exception(
'Task startup failed',
- extra={'task_id': self.id, 'step': 'error', 'error': str(e)},
- exc_info=True,
+ extra={'task_id': self.id, 'step': 'error', 'error': str(e)},
)
+ await self._terminated()
self.debug_message(f'Task startup failed: {e}')
raise🧰 Tools🪛 Ruff (0.15.12)[warning] 1690-1690: Logging (G201) 🤖 Prompt for AI Agents |
||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import json | ||
| import logging | ||
| import time | ||
| from typing import Any | ||
|
|
||
|
|
||
class _StructuredFormatter(logging.Formatter):
    """Render each LogRecord as a single-line JSON object.

    A fixed envelope (timestamp/level/logger/message) is emitted, any
    field supplied via ``extra=`` is promoted to a top-level JSON key,
    and exception info is formatted under ``exception``.
    """

    # Attribute names present on every vanilla LogRecord, plus the two
    # Formatter-injected fields; anything else on a record came from
    # the caller's `extra=` mapping.
    _RESERVED = frozenset(logging.LogRecord(
        '', 0, '', 0, '', (), None
    ).__dict__.keys()) | {'message', 'asctime'}

    def format(self, record: logging.LogRecord) -> str:
        """Return *record* as a JSON string with a UTC ISO-8601 timestamp."""
        # Resolve %-style args into the final message text.
        record.message = record.getMessage()

        payload: dict[str, Any] = {
            'timestamp': time.strftime(
                '%Y-%m-%dT%H:%M:%SZ', time.gmtime(record.created)
            ),
            'level': record.levelname,
            'logger': record.name,
            'message': record.message,
        }

        # Promote every `extra=` field to a top-level key.
        for key, value in record.__dict__.items():
            if key not in self._RESERVED:
                payload[key] = value

        if record.exc_info:
            payload['exception'] = self.formatException(record.exc_info)

        # default=str keeps non-JSON-serializable extras from raising.
        return json.dumps(payload, default=str)


def get_task_logger(name: str) -> logging.Logger:
    """Return a logger that emits structured JSON records.

    Idempotent per *name*: the stream handler is attached only once.
    The level is set explicitly because the WARNING default inherited
    from the root logger would otherwise silently drop the INFO-level
    lifecycle events this module exists to record.

    Args:
        name: Logger name, typically the caller's ``__name__``.

    Returns:
        A configured, non-propagating ``logging.Logger``.
    """
    logger = logging.getLogger(name)

    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(_StructuredFormatter())
        logger.addHandler(handler)
        # Pass everything through to the handler; let the handler or the
        # application filter. Without this, effective-level resolution
        # walks up to the root logger's WARNING default and INFO records
        # never reach the JSON handler.
        logger.setLevel(logging.DEBUG)

    # Always disable propagation (not only on first setup) so records are
    # not duplicated by ancestor handlers.
    logger.propagate = False

    return logger
|
Comment on lines
+36
to
+45
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Python's effective-level resolution always traverses the logger hierarchy regardless of 🐛 Proposed fix — set level so records reach the handler def get_task_logger(name: str) -> logging.Logger:
logger = logging.getLogger(name)
if not logger.handlers:
handler = logging.StreamHandler()
handler.setFormatter(_StructuredFormatter())
logger.addHandler(handler)
- logger.propagate = False
+ logger.setLevel(logging.DEBUG) # pass everything through; let the handler/app filter
+ logger.propagate = False # always disable propagation, not just on first setup
return loggerMoving 🤖 Prompt for AI Agents |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧹 Nitpick | 🔵 Trivial | ⚡ Quick win
Requirements path is duplicated across
validateConfigandbeginGlobal.The expression
os.path.dirname(os.path.realpath(__file__)) + '/requirements.txt'appears verbatim in both methods. Extract it to a class-level constant to avoid divergence on future renames.♻️ Proposed refactor
class IGlobal(IGlobalBase): """Global state for the ml_sklearn node — holds the loaded sklearn model.""" preprocessor: object = None # The sklearn model/pipeline instance + _REQUIREMENTS = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt') def validateConfig(self): """Validate that scikit-learn and numpy are available.""" try: from depends import depends - requirements = os.path.dirname(os.path.realpath(__file__)) + '/requirements.txt' - depends(requirements) + depends(self._REQUIREMENTS) except Exception as e: # noqa: BLE001 warning(str(e)) def beginGlobal(self): """Load the sklearn model at runtime startup.""" if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG: pass else: from depends import depends - requirements = os.path.dirname(os.path.realpath(__file__)) + '/requirements.txt' - depends(requirements) + depends(self._REQUIREMENTS)Also applies to: 39-40
🤖 Prompt for AI Agents