Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
f4f7aa5
Fix typo in pipeline source node description
Tejeshyewale Apr 26, 2026
b22e844
docs: improve README clarity and deployment instructions
Tejeshyewale Apr 26, 2026
319fca1
Update README.md
Tejeshyewale Apr 26, 2026
7ce10e3
Create README.md
Tejeshyewale Apr 28, 2026
90fac4d
Create code.py
Tejeshyewale Apr 28, 2026
b0598a2
Create IGlobal.py
Tejeshyewale Apr 28, 2026
ca2b870
Create IInstance.py
Tejeshyewale Apr 28, 2026
5862bd8
Create services.json
Tejeshyewale Apr 28, 2026
b33eff4
Create requirements.txt
Tejeshyewale Apr 28, 2026
be13299
Update code.py
Tejeshyewale Apr 28, 2026
4d88f21
Add files via upload
Tejeshyewale Apr 28, 2026
48a784f
Rename IInstance.py to Instance.py
Tejeshyewale Apr 28, 2026
7e6963d
Update services.json
Tejeshyewale Apr 28, 2026
5952843
Update requirements.txt
Tejeshyewale Apr 28, 2026
c667348
Update README.md
Tejeshyewale Apr 28, 2026
12c00e2
Update requirements.txt
Tejeshyewale Apr 28, 2026
c587ada
Update code.py
Tejeshyewale Apr 28, 2026
bb482f0
Merge branch 'rocketride-org:develop' into develop
Tejeshyewale Apr 30, 2026
8e676e5
Fix PreProcessor structure and improve error handling
Tejeshyewale Apr 30, 2026
50a0bd4
Fix PreProcessor node structure and improve error handling
Tejeshyewale Apr 30, 2026
96cf781
Update IGlobal.py
Tejeshyewale Apr 30, 2026
92593be
Update Instance.py
Tejeshyewale Apr 30, 2026
fe1f418
Update code.py
Tejeshyewale Apr 30, 2026
89504f9
Update code.py
Tejeshyewale Apr 30, 2026
dc7f679
Update IGlobal.py
Tejeshyewale Apr 30, 2026
4c9d406
Update Instance.py
Tejeshyewale Apr 30, 2026
c39113d
Update services.json
Tejeshyewale Apr 30, 2026
47ad4f9
Update code.py
Tejeshyewale Apr 30, 2026
3e8e46b
Update IGlobal.py
Tejeshyewale Apr 30, 2026
5399865
Update Instance.py
Tejeshyewale Apr 30, 2026
43c725d
Update README.md
Tejeshyewale Apr 30, 2026
5602c71
Merge branch 'rocketride-org:develop' into develop
Tejeshyewale Apr 30, 2026
9eaba7e
Update IGlobal.py
Tejeshyewale May 1, 2026
efa13f6
Update Instance.py
Tejeshyewale May 1, 2026
66e54c3
Update code.py
Tejeshyewale May 1, 2026
cc8840e
Create __init__.py
Tejeshyewale May 1, 2026
1f33c2f
Update requirements.txt
Tejeshyewale May 1, 2026
4d29752
Update services.json
Tejeshyewale May 1, 2026
9ad5458
Rename Instance.py to IInstance.py
Tejeshyewale May 1, 2026
282a102
Delete nodes/src/nodes/ml_sklearn/model.pkl
Tejeshyewale May 1, 2026
718b5ce
Merge branch 'rocketride-org:develop' into develop
Tejeshyewale May 1, 2026
46d6428
Merge branch 'develop' into develop
kwit75 May 1, 2026
83abe32
Merge branch 'develop' into develop
kwit75 May 1, 2026
3d91916
Merge branch 'rocketride-org:develop' into develop
Tejeshyewale May 2, 2026
73a4331
feat(ai): add TaskTracker for task lifecycle state observability
Tejeshyewale May 3, 2026
009373c
fix(ai): include task_logger dependency for task_tracker
Tejeshyewale May 3, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions nodes/src/nodes/ml_sklearn/IGlobal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# =============================================================================
# MIT License
# Copyright (c) 2026 RocketRide Contributors
# =============================================================================

# ------------------------------------------------------------------------------
# This class controls the data shared between all threads for the task
# ------------------------------------------------------------------------------
import os

from rocketlib import IGlobalBase, OPEN_MODE, warning
from ai.common.config import Config


class IGlobal(IGlobalBase):
    """Global (cross-thread) state for the ml_sklearn node.

    Holds the single loaded sklearn model/pipeline shared by all worker
    threads of the task.
    """

    # The sklearn model/pipeline instance; populated in beginGlobal().
    preprocessor: object = None

    def validateConfig(self):
        """Best-effort check that scikit-learn and numpy can be installed.

        Failures are downgraded to warnings so config-time validation
        never hard-fails the node.
        """
        try:
            from depends import depends

            req_path = f"{os.path.dirname(os.path.realpath(__file__))}/requirements.txt"
            depends(req_path)
        except Exception as exc:  # noqa: BLE001
            warning(str(exc))

    def beginGlobal(self):
        """Install dependencies and load the sklearn model at startup.

        In CONFIG open mode the runtime only asks for the service
        definition, so no model is loaded.
        """
        if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG:
            return

        from depends import depends

        req_path = f"{os.path.dirname(os.path.realpath(__file__))}/requirements.txt"
        depends(req_path)

        # Import deferred until after depends() has installed requirements.
        from .code import PreProcessor

        node_config = Config.getNodeConfig(self.glb.logicalType, self.glb.connConfig)
        self.preprocessor = PreProcessor(node_config)

    def endGlobal(self):
        """Drop the model reference so it can be garbage-collected."""
        self.preprocessor = None
45 changes: 45 additions & 0 deletions nodes/src/nodes/ml_sklearn/IInstance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# =============================================================================
# MIT License
# Copyright (c) 2026 RocketRide Contributors
# =============================================================================

# ------------------------------------------------------------------------------
# This class controls the data for each thread of the task
# ------------------------------------------------------------------------------
import copy

from rocketlib import IInstanceBase, Entry

from .IGlobal import IGlobal


class IInstance(IInstanceBase):
    """Per-thread instance state for the ml_sklearn node."""

    # Shared global state (holds the loaded sklearn model).
    IGlobal: IGlobal

    def open(self, obj: Entry):
        """Hook called before each new pipeline object; this node keeps no
        per-object state, so there is nothing to reset."""
        pass

    def writeAnswers(self, question):
        """Run sklearn inference on an upstream question and forward it.

        A deep copy is taken first so fan-out pipelines never observe the
        mutation made here. Raises RuntimeError when the shared model was
        never initialized (e.g. beginGlobal skipped in CONFIG mode).
        """
        if self.IGlobal.preprocessor is None:
            raise RuntimeError('sklearn PreProcessor not initialized')

        question = copy.deepcopy(question)

        # Pick the payload: the question's text attribute when present,
        # otherwise its string form.
        if hasattr(question, 'text'):
            payload = question.text
        else:
            payload = str(question)

        # Inference, then write the result back and push downstream.
        question.text = self.IGlobal.preprocessor.process(payload)
        self.instance.writeAnswers(question)
19 changes: 19 additions & 0 deletions nodes/src/nodes/ml_sklearn/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# ML Sklearn Prediction Node

This node performs predictions using a trained scikit-learn model.

## Input

- text (number as string)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Input description is inconsistent with the test fixture in services.json.

The README states the input is a "number as string" (e.g., 250), but services.json uses "hello world" as the test payload. Either update the test fixture to a numeric string (e.g., "250") or broaden the description to "text (string)" if arbitrary strings are valid.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@nodes/src/nodes/ml_sklearn/README.md` at line 7, The README's input
description "text (number as string)" conflicts with the services.json test
fixture which uses "hello world"; update either the README entry in
nodes/src/nodes/ml_sklearn/README.md to describe the input as "text (string)"
for arbitrary strings, or change the services.json "test" fixture to a numeric
string like "250" so the example matches the README; ensure the README and the
"test" value in services.json are consistent.


## Output

- text (predicted value as string)

## Example

Input:
250

Output:
3.5
4 changes: 4 additions & 0 deletions nodes/src/nodes/ml_sklearn/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Public package surface for the ml_sklearn node: re-export the global
# and per-thread state classes picked up by the node runtime.
from .IGlobal import IGlobal
from .IInstance import IInstance

__all__ = ['IGlobal', 'IInstance']
47 changes: 47 additions & 0 deletions nodes/src/nodes/ml_sklearn/code.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# =============================================================================
# MIT License
# Copyright (c) 2026 RocketRide Contributors
# =============================================================================

# ------------------------------------------------------------------------------
# PreProcessor: sklearn-based text inference class
# All heavy imports are deferred — this file is imported only after
# depends() has installed requirements.txt in beginGlobal().
# ------------------------------------------------------------------------------


class PreProcessor:
"""Wraps a scikit-learn model/pipeline for text inference."""

def __init__(self, config: dict):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Two Ruff linting failures on __init__ will block CI.

  1. ARG002config is received but never read. Prefix with _ to signal intentional non-use.
  2. ANN204__init__ is missing a -> None return-type annotation.
🔧 Proposed fix
-    def __init__(self, config: dict):
+    def __init__(self, _config: dict) -> None:
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def __init__(self, config: dict):
def __init__(self, _config: dict) -> None:
🧰 Tools
🪛 Ruff (0.15.12)

[warning] 16-16: Missing return type annotation for special method __init__

Add return type annotation: None

(ANN204)


[warning] 16-16: Unused method argument: config

(ARG002)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@nodes/src/nodes/ml_sklearn/code.py` at line 16, The __init__ method currently
triggers lint errors: rename the unused parameter config to _config (or prefix
it with an underscore) to satisfy ARG002, and add an explicit return type
annotation "-> None" to the __init__ signature to satisfy ANN204; update the
signature in the constructor (def __init__(self, _config: dict) -> None:) and
keep usage unchanged if the value is intentionally unused.

"""
Initialize the sklearn model.

In a real deployment, you'd load a pickled model from a path
specified in config. This stub returns text unchanged so the
node is CI-safe without a pre-trained model artifact.
"""
# Example: load a real model like this:
# import joblib
# model_path = config.get('model_path', '')
# self._model = joblib.load(model_path)
self._model = None # Replace with actual model loading

def process(self, text: str) -> str:
"""
Run sklearn inference on input text and return processed text.

Args:
text: The input string to process.

Returns:
The processed string. Currently passes through unchanged.
"""
if self._model is None:
# Pass-through when no model is loaded (safe for CI)
return text

# Example with a real model:
# prediction = self._model.predict([text])
# return str(prediction[0])
return text
2 changes: 2 additions & 0 deletions nodes/src/nodes/ml_sklearn/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
scikit-learn>=1.0.0,<2.0.0
numpy>=1.21.0,<3.0.0
52 changes: 52 additions & 0 deletions nodes/src/nodes/ml_sklearn/services.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
{
"ml_sklearn": {
"name": "ML sklearn",
"description": "Applies a trained scikit-learn model to process text through the pipeline.",
"icon": "python.svg",
"group": "preprocessor",
"color": "#f97316",
"runtime": "python",
"pipe": {
"lanes": {
"answers": {
"in": true,
"out": true
}
}
},
Comment on lines +9 to +16
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify: do any other "preprocessor" group nodes use the pipe.lanes structure?
# Expected: none — all should use top-level "lanes" + "input"
rg -l '"group":\s*"preprocessor"' --glob '*/services.json' | xargs -I{} sh -c 'echo "=== {} ==="; cat "{}"' | grep -A10 '"pipe"\|"lanes"'

Repository: rocketride-org/rocketride-server

Length of output: 225


🏁 Script executed:

# Find all services.json files in the repository
find . -name "services.json" -type f | head -20

Repository: rocketride-org/rocketride-server

Length of output: 978


🏁 Script executed:

# Read the ml_sklearn/services.json file to see the full context
cat -n ./nodes/src/nodes/ml_sklearn/services.json

Repository: rocketride-org/rocketride-server

Length of output: 1695


🏁 Script executed:

# Find all preprocessor group nodes and extract their pipe/lanes structures
rg '"group":\s*"preprocessor"' -l ./nodes/src/nodes/*/services.json | while read file; do
  echo "=== $file ==="
  # Extract the pipe or lanes section (simplified for readability)
  cat "$file" | jq -r 'to_entries[] | select(.value.group == "preprocessor") | .value | {pipe, lanes, input}' 2>/dev/null || echo "Error parsing"
done

Repository: rocketride-org/rocketride-server

Length of output: 269


🏁 Script executed:

# Broader search: find all preprocessor group nodes
find ./nodes/src/nodes/*/services.json -type f | xargs grep -l '"group":\s*"preprocessor"'

Repository: rocketride-org/rocketride-server

Length of output: 119


🏁 Script executed:

# Find and examine agent_langchain and response node structures
echo "=== agent_langchain ===" && cat ./nodes/src/nodes/agent_langchain/services.json | jq '.' | head -50
echo ""
echo "=== response ===" && cat ./nodes/src/nodes/response/services.json | jq '.' 2>/dev/null | head -50

Repository: rocketride-org/rocketride-server

Length of output: 1122


🏁 Script executed:

# Get the response node structure (if it exists)
find ./nodes/src/nodes -name "response" -type d | head -1

Repository: rocketride-org/rocketride-server

Length of output: 103


🏁 Script executed:

# Read the response node services.json
cat -n ./nodes/src/nodes/response/services.json

Repository: rocketride-org/rocketride-server

Length of output: 5344


🏁 Script executed:

# Check if any other node uses "pipe" wrapper
grep -r '"pipe"' ./nodes/src/nodes/*/services.json | head -20

Repository: rocketride-org/rocketride-server

Length of output: 137


🏁 Script executed:

# Verify the {"in": true, "out": true} format doesn't appear elsewhere
grep -r '"in":\s*true' ./nodes/src/nodes/*/services.json

Repository: rocketride-org/rocketride-server

Length of output: 151


Fix non-standard lane configuration structure in pipe wrapper.

The pipe wrapper with {"in": true, "out": true} format is unique to this file and doesn't match the codebase convention. Other nodes like agent_langchain and response use top-level "lanes" (with array values) paired with an "input" array. The current structure will likely prevent the pipeline framework from registering the answers lane correctly.

Use the standard structure:

🔧 Proposed fix
-        "pipe": {
-            "lanes": {
-                "answers": {
-                    "in": true,
-                    "out": true
-                }
-            }
-        },
+        "lanes": {
+            "answers": []
+        },
+        "input": [
+            {
+                "lane": "answers",
+                "output": []
+            }
+        ],
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
"pipe": {
"lanes": {
"answers": {
"in": true,
"out": true
}
}
},
"lanes": {
"answers": []
},
"input": [
{
"lane": "answers",
"output": []
}
],
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@nodes/src/nodes/ml_sklearn/services.json` around lines 9 - 16, The "pipe"
wrapper uses a non-standard nested lanes object ("answers": {"in": true, "out":
true}) which prevents the pipeline from registering the lane; replace that
structure with the standard top-level lanes + input arrays used elsewhere (e.g.,
agent_langchain/response): remove the nested object under "pipe" and instead
declare "lanes" as an array containing "answers" and add an "input" array
referencing "answers" so the pipeline framework recognizes the answers lane.

"preconfig": {
"default": {
"object": "default",
"properties": []
}
},
"profiles": {
"ml_sklearn.default": {
"object": "default",
"properties": []
}
},
Comment on lines +17 to +28
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

preconfig is structurally malformed — Config.getNodeConfig will throw KeyError: 'profiles' at runtime.

IGlobal.beginGlobal() calls Config.getNodeConfig(self.glb.logicalType, self.glb.connConfig). That method (in packages/ai/src/ai/common/config.py) expects:

  • preconfig.default to be a string (the default profile name, e.g. "default")
  • preconfig.profiles to be a dict of profile objects

Currently, preconfig.default is an object {"object": "default", "properties": []}, and the "profiles" dict is a top-level sibling of preconfig instead of nested inside it. When getNodeConfig executes preconfig['profiles'], it will raise KeyError: 'profiles', crashing any non-CONFIG-mode startup.

🐛 Proposed fix — restructure `preconfig` and remove misplaced `profiles`
-        "preconfig": {
-            "default": {
-                "object": "default",
-                "properties": []
-            }
-        },
-        "profiles": {
-            "ml_sklearn.default": {
-                "object": "default",
-                "properties": []
-            }
-        },
+        "preconfig": {
+            "default": "default",
+            "profiles": {
+                "default": {}
+            }
+        },
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
"preconfig": {
"default": {
"object": "default",
"properties": []
}
},
"profiles": {
"ml_sklearn.default": {
"object": "default",
"properties": []
}
},
"preconfig": {
"default": "default",
"profiles": {
"default": {}
}
},
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@nodes/src/nodes/ml_sklearn/services.json` around lines 17 - 28, The preconfig
block is malformed causing Config.getNodeConfig (called from
IGlobal.beginGlobal) to raise KeyError on preconfig['profiles']; fix by making
preconfig.default a string ("default"), move the existing "ml_sklearn.default"
profile object into preconfig.profiles as a dictionary entry, and remove the
top-level "profiles" sibling; ensure the profile key matches the default name
and keep the profile object structure (e.g., object/properties) under
preconfig.profiles.

"fields": [],
"shape": {
"inputs": [
{
"name": "answers",
"type": "answers"
}
],
"outputs": [
{
"name": "answers",
"type": "answers"
}
]
},
Comment on lines +30 to +43
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

shape uses the wrong format — it should be an array of section objects, not an inputs/outputs map.

In this codebase, shape is a UI form-rendering directive: an array of {section, title, properties} objects (see agent_langchain/services.json and extract_data/services.json). The current {"inputs": [...], "outputs": [...]} structure appears borrowed from a different schema convention and will not be processed correctly by the UI layer.

Since ml_sklearn has no user-configurable fields, shape should be an empty array:

🔧 Proposed fix
-        "shape": {
-            "inputs": [
-                {
-                    "name": "answers",
-                    "type": "answers"
-                }
-            ],
-            "outputs": [
-                {
-                    "name": "answers",
-                    "type": "answers"
-                }
-            ]
-        },
+        "shape": [],
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
"shape": {
"inputs": [
{
"name": "answers",
"type": "answers"
}
],
"outputs": [
{
"name": "answers",
"type": "answers"
}
]
},
"shape": [],
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@nodes/src/nodes/ml_sklearn/services.json` around lines 30 - 43, The shape
field in nodes/src/nodes/ml_sklearn/services.json is using an inputs/outputs map
that doesn't match the project's UI schema; replace the current "shape": {
"inputs": [...], "outputs": [...] } with an empty array "shape": [] because
ml_sklearn has no user-configurable fields; update the shape value in that JSON
object (referencing the existing "shape" key in the ml_sklearn service
definition) so the UI form renderer recognizes it as having no sections.

"test": {
"answers": [
{
"text": "hello world"
}
]
}
}
}
40 changes: 40 additions & 0 deletions packages/ai/src/ai/modules/task/task_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@
from .types import LAUNCH_TYPE
from .task_conn import TaskConn
from .task_metrics import TaskMetrics
from .task_logger import get_task_logger
from .task_tracker import tracker

# Module-level structured logger for task lifecycle events
_logger = get_task_logger(__name__)


if TYPE_CHECKING:
Expand Down Expand Up @@ -843,6 +848,19 @@ async def _terminated(self) -> None:
apikey=task_apikey,
)

_logger.info(
'Task terminated',
extra={
'task_id': self.id,
'step': 'termination',
'exit_code': self._status.exitCode,
'final_state': self._status.state,
},
)
if self._stop_requested or (self._status.exitCode and self._status.exitCode != 0):
tracker.on_failed(self.id)
else:
tracker.on_completed(self.id)
Comment on lines +860 to +863
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

_terminated() may call on_completed for a startup failure, causing a transient incorrect state.

The condition on line 860:

if self._stop_requested or (self._status.exitCode and self._status.exitCode != 0):

treats exitCode = None as falsy, so tracker.on_completed() is called when exitCode is None — which occurs when the subprocess was spawned but hasn't exited yet at the time _terminated() is invoked from the start_task exception handler.

The explicit tracker.on_failed(self.id) at line 1698 then overrides this, but between the two calls concurrent readers see a spurious COMPLETED state for a task that actually failed at startup.

The fix is to default to on_failed whenever the exit code is not a confirmed success:

🐛 Proposed fix
-        if self._stop_requested or (self._status.exitCode and self._status.exitCode != 0):
-            tracker.on_failed(self.id)
-        else:
-            tracker.on_completed(self.id)
+        if self._status.exitCode == 0 and not self._stop_requested:
+            tracker.on_completed(self.id)
+        else:
+            tracker.on_failed(self.id)

With this change, on_failed is always emitted when exitCode is None (unknown/still-running), and the explicit tracker.on_failed(self.id) at line 1698 becomes a safe, harmless redundant call instead of a critical correction.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/ai/src/ai/modules/task/task_engine.py` around lines 860 - 863, In
_terminated(), change the exitCode check so unknown/None is treated as failure:
replace the current condition that relies on falsiness of self._status.exitCode
with an explicit check (e.g., if self._stop_requested or self._status.exitCode
is None or self._status.exitCode != 0) so that tracker.on_failed(self.id) is
invoked when exitCode is None or non-zero; leave tracker.on_completed(self.id)
only for confirmed successful exitCode == 0—this prevents a transient
on_completed before the start_task exception handler later calls
tracker.on_failed.

self.debug_message('Resource cleanup completed successfully')

def _on_metrics_updated(self) -> None:
Expand Down Expand Up @@ -1470,6 +1488,11 @@ async def start_task(self) -> None:

# Set our current state
self._status.state = TASK_STATE.STARTING.value
_logger.info(
'Task starting',
extra={'task_id': self.id, 'step': 'start'},
)
tracker.on_starting(self.id)

# Resolve any ${...} in the pipeline
self._pipeline = self._resolve_pipeline(self._pipeline)
Expand Down Expand Up @@ -1586,6 +1609,16 @@ async def start_task(self) -> None:
env=subprocess_env,
)

_logger.info(
'Subprocess created',
extra={
'task_id': self.id,
'step': 'subprocess',
'pid': self._engine_process.pid,
},
)
tracker.on_running(self.id)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

on_running and on_initializing are called in the wrong order, contradicting the documented lifecycle.

TaskTracker's docstring specifies the happy-path order as on_starting → on_initializing → on_running → on_completed, and the TaskState enum is declared in that same order. The semantic descriptions also confirm this — on_initializing is "subprocess about to be spawned" and on_running is "subprocess confirmed alive."

In start_task() the calls are inverted: on_running fires immediately after create_subprocess_exec (line 1620), and on_initializing fires after stdio/metrics setup (line 1651). Consumers calling tracker.get() or snapshot() will observe RUNNING before INITIALIZING, violating the documented contract.

🐛 Proposed fix — swap the two tracker calls
-            tracker.on_running(self.id)
+            tracker.on_initializing(self.id)
-            tracker.on_initializing(self.id)
+            tracker.on_running(self.id)

Also applies to: 1651-1651

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/ai/src/ai/modules/task/task_engine.py` at line 1620, The tracker
lifecycle calls in start_task() are inverted: TaskTracker.on_running is invoked
immediately after create_subprocess_exec but on_initializing is called later;
swap them so the order matches the documented TaskState lifecycle (on_starting →
on_initializing → on_running → on_completed). Specifically, call
tracker.on_initializing(self.id) right after the subprocess is created (before
stdio/metrics setup) to mark "subprocess about to be spawned", and call
tracker.on_running(self.id) only after stdio/metrics are set up and the
subprocess is confirmed alive; update both occurrences (the current calls at the
create_subprocess_exec location and the later stdio/metrics completion location)
to preserve the correct sequence.


# Initialize stdio interface
try:
self._debug_stdio = Task.TaskDbgStdio(
Expand Down Expand Up @@ -1615,6 +1648,7 @@ async def start_task(self) -> None:

# Setup to initializing
self._status.state = TASK_STATE.INITIALIZING.value
tracker.on_initializing(self.id)

# Create the periodic status update task
self._status_update_task = asyncio.create_task(self._status_update_loop())
Expand Down Expand Up @@ -1661,6 +1695,12 @@ async def start_task(self) -> None:

except Exception as e:
await self._terminated()
tracker.on_failed(self.id)
_logger.error(
'Task startup failed',
extra={'task_id': self.id, 'step': 'error', 'error': str(e)},
exc_info=True,
)
Comment on lines +1699 to +1703
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Use _logger.exception() instead of _logger.error(..., exc_info=True).

Ruff G201 flags this. logging.exception() is the idiomatic stdlib shorthand for logging at ERROR level with the current exception traceback.

♻️ Proposed fix
-            _logger.error(
-                'Task startup failed',
-                extra={'task_id': self.id, 'step': 'error', 'error': str(e)},
-                exc_info=True,
-            )
+            _logger.exception(
+                'Task startup failed',
+                extra={'task_id': self.id, 'step': 'error', 'error': str(e)},
+            )
🧰 Tools
🪛 Ruff (0.15.12)

[warning] 1699-1699: Logging .exception(...) should be used instead of .error(..., exc_info=True)

(G201)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/ai/src/ai/modules/task/task_engine.py` around lines 1699 - 1703,
Replace the call to _logger.error(..., exc_info=True) with
_logger.exception(...) in the task startup error handling so the current
exception traceback is logged idiomatically; keep the same message string and
the extra={'task_id': self.id, 'step': 'error', 'error': str(e)} payload,
locating the call to _logger.error in task_engine.py (the task startup/error
handling block where self.id and e are referenced) and swap the logging method
to _logger.exception while preserving arguments.

self.debug_message(f'Task startup failed: {e}')
raise

Expand Down
45 changes: 45 additions & 0 deletions packages/ai/src/ai/modules/task/task_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from __future__ import annotations

import json
import logging
import time
from typing import Any


class _StructuredFormatter(logging.Formatter):
_RESERVED = frozenset(logging.LogRecord(
'', 0, '', 0, '', (), None
).__dict__.keys()) | {'message', 'asctime'}

def format(self, record: logging.LogRecord) -> str:
record.message = record.getMessage()

payload = {
'timestamp': time.strftime(
'%Y-%m-%dT%H:%M:%SZ', time.gmtime(record.created)
),
Comment on lines +18 to +20
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Timestamp truncates sub-second precision

time.gmtime(record.created) silently drops the fractional-seconds portion of record.created. In practice, multiple log records emitted within the same wall-clock second will share identical timestamps, making ordering ambiguous in the structured output.

⏱️ Proposed fix to include milliseconds
-            'timestamp': time.strftime(
-                '%Y-%m-%dT%H:%M:%SZ', time.gmtime(record.created)
-            ),
+            'timestamp': '{}.{:03d}Z'.format(
+                time.strftime('%Y-%m-%dT%H:%M:%S', time.gmtime(record.created)),
+                int((record.created % 1) * 1000),
+            ),
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
'timestamp': time.strftime(
'%Y-%m-%dT%H:%M:%SZ', time.gmtime(record.created)
),
'timestamp': '{}.{:03d}Z'.format(
time.strftime('%Y-%m-%dT%H:%M:%S', time.gmtime(record.created)),
int((record.created % 1) * 1000),
),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/ai/src/ai/modules/task/task_logger.py` around lines 18 - 20, The
timestamp field in task_logger.py is built with time.strftime(time_format,
time.gmtime(record.created)) which discards fractional seconds from
record.created; update the code that builds 'timestamp' (the time.strftime call
for the 'timestamp' key) to preserve sub-second precision by formatting
record.created with a function that includes milliseconds (e.g., using
datetime.fromtimestamp(record.created,
tz=timezone.utc).isoformat(timespec='milliseconds') or equivalent) so timestamps
include fractional seconds while keeping UTC/Zulu formatting.

'level': record.levelname,
'logger': record.name,
'message': record.message,
}

for key, value in record.__dict__.items():
if key not in self._RESERVED:
payload[key] = value
Comment on lines +26 to +28
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial | ⚡ Quick win

Use a dict comprehension instead of a for loop (Ruff PERF403)

♻️ Proposed refactor
-        for key, value in record.__dict__.items():
-            if key not in self._RESERVED:
-                payload[key] = value
+        payload.update(
+            {k: v for k, v in record.__dict__.items() if k not in self._RESERVED}
+        )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for key, value in record.__dict__.items():
if key not in self._RESERVED:
payload[key] = value
payload.update(
{k: v for k, v in record.__dict__.items() if k not in self._RESERVED}
)
🧰 Tools
🪛 Ruff (0.15.12)

[warning] 28-28: Use a dictionary comprehension instead of a for-loop

(PERF403)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/ai/src/ai/modules/task/task_logger.py` around lines 26 - 28, Replace
the explicit loop that builds payload from record.__dict__ with a dict
comprehension to improve performance: instead of iterating with "for key, value
in record.__dict__.items()" and testing "if key not in self._RESERVED", create a
new dict via comprehension filtering out keys in self._RESERVED and then assign
or update payload accordingly; target the code that references record.__dict__,
self._RESERVED, and payload in task_logger.py (the loop shown) and ensure
semantics remain identical (only non-reserved keys copied).


if record.exc_info:
payload['exception'] = self.formatException(record.exc_info)

return json.dumps(payload, default=str)


def get_task_logger(name: str) -> logging.Logger:
logger = logging.getLogger(name)

if not logger.handlers:
handler = logging.StreamHandler()
handler.setFormatter(_StructuredFormatter())
logger.addHandler(handler)
logger.propagate = False
Comment on lines +39 to +43
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Race condition: duplicate handlers under concurrent get_task_logger calls

The if not logger.handlers check and logger.addHandler(handler) are not atomic. Two threads can both observe an empty handler list simultaneously and each add a handler, resulting in duplicate StreamHandler instances on the same logger — every subsequent log call will then emit each line twice (or more).

The standard remedy is to synchronize the setup: since logging.getLogger always returns the same instance for a given name, guard the check-and-add sequence with an explicit module-level threading.Lock (or with the logging module's own internal lock) so the handler is attached exactly once. The simplest safe pattern is:

🔒 Proposed fix using the logging module's internal lock
+import logging.handlers as _lh

 def get_task_logger(name: str) -> logging.Logger:
     logger = logging.getLogger(name)

-    if not logger.handlers:
-        handler = logging.StreamHandler()
-        handler.setFormatter(_StructuredFormatter())
-        logger.addHandler(handler)
-        logger.propagate = False
+    logging._acquireLock()          # noqa: SLF001  (stdlib internal, stable)
+    try:
+        if not logger.handlers:
+            handler = logging.StreamHandler()
+            handler.setFormatter(_StructuredFormatter())
+            logger.addHandler(handler)
+            logger.propagate = False
+    finally:
+        logging._releaseLock()      # noqa: SLF001
 
     return logger

Alternatively, if get_task_logger is only ever called at module-import time (single-threaded), document that constraint explicitly with a comment or assert, and note it is not safe for concurrent calls. (Caveat on the diff above: logging._acquireLock()/_releaseLock() are private and were removed in Python 3.13 — on modern Python prefer a module-level threading.Lock guarding the same critical section.)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/ai/src/ai/modules/task/task_logger.py` around lines 39 - 43, The
current get_task_logger flow can add duplicate StreamHandler instances due to a
race between the if not logger.handlers check and logger.addHandler(handler); to
fix it, guard the handler-creation and add sequence with a lock: introduce a
module-level threading.Lock and wrap the block that checks logger.handlers,
creates the StreamHandler, sets _StructuredFormatter, and calls
logger.addHandler(handler) in that lock so only one thread can perform setup for
the logger; alternatively use logger.manager.lock (the logging.Manager-provided
lock) to synchronize the same critical section around adding handlers to ensure
get_task_logger and the logger variable are configured exactly once.


return logger
Loading
Loading