
feat(ai): add TaskTracker for task lifecycle state observability#746

Open
Tejeshyewale wants to merge 46 commits into rocketride-org:develop from Tejeshyewale:feat/task-execution-tracker

Conversation


@Tejeshyewale Tejeshyewale commented May 3, 2026

Summary

Introduces a lightweight, thread-safe TaskTracker that records the
full lifecycle of every pipeline task along with timing metadata.
Builds on the structured logging added in #745.

Problem

Logs show what happened but cannot answer live questions:

  • Which tasks are active right now?
  • How long has task X been running?
  • Did task Y fail or complete, and when?

Solution

An in-process registry (tracker singleton) that is updated at every
lifecycle transition and can be queried at any time.

Changes

New File: packages/ai/src/ai/modules/task/task_tracker.py

Component | Description
  • TaskState | Enum: starting → initializing → running → completed/failed
  • TaskRecord | Dataclass holding status, start_time, end_time, last_update, elapsed_seconds
  • TaskTracker | Thread-safe registry with on_* lifecycle hooks and query helpers
  • tracker | Module-level singleton; just import and call

Modified: packages/ai/src/ai/modules/task/task_engine.py

6 tracker calls added — zero existing logic changed:

Call site | Tracker call | Trigger
  • start_task() | tracker.on_starting() | STARTING state set
  • start_task() | tracker.on_initializing() | INITIALIZING state set
  • start_task() | tracker.on_running() | Subprocess confirmed alive
  • start_task() except | tracker.on_failed() | Startup exception
  • _terminated() | tracker.on_completed() | Clean exit (code 0)
  • _terminated() | tracker.on_failed() | Abnormal exit or stop
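The hook placement could look like the toy engine below. The class, method structure, and spawn callback are all invented for illustration; only the six hook names and their documented triggers come from the PR.

```python
class MiniEngine:
    """Toy stand-in for task_engine.py showing where the six hooks sit."""

    def __init__(self, task_id, tracker):
        self.id = task_id
        self.tracker = tracker
        self.exit_code = None
        self.stop_requested = False

    def start_task(self, spawn):
        self.tracker.on_starting(self.id)          # STARTING state set
        try:
            self.tracker.on_initializing(self.id)  # subprocess about to spawn
            spawn()                                # create the subprocess
            self.tracker.on_running(self.id)       # subprocess confirmed alive
        except Exception:
            self.tracker.on_failed(self.id)        # startup exception
            raise

    def _terminated(self):
        if self.exit_code == 0 and not self.stop_requested:
            self.tracker.on_completed(self.id)     # clean exit (code 0)
        else:
            self.tracker.on_failed(self.id)        # abnormal exit or stop
```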

Design Decisions

  • Singleton pattern (from .task_tracker import tracker) — no DI needed
  • threading.Lock — safe for concurrent task lifecycle updates
  • monotonic() timestamps — immune to system clock changes
  • Zero new dependencies — stdlib only (threading, dataclasses, enum, time)
  • Minimally invasive — 6 one-liners added to task_engine.py

Usage Example

from .task_tracker import tracker

# Query any time
record = tracker.get(task_id)
print(record.status)           # "running"
print(record.elapsed_seconds)  # 4.2031

# See all active tasks
print(tracker.active_ids())    # ["abc-123", "def-456"]

# Full snapshot
print(tracker.snapshot())      # {task_id: {status, times, elapsed}, ...}

Labels

module:ai feat

Summary by CodeRabbit

  • New Features

    • Introduced an ML sklearn prediction node for text-based inference with configurable preprocessing
    • Added task lifecycle tracking and improved structured task logging for clearer startup/termination status
  • Documentation

    • Added ML sklearn node documentation with usage examples
  • Chores

    • Added scikit-learn and numpy dependency support

Tejeshyewale and others added 15 commits April 30, 2026 13:47
- Add task_tracker.py with TaskState enum, TaskRecord dataclass,
  TaskTracker class, and module-level tracker singleton
- Integrate tracker into task_engine.py at 6 lifecycle points:
  * on_starting: STARTING state set
  * on_initializing: subprocess about to spawn
  * on_running: subprocess confirmed alive
  * on_failed: startup exception caught
  * on_completed: clean termination
  * on_failed: abnormal termination
- Thread-safe via threading.Lock
- Zero new dependencies - stdlib only
@github-actions github-actions Bot added docs Documentation module:nodes Python pipeline nodes module:ai AI/ML modules labels May 3, 2026

coderabbitai Bot commented May 3, 2026

📝 Walkthrough

Walkthrough

Adds an ML sklearn prediction node (IGlobal, IInstance, PreProcessor, service metadata, requirements, docs) that manages a shared preprocessor and routes inference results; and introduces a task lifecycle tracking subsystem (TaskState, TaskRecord, TaskTracker, structured task logger) with integrations in the task engine.

Changes

ML Sklearn Prediction Node

Layer / File(s) Summary
Dependencies & Service Config
nodes/src/nodes/ml_sklearn/requirements.txt, nodes/src/nodes/ml_sklearn/services.json
Adds scikit-learn>=1.0.0,<2.0.0 and numpy>=1.21.0,<3.0.0; adds ml_sklearn service config with answers lanes and a test fixture.
Preprocessor Implementation
nodes/src/nodes/ml_sklearn/code.py
Adds PreProcessor class with stubbed process(text) that returns input when no model is loaded.
Global Lifecycle
nodes/src/nodes/ml_sklearn/IGlobal.py
Adds IGlobal managing preprocessor with validateConfig(), beginGlobal() (installs deps, lazy-imports PreProcessor, constructs it unless in CONFIG mode), and endGlobal() (clears preprocessor).
Instance Logic
nodes/src/nodes/ml_sklearn/IInstance.py
Adds IInstance that requires IGlobal.preprocessor, deep-copies incoming question, extracts text, runs preprocessor.process(text), updates question.text, and forwards downstream; open() is a no-op.
Package Exports
nodes/src/nodes/ml_sklearn/__init__.py
Re-exports IGlobal and IInstance via __all__.
Documentation
nodes/src/nodes/ml_sklearn/README.md
Adds README describing input/output and example mapping.

Task Lifecycle Tracking

Layer / File(s) Summary
Logging Utility
packages/ai/src/ai/modules/task/task_logger.py
Adds _StructuredFormatter and get_task_logger(name) to emit JSON-like structured log records for task events.
Tracker Data & API
packages/ai/src/ai/modules/task/task_tracker.py
Introduces TaskState enum, TaskRecord dataclass (timing/serialization), TaskTracker with lifecycle hooks (on_starting, on_initializing, on_running, on_completed, on_failed) and query helpers; exports module-level tracker.
Engine Integration
packages/ai/src/ai/modules/task/task_engine.py
Adds module logger _logger, calls tracker lifecycle hooks at startup/initialization/running/termination points, and logs errors on startup failure.
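The structured logging utility could take roughly the shape below. This is a hedged sketch of what a _StructuredFormatter and get_task_logger() pair might look like; the real file's field names and output format may differ.

```python
import json
import logging


class StructuredFormatter(logging.Formatter):
    """Render log records as JSON with task-specific extras."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            # Extras passed via logger.info(..., extra={...})
            "task_id": getattr(record, "task_id", None),
            "step": getattr(record, "step", None),
        }
        return json.dumps(payload)


def get_task_logger(name: str) -> logging.Logger:
    """Return a logger wired with the structured formatter (idempotent)."""
    logger = logging.getLogger(name)
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(StructuredFormatter())
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    return logger
```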

Sequence Diagram

sequenceDiagram
    actor Input as Question
    participant IInstance
    participant IGlobal
    participant PreProcessor
    participant Downstream

    Input->>IInstance: writeAnswers(question)
    IInstance->>IGlobal: ensure preprocessor exists
    IGlobal-->>IInstance: preprocessor
    IInstance->>IInstance: deep-copy question & extract text
    IInstance->>PreProcessor: process(text)
    PreProcessor-->>IInstance: processed_text
    IInstance->>Downstream: forward updated question

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes


Suggested reviewers

  • stepmikhaylov
  • jmaionchi
  • Rod-Christensen

Poem

🐇 I hopped through code to load the brain,
preprocessor ready, shared across the lane.
Tasks now tracked from start to end,
each lifecycle tick a tiny friend.
Hop on — the pipeline's set to train.

🚥 Pre-merge checks: ✅ 4 passed, ❌ 1 warning

❌ Failed checks (1 warning)
  • Docstring Coverage ⚠️ Warning: coverage is 72.00%, below the required 80.00% threshold. Write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (4)
  • Description Check: skipped because CodeRabbit's high-level summary is enabled.
  • Title Check: the title accurately and concisely describes the main change, adding a TaskTracker for task lifecycle state observability.
  • Linked Issues Check: skipped because no linked issues were found for this pull request.
  • Out of Scope Changes Check: skipped because no linked issues were found for this pull request.


github-actions Bot commented May 3, 2026: No description provided.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 9

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@nodes/src/nodes/ml_sklearn/code.py`:
- Line 16: The __init__ method currently triggers lint errors: rename the unused
parameter config to _config (or prefix it with an underscore) to satisfy ARG002,
and add an explicit return type annotation "-> None" to the __init__ signature
to satisfy ANN204; update the signature in the constructor (def __init__(self,
_config: dict) -> None:) and keep usage unchanged if the value is intentionally
unused.

In `@nodes/src/nodes/ml_sklearn/README.md`:
- Line 7: The README's input description "text (number as string)" conflicts
with the services.json test fixture which uses "hello world"; update either the
README entry in nodes/src/nodes/ml_sklearn/README.md to describe the input as
"text (string)" for arbitrary strings, or change the services.json "test"
fixture to a numeric string like "250" so the example matches the README; ensure
the README and the "test" value in services.json are consistent.

In `@nodes/src/nodes/ml_sklearn/services.json`:
- Around line 30-43: The shape field in nodes/src/nodes/ml_sklearn/services.json
is using an inputs/outputs map that doesn't match the project's UI schema;
replace the current "shape": { "inputs": [...], "outputs": [...] } with an empty
array "shape": [] because ml_sklearn has no user-configurable fields; update the
shape value in that JSON object (referencing the existing "shape" key in the
ml_sklearn service definition) so the UI form renderer recognizes it as having
no sections.
- Around line 17-28: The preconfig block is malformed causing
Config.getNodeConfig (called from IGlobal.beginGlobal) to raise KeyError on
preconfig['profiles']; fix by making preconfig.default a string ("default"),
move the existing "ml_sklearn.default" profile object into preconfig.profiles as
a dictionary entry, and remove the top-level "profiles" sibling; ensure the
profile key matches the default name and keep the profile object structure
(e.g., object/properties) under preconfig.profiles.
- Around line 9-16: The "pipe" wrapper uses a non-standard nested lanes object
("answers": {"in": true, "out": true}) which prevents the pipeline from
registering the lane; replace that structure with the standard top-level lanes +
input arrays used elsewhere (e.g., agent_langchain/response): remove the nested
object under "pipe" and instead declare "lanes" as an array containing "answers"
and add an "input" array referencing "answers" so the pipeline framework
recognizes the answers lane.

In `@packages/ai/src/ai/modules/task/task_engine.py`:
- Line 1620: The tracker lifecycle calls in start_task() are inverted:
TaskTracker.on_running is invoked immediately after create_subprocess_exec but
on_initializing is called later; swap them so the order matches the documented
TaskState lifecycle (on_starting → on_initializing → on_running → on_completed).
Specifically, call tracker.on_initializing(self.id) right after the subprocess
is created (before stdio/metrics setup) to mark "subprocess about to be
spawned", and call tracker.on_running(self.id) only after stdio/metrics are set
up and the subprocess is confirmed alive; update both occurrences (the current
calls at the create_subprocess_exec location and the later stdio/metrics
completion location) to preserve the correct sequence.
- Around line 860-863: In _terminated(), change the exitCode check so
unknown/None is treated as failure: replace the current condition that relies on
falsiness of self._status.exitCode with an explicit check (e.g., if
self._stop_requested or self._status.exitCode is None or self._status.exitCode
!= 0) so that tracker.on_failed(self.id) is invoked when exitCode is None or
non-zero; leave tracker.on_completed(self.id) only for confirmed successful
exitCode == 0—this prevents a transient on_completed before the start_task
exception handler later calls tracker.on_failed.
- Around line 1699-1703: Replace the call to _logger.error(..., exc_info=True)
with _logger.exception(...) in the task startup error handling so the current
exception traceback is logged idiomatically; keep the same message string and
the extra={'task_id': self.id, 'step': 'error', 'error': str(e)} payload,
locating the call to _logger.error in task_engine.py (the task startup/error
handling block where self.id and e are referenced) and swap the logging method
to _logger.exception while preserving arguments.

In `@packages/ai/src/ai/modules/task/task_tracker.py`:
- Line 7: Replace deprecated typing aliases: remove "from typing import Dict,
Optional", change any "Dict[...]" usages to the built-in "dict[...]" generic,
and replace "Optional[T]" annotations with the union form "T | None" (e.g.,
update the TaskRecord.end_time annotation to "datetime | None"). Update
occurrences noted (around TaskRecord and other uses at the indicated locations)
and remove the now-unused Optional import.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 23bca35d-a375-4aa8-bd3d-7936b41d14a6

📥 Commits

Reviewing files that changed from the base of the PR and between 5d9963f and 73a4331.

📒 Files selected for processing (9)
  • nodes/src/nodes/ml_sklearn/IGlobal.py
  • nodes/src/nodes/ml_sklearn/IInstance.py
  • nodes/src/nodes/ml_sklearn/README.md
  • nodes/src/nodes/ml_sklearn/__init__.py
  • nodes/src/nodes/ml_sklearn/code.py
  • nodes/src/nodes/ml_sklearn/requirements.txt
  • nodes/src/nodes/ml_sklearn/services.json
  • packages/ai/src/ai/modules/task/task_engine.py
  • packages/ai/src/ai/modules/task/task_tracker.py

class PreProcessor:
"""Wraps a scikit-learn model/pipeline for text inference."""

def __init__(self, config: dict):

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Two Ruff linting failures on __init__ will block CI.

  1. ARG002: config is received but never read. Prefix it with _ to signal intentional non-use.
  2. ANN204: __init__ is missing a -> None return-type annotation.
🔧 Proposed fix
-    def __init__(self, config: dict):
+    def __init__(self, _config: dict) -> None:
🧰 Tools
🪛 Ruff (0.15.12)

[warning] 16-16: Missing return type annotation for special method __init__

Add return type annotation: None

(ANN204)


[warning] 16-16: Unused method argument: config

(ARG002)



## Input

- text (number as string)

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Input description is inconsistent with the test fixture in services.json.

The README states the input is a "number as string" (e.g., 250), but services.json uses "hello world" as the test payload. Either update the test fixture to a numeric string (e.g., "250") or broaden the description to "text (string)" if arbitrary strings are valid.


Comment on lines +9 to +16
"pipe": {
"lanes": {
"answers": {
"in": true,
"out": true
}
}
},

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify: do any other "preprocessor" group nodes use the pipe.lanes structure?
# Expected: none — all should use top-level "lanes" + "input"
rg -l '"group":\s*"preprocessor"' --glob '*/services.json' | xargs -I{} sh -c 'echo "=== {} ==="; cat "{}"' | grep -A10 '"pipe"\|"lanes"'

Repository: rocketride-org/rocketride-server

Length of output: 225


🏁 Script executed:

# Find all services.json files in the repository
find . -name "services.json" -type f | head -20

Repository: rocketride-org/rocketride-server

Length of output: 978


🏁 Script executed:

# Read the ml_sklearn/services.json file to see the full context
cat -n ./nodes/src/nodes/ml_sklearn/services.json

Repository: rocketride-org/rocketride-server

Length of output: 1695


🏁 Script executed:

# Find all preprocessor group nodes and extract their pipe/lanes structures
rg '"group":\s*"preprocessor"' -l ./nodes/src/nodes/*/services.json | while read file; do
  echo "=== $file ==="
  # Extract the pipe or lanes section (simplified for readability)
  cat "$file" | jq -r 'to_entries[] | select(.value.group == "preprocessor") | .value | {pipe, lanes, input}' 2>/dev/null || echo "Error parsing"
done

Repository: rocketride-org/rocketride-server

Length of output: 269


🏁 Script executed:

# Broader search: find all preprocessor group nodes
find ./nodes/src/nodes/*/services.json -type f | xargs grep -l '"group":\s*"preprocessor"'

Repository: rocketride-org/rocketride-server

Length of output: 119


🏁 Script executed:

# Find and examine agent_langchain and response node structures
echo "=== agent_langchain ===" && cat ./nodes/src/nodes/agent_langchain/services.json | jq '.' | head -50
echo ""
echo "=== response ===" && cat ./nodes/src/nodes/response/services.json | jq '.' 2>/dev/null | head -50

Repository: rocketride-org/rocketride-server

Length of output: 1122


🏁 Script executed:

# Get the response node structure (if it exists)
find ./nodes/src/nodes -name "response" -type d | head -1

Repository: rocketride-org/rocketride-server

Length of output: 103


🏁 Script executed:

# Read the response node services.json
cat -n ./nodes/src/nodes/response/services.json

Repository: rocketride-org/rocketride-server

Length of output: 5344


🏁 Script executed:

# Check if any other node uses "pipe" wrapper
grep -r '"pipe"' ./nodes/src/nodes/*/services.json | head -20

Repository: rocketride-org/rocketride-server

Length of output: 137


🏁 Script executed:

# Verify the {"in": true, "out": true} format doesn't appear elsewhere
grep -r '"in":\s*true' ./nodes/src/nodes/*/services.json

Repository: rocketride-org/rocketride-server

Length of output: 151


Fix non-standard lane configuration structure in pipe wrapper.

The pipe wrapper with {"in": true, "out": true} format is unique to this file and doesn't match the codebase convention. Other nodes like agent_langchain and response use top-level "lanes" (with array values) paired with an "input" array. The current structure will likely prevent the pipeline framework from registering the answers lane correctly.

Use the standard structure:

🔧 Proposed fix
-        "pipe": {
-            "lanes": {
-                "answers": {
-                    "in": true,
-                    "out": true
-                }
-            }
-        },
+        "lanes": {
+            "answers": []
+        },
+        "input": [
+            {
+                "lane": "answers",
+                "output": []
+            }
+        ],

Comment on lines +17 to +28
"preconfig": {
"default": {
"object": "default",
"properties": []
}
},
"profiles": {
"ml_sklearn.default": {
"object": "default",
"properties": []
}
},

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

preconfig is structurally malformed — Config.getNodeConfig will throw KeyError: 'profiles' at runtime.

IGlobal.beginGlobal() calls Config.getNodeConfig(self.glb.logicalType, self.glb.connConfig). That method (in packages/ai/src/ai/common/config.py) expects:

  • preconfig.default to be a string (the default profile name, e.g. "default")
  • preconfig.profiles to be a dict of profile objects

Currently, preconfig.default is an object {"object": "default", "properties": []}, and the "profiles" dict is a top-level sibling of preconfig instead of nested inside it. When getNodeConfig executes preconfig['profiles'], it will raise KeyError: 'profiles', crashing any non-CONFIG-mode startup.

🐛 Proposed fix — restructure `preconfig` and remove misplaced `profiles`
-        "preconfig": {
-            "default": {
-                "object": "default",
-                "properties": []
-            }
-        },
-        "profiles": {
-            "ml_sklearn.default": {
-                "object": "default",
-                "properties": []
-            }
-        },
+        "preconfig": {
+            "default": "default",
+            "profiles": {
+                "default": {}
+            }
+        },

Comment on lines +30 to +43
"shape": {
"inputs": [
{
"name": "answers",
"type": "answers"
}
],
"outputs": [
{
"name": "answers",
"type": "answers"
}
]
},

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

shape uses the wrong format — it should be an array of section objects, not an inputs/outputs map.

In this codebase, shape is a UI form-rendering directive: an array of {section, title, properties} objects (see agent_langchain/services.json and extract_data/services.json). The current {"inputs": [...], "outputs": [...]} structure appears borrowed from a different schema convention and will not be processed correctly by the UI layer.

Since ml_sklearn has no user-configurable fields, shape should be an empty array:

🔧 Proposed fix
-        "shape": {
-            "inputs": [
-                {
-                    "name": "answers",
-                    "type": "answers"
-                }
-            ],
-            "outputs": [
-                {
-                    "name": "answers",
-                    "type": "answers"
-                }
-            ]
-        },
+        "shape": [],

Comment on lines +860 to +863
if self._stop_requested or (self._status.exitCode and self._status.exitCode != 0):
tracker.on_failed(self.id)
else:
tracker.on_completed(self.id)

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

_terminated() may call on_completed for a startup failure, causing a transient incorrect state.

The condition on line 860:

if self._stop_requested or (self._status.exitCode and self._status.exitCode != 0):

treats exitCode = None as falsy, so tracker.on_completed() is called when exitCode is None — which occurs when the subprocess was spawned but hasn't exited yet at the time _terminated() is invoked from the start_task exception handler.

The explicit tracker.on_failed(self.id) at line 1698 then overrides this, but between the two calls concurrent readers see a spurious COMPLETED state for a task that actually failed at startup.

The fix is to default to on_failed whenever the exit code is not a confirmed success:

🐛 Proposed fix
-        if self._stop_requested or (self._status.exitCode and self._status.exitCode != 0):
-            tracker.on_failed(self.id)
-        else:
-            tracker.on_completed(self.id)
+        if self._status.exitCode == 0 and not self._stop_requested:
+            tracker.on_completed(self.id)
+        else:
+            tracker.on_failed(self.id)

With this change, on_failed is always emitted when exitCode is None (unknown/still-running), and the explicit tracker.on_failed(self.id) at line 1698 becomes a safe, harmless redundant call instead of a critical correction.
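The falsiness pitfall can be shown in isolation. The helper names below are invented for illustration; only the two condition shapes come from the review comment.

```python
def classify_buggy(exit_code, stop_requested=False):
    # `exit_code and exit_code != 0` treats None as falsy, so an unknown
    # exit code falls through to the success branch.
    if stop_requested or (exit_code and exit_code != 0):
        return "failed"
    return "completed"


def classify_fixed(exit_code, stop_requested=False):
    # Only a confirmed zero exit without a stop request counts as success;
    # None (not yet exited) is treated as failure.
    if exit_code == 0 and not stop_requested:
        return "completed"
    return "failed"


assert classify_buggy(None) == "completed"  # the spurious success
assert classify_fixed(None) == "failed"     # unknown exit treated as failure
assert classify_fixed(0) == "completed"
assert classify_fixed(1) == "failed"
```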


'pid': self._engine_process.pid,
},
)
tracker.on_running(self.id)

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

on_running and on_initializing are called in the wrong order, contradicting the documented lifecycle.

TaskTracker's docstring specifies the happy-path order as on_starting → on_initializing → on_running → on_completed, and the TaskState enum is declared in that same order. The semantic descriptions also confirm this — on_initializing is "subprocess about to be spawned" and on_running is "subprocess confirmed alive."

In start_task() the calls are inverted: on_running fires immediately after create_subprocess_exec (line 1620), and on_initializing fires after stdio/metrics setup (line 1651). Consumers calling tracker.get() or snapshot() will observe RUNNING before INITIALIZING, violating the documented contract.

🐛 Proposed fix — swap the two tracker calls
-            tracker.on_running(self.id)
+            tracker.on_initializing(self.id)
-            tracker.on_initializing(self.id)
+            tracker.on_running(self.id)

Also applies to: 1651-1651


Comment on lines +1699 to +1703
_logger.error(
'Task startup failed',
extra={'task_id': self.id, 'step': 'error', 'error': str(e)},
exc_info=True,
)

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Use _logger.exception() instead of _logger.error(..., exc_info=True).

Ruff G201 flags this. logging.exception() is the idiomatic stdlib shorthand for logging at ERROR level with the current exception traceback.

♻️ Proposed fix
-            _logger.error(
-                'Task startup failed',
-                extra={'task_id': self.id, 'step': 'error', 'error': str(e)},
-                exc_info=True,
-            )
+            _logger.exception(
+                'Task startup failed',
+                extra={'task_id': self.id, 'step': 'error', 'error': str(e)},
+            )
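A quick self-contained check that `logger.exception()` emits the same ERROR-level record with a traceback (logger name and payload here are illustrative, not the PR's actual `_logger`):

```python
import io
import logging

stream = io.StringIO()
logger = logging.getLogger("demo.task_engine")
logger.addHandler(logging.StreamHandler(stream))
logger.propagate = False

try:
    raise RuntimeError("boom")
except RuntimeError as e:
    # Equivalent to _logger.error(..., exc_info=True): ERROR level + traceback
    logger.exception(
        "Task startup failed",
        extra={"task_id": "t1", "step": "error", "error": str(e)},
    )

output = stream.getvalue()
```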
🧰 Tools
🪛 Ruff (0.15.12)

[warning] 1699-1699: Logging .exception(...) should be used instead of .error(..., exc_info=True)

(G201)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/ai/src/ai/modules/task/task_engine.py` around lines 1699 - 1703,
Replace the call to _logger.error(..., exc_info=True) with
_logger.exception(...) in the task startup error handling so the current
exception traceback is logged idiomatically; keep the same message string and
the extra={'task_id': self.id, 'step': 'error', 'error': str(e)} payload,
locating the call to _logger.error in task_engine.py (the task startup/error
handling block where self.id and e are referenced) and swap the logging method
to _logger.exception while preserving arguments.

from dataclasses import dataclass, field
from enum import Enum
from time import monotonic
from typing import Dict, Optional

🧹 Nitpick | 🔵 Trivial | ⚡ Quick win

Replace deprecated typing.Dict / typing.Optional with built-in generics.

Ruff UP035 flags line 7. On the Python 3.12 baseline used in this repo both typing aliases are deprecated; prefer native syntax throughout.

♻️ Proposed refactor
-from typing import Dict, Optional
+from typing import Optional
-    self._records: Dict[str, TaskRecord] = {}
+    self._records: dict[str, TaskRecord] = {}
-    def get(self, task_id: str) -> Optional[TaskRecord]:
+    def get(self, task_id: str) -> TaskRecord | None:
-    def snapshot(self) -> Dict[str, dict]:
+    def snapshot(self) -> dict[str, dict]:

And update the end_time annotation in TaskRecord:

-    end_time:    Optional[float]
+    end_time:    float | None

After these changes the Optional import can be removed entirely.

Also applies to: 68-68, 98-98, 103-103

🧰 Tools
🪛 Ruff (0.15.12)

[warning] 7-7: typing.Dict is deprecated, use dict instead

(UP035)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/ai/src/ai/modules/task/task_tracker.py` at line 7, Replace
deprecated typing aliases: remove "from typing import Dict, Optional", change
any "Dict[...]" usages to the built-in "dict[...]" generic, and replace
"Optional[T]" annotations with the union form "T | None" (e.g., update the
TaskRecord.end_time annotation to "float | None"). Update occurrences noted
(around TaskRecord and other uses at the indicated locations) and remove the
now-unused Optional import.


@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/ai/src/ai/modules/task/task_logger.py`:
- Around line 26-28: Replace the explicit loop that builds payload from
record.__dict__ with a dict comprehension to improve performance: instead of
iterating with "for key, value in record.__dict__.items()" and testing "if key
not in self._RESERVED", create a new dict via comprehension filtering out keys
in self._RESERVED and then assign or update payload accordingly; target the code
that references record.__dict__, self._RESERVED, and payload in task_logger.py
(the loop shown) and ensure semantics remain identical (only non-reserved keys
copied).
- Around line 18-20: The timestamp field in task_logger.py is built with
time.strftime(time_format, time.gmtime(record.created)) which discards
fractional seconds from record.created; update the code that builds 'timestamp'
(the time.strftime call for the 'timestamp' key) to preserve sub-second
precision by formatting record.created with a function that includes
milliseconds (e.g., using datetime.fromtimestamp(record.created,
tz=timezone.utc).isoformat(timespec='milliseconds') or equivalent) so timestamps
include fractional seconds while keeping UTC/Zulu formatting.
- Around line 39-43: The current get_task_logger flow can add duplicate
StreamHandler instances due to a race between the if not logger.handlers check
and logger.addHandler(handler); to fix it, guard the handler-creation and add
sequence with a lock: introduce a module-level threading.Lock and wrap the block
that checks logger.handlers, creates the StreamHandler, sets
_StructuredFormatter, and calls logger.addHandler(handler) in that lock so only
one thread can perform setup for the logger; alternatively use
logger.manager.lock (the logging.Manager-provided lock) to synchronize the same
critical section around adding handlers to ensure get_task_logger and the logger
variable are configured exactly once.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 2275cab2-c4e3-412a-96b4-1109ac76533a

📥 Commits

Reviewing files that changed from the base of the PR and between 73a4331 and 009373c.

📒 Files selected for processing (1)
  • packages/ai/src/ai/modules/task/task_logger.py

Comment on lines +18 to +20
'timestamp': time.strftime(
'%Y-%m-%dT%H:%M:%SZ', time.gmtime(record.created)
),

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Timestamp truncates sub-second precision

time.gmtime(record.created) silently drops the fractional-seconds portion of record.created. In practice, multiple log records emitted within the same wall-clock second will share identical timestamps, making ordering ambiguous in the structured output.

⏱️ Proposed fix to include milliseconds
-            'timestamp': time.strftime(
-                '%Y-%m-%dT%H:%M:%SZ', time.gmtime(record.created)
-            ),
+            'timestamp': '{}.{:03d}Z'.format(
+                time.strftime('%Y-%m-%dT%H:%M:%S', time.gmtime(record.created)),
+                int((record.created % 1) * 1000),
+            ),
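The proposed formatting can be verified against a fixed epoch value (the helper name `format_ts` is illustrative; the PR inlines the expression):

```python
import time


def format_ts(created: float) -> str:
    # Keep the UTC/Zulu layout, append milliseconds from the fractional part.
    return "{}.{:03d}Z".format(
        time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(created)),
        int((created % 1) * 1000),
    )


stamp = format_ts(1700000000.123456)
# Two records inside the same wall-clock second now get distinct timestamps.
a, b = format_ts(1700000000.100), format_ts(1700000000.200)
```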
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/ai/src/ai/modules/task/task_logger.py` around lines 18 - 20, The
timestamp field in task_logger.py is built with time.strftime(time_format,
time.gmtime(record.created)) which discards fractional seconds from
record.created; update the code that builds 'timestamp' (the time.strftime call
for the 'timestamp' key) to preserve sub-second precision by formatting
record.created with a function that includes milliseconds (e.g., using
datetime.fromtimestamp(record.created,
tz=timezone.utc).isoformat(timespec='milliseconds') or equivalent) so timestamps
include fractional seconds while keeping UTC/Zulu formatting.

Comment on lines +26 to +28
for key, value in record.__dict__.items():
if key not in self._RESERVED:
payload[key] = value

🧹 Nitpick | 🔵 Trivial | ⚡ Quick win

Use a dict comprehension instead of a for loop (Ruff PERF403)

♻️ Proposed refactor
-        for key, value in record.__dict__.items():
-            if key not in self._RESERVED:
-                payload[key] = value
+        payload.update(
+            {k: v for k, v in record.__dict__.items() if k not in self._RESERVED}
+        )
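A quick equivalence check (the `_RESERVED` set and record dict here are stand-ins for the attributes in `task_logger.py`):

```python
_RESERVED = {"name", "msg", "args", "levelname"}
record_dict = {"name": "task", "msg": "hi", "task_id": "t1", "step": "run"}

# Original explicit loop
payload_loop: dict = {}
for key, value in record_dict.items():
    if key not in _RESERVED:
        payload_loop[key] = value

# Comprehension form suggested by PERF403 -- identical semantics
payload: dict = {}
payload.update({k: v for k, v in record_dict.items() if k not in _RESERVED})
```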
🧰 Tools
🪛 Ruff (0.15.12)

[warning] 28-28: Use a dictionary comprehension instead of a for-loop

(PERF403)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/ai/src/ai/modules/task/task_logger.py` around lines 26 - 28, Replace
the explicit loop that builds payload from record.__dict__ with a dict
comprehension to improve performance: instead of iterating with "for key, value
in record.__dict__.items()" and testing "if key not in self._RESERVED", create a
new dict via comprehension filtering out keys in self._RESERVED and then assign
or update payload accordingly; target the code that references record.__dict__,
self._RESERVED, and payload in task_logger.py (the loop shown) and ensure
semantics remain identical (only non-reserved keys copied).

Comment on lines +39 to +43
if not logger.handlers:
handler = logging.StreamHandler()
handler.setFormatter(_StructuredFormatter())
logger.addHandler(handler)
logger.propagate = False

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Race condition: duplicate handlers under concurrent get_task_logger calls

The if not logger.handlers check and logger.addHandler(handler) are not atomic. Two threads can both observe an empty handler list simultaneously and each add a handler, resulting in duplicate StreamHandler instances on the same logger — every subsequent log call will then emit each line twice (or more).

The standard remedy is to serialize the check-and-add sequence: logging.getLogger always returns the same Logger instance for a given name, so guarding the setup block with a single module-level lock guarantees the handler is installed exactly once. The simplest safe pattern is:

🔒 Proposed fix using a module-level lock
+import threading
+
+_setup_lock = threading.Lock()

 def get_task_logger(name: str) -> logging.Logger:
     logger = logging.getLogger(name)

-    if not logger.handlers:
-        handler = logging.StreamHandler()
-        handler.setFormatter(_StructuredFormatter())
-        logger.addHandler(handler)
-        logger.propagate = False
+    with _setup_lock:
+        if not logger.handlers:
+            handler = logging.StreamHandler()
+            handler.setFormatter(_StructuredFormatter())
+            logger.addHandler(handler)
+            logger.propagate = False

     return logger

Alternatively, if get_task_logger is only ever called at module-import time (single-threaded), document that constraint explicitly with a comment or assert, and note it is not safe for concurrent calls.
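A self-contained sketch of the lock-guarded pattern under concurrent callers (`_StructuredFormatter` omitted for brevity; the handler writes to a StringIO so the demo stays silent):

```python
import io
import logging
import threading

_setup_lock = threading.Lock()


def get_task_logger(name: str) -> logging.Logger:
    logger = logging.getLogger(name)
    # Serialize check-and-add so only one thread ever installs a handler.
    with _setup_lock:
        if not logger.handlers:
            handler = logging.StreamHandler(io.StringIO())
            logger.addHandler(handler)
            logger.propagate = False
    return logger


threads = [
    threading.Thread(target=get_task_logger, args=("demo.tasks",))
    for _ in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
handler_count = len(logging.getLogger("demo.tasks").handlers)
```

Eight concurrent callers still leave exactly one handler on the logger; without the lock, two threads can both pass the empty-handlers check and double-install.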

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/ai/src/ai/modules/task/task_logger.py` around lines 39 - 43, The
current get_task_logger flow can add duplicate StreamHandler instances due to a
race between the if not logger.handlers check and logger.addHandler(handler); to
fix it, guard the handler-creation and add sequence with a lock: introduce a
module-level threading.Lock and wrap the block that checks logger.handlers,
creates the StreamHandler, sets _StructuredFormatter, and calls
logger.addHandler(handler) in that lock so only one thread can perform setup for
the logger; alternatively use logger.manager.lock (the logging.Manager-provided
lock) to synchronize the same critical section around adding handlers to ensure
get_task_logger and the logger variable are configured exactly once.


Labels

docs Documentation module:ai AI/ML modules module:nodes Python pipeline nodes
