
feat(ai): add structured logging to task engine lifecycle#745

Open
Tejeshyewale wants to merge 45 commits into rocketride-org:develop from Tejeshyewale:feat/structured-task-logging

Conversation

@Tejeshyewale (Contributor) commented May 3, 2026

Summary

Adds a structured JSON logger to the task execution engine to improve
debuggability and observability of pipeline lifecycle events.

Changes

New File: packages/ai/src/ai/modules/task/task_logger.py

  • _StructuredFormatter — formats every log record as single-line JSON
  • get_task_logger() — factory function, returns a configured Logger
  • Every log line contains: timestamp, level, logger, message
  • Extra fields via extra={}: task_id, step, and any additional context

Modified: packages/ai/src/ai/modules/task/task_engine.py

Four structured log points added — zero existing logic changed:

| Location | `step` value | What it captures |
|---|---|---|
| `start_task()` — after STARTING state set | `start` | Task received and starting |
| `start_task()` — after `create_subprocess_exec` | `subprocess` | PID of spawned process |
| `start_task()` — in `except` block | `error` | Exception message + traceback |
| `_terminated()` — before final `debug_message` | `termination` | `exit_code` + `final_state` |

Why This Matters

debug_message() outputs free-form text that is hard to filter in log
aggregators. The new structured lines emit JSON queryable by any log
pipeline (Datadog, Loki, CloudWatch Insights):

{"timestamp":"2026-05-03T10:22:01Z","level":"INFO","task_id":"abc-123","step":"subprocess","pid":9876}
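For example, any downstream consumer can filter such lines with a plain JSON parse, stdlib only; the sample line is the one shown above:

```python
import json

# The structured line from above, as a log pipeline would receive it.
line = '{"timestamp":"2026-05-03T10:22:01Z","level":"INFO","task_id":"abc-123","step":"subprocess","pid":9876}'

event = json.loads(line)
if event.get('step') == 'subprocess':
    print(f"task {event['task_id']} spawned pid {event['pid']}")  # → task abc-123 spawned pid 9876
```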

Testing

python -c "from ai.modules.task.task_logger import get_task_logger; print('OK')"

Summary by CodeRabbit

  • New Features
    • Added ML Sklearn Prediction Node for text inference using trained scikit-learn models with automated dependency management
    • Implemented structured task logging with JSON-formatted execution events for improved diagnostics and lifecycle monitoring

@github-actions bot added the labels docs (Documentation), module:nodes (Python pipeline nodes), and module:ai (AI/ML modules) on May 3, 2026
coderabbitai bot (Contributor) commented May 3, 2026

📝 Walkthrough

The PR introduces a new ml_sklearn node for scikit-learn model inference with global model management and per-thread processing, and adds structured JSON logging to task lifecycle events in the AI module.

Changes

ML Sklearn Node Implementation

| Layer | File(s) | Summary |
|---|---|---|
| Service & Dependency Definition | `nodes/src/nodes/ml_sklearn/services.json`, `nodes/src/nodes/ml_sklearn/requirements.txt` | Service declares `ml_sklearn` with a Python runtime, `answers` lane I/O, and a test fixture. Dependencies constrain scikit-learn (`>=1.0.0,<2.0.0`) and numpy (`>=1.21.0,<3.0.0`). |
| Model Wrapper | `nodes/src/nodes/ml_sklearn/code.py` | `PreProcessor` class wraps the sklearn model. The constructor initializes `_model` as a stub; `process(text)` currently returns text unchanged, with placeholders for real inference logic. |
| Global Lifecycle & Model Management | `nodes/src/nodes/ml_sklearn/IGlobal.py` | `IGlobal` controller manages the shared preprocessor instance. `validateConfig()` validates dependencies; `beginGlobal()` defers import and instantiation of `PreProcessor`, skipping model load in CONFIG mode; `endGlobal()` releases resources. |
| Per-Thread Processing | `nodes/src/nodes/ml_sklearn/IInstance.py` | `IInstance` implements `writeAnswers()` to deep-copy the incoming question, extract its text, run `preprocessor.process()`, write the result back to `question.text`, and forward downstream. Includes a no-op `open()` hook. |
| Module Exports & Documentation | `nodes/src/nodes/ml_sklearn/__init__.py`, `nodes/src/nodes/ml_sklearn/README.md` | `__init__.py` exports `IGlobal` and `IInstance`. The README documents the node's purpose, input/output fields, and an example mapping (250 → 3.5). |
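The per-thread flow described in the table can be sketched as follows. Class and attribute names here are assumptions based on the summary, not the repository's actual interfaces, and the stub preprocessor mirrors the PR's CI-safe identity behavior:

```python
import copy
from types import SimpleNamespace


class PreProcessor:
    """Stand-in for the sklearn wrapper; the PR's stub also returns text unchanged."""

    def process(self, text: str) -> str:
        return text


class IInstance:
    """Illustrates writeAnswers(): deep-copy, process, write back, forward."""

    def __init__(self, preprocessor: PreProcessor, downstream=None) -> None:
        self._preprocessor = preprocessor
        self._downstream = downstream

    def writeAnswers(self, question):
        q = copy.deepcopy(question)  # never mutate the caller's object
        q.text = self._preprocessor.process(q.text)
        if self._downstream is not None:
            self._downstream.writeAnswers(q)  # forward downstream
        return q
```

Usage might look like `IInstance(PreProcessor()).writeAnswers(SimpleNamespace(text='250'))`, which returns a new object rather than mutating the input.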

Task Logging Infrastructure

| Layer | File(s) | Summary |
|---|---|---|
| Structured Logger Implementation | `packages/ai/src/ai/modules/task/task_logger.py` | New module defines `_StructuredFormatter`, which serializes log records to JSON with a UTC timestamp, level, logger name, message, custom fields, and optional exception info. Provides `get_task_logger(name)` for idempotent logger setup with a stream handler. |
| Task Lifecycle Logging Integration | `packages/ai/src/ai/modules/task/task_engine.py` | Integrates structured logging into task lifecycle methods: `_terminated()` logs termination with task ID, step, exit code, and state; `start_task()` logs startup initiation, subprocess creation with PID, and startup failures with exception info. |

Sequence Diagram

```mermaid
sequenceDiagram
    participant Node as ML Sklearn Node
    participant Global as IGlobal
    participant Inst as IInstance
    participant Proc as PreProcessor
    participant DS as Downstream

    Note over Node,DS: Node Initialization
    Node->>Global: validateConfig()
    Global->>Global: depends(requirements.txt)

    Node->>Global: beginGlobal()
    Global->>Global: Load dependencies
    Global->>Proc: Create PreProcessor(config)
    Global->>Global: preprocessor = instance

    Note over Node,DS: Request Processing
    Inst->>Inst: writeAnswers(question)
    Inst->>Inst: Validate preprocessor exists
    Inst->>Inst: Deep-copy question
    Inst->>Proc: process(text)
    Proc->>Proc: Run inference (stub)
    Proc-->>Inst: result text
    Inst->>Inst: question.text = result
    Inst->>DS: writeAnswers(question)

    Note over Node,DS: Cleanup
    Node->>Global: endGlobal()
    Global->>Global: preprocessor = None
```

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Suggested labels

module:nodes, module:ai

Suggested reviewers

  • jmaionchi
  • stepmikhaylov
  • Rod-Christensen

Poem

🐰 A sklearn node springs to life with global grace,
Deep-copying questions at a rapid pace,
Structured logs now dance in JSON streams,
Inference pipelines fulfilling data dreams! 📊✨

🚥 Pre-merge checks: ✅ 5 passed

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The PR title accurately describes the primary change: adding structured logging to the task engine lifecycle in the AI module. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 83.33%, which meets the required threshold of 80.00%. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


github-actions bot commented May 3, 2026: No description provided.

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 6


ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: abe38803-d3f5-42ae-b26f-4f879fa67760

📥 Commits

Reviewing files that changed from the base of the PR and between 5d9963f and 4fe5fe9.

📒 Files selected for processing (9)
  • nodes/src/nodes/ml_sklearn/IGlobal.py
  • nodes/src/nodes/ml_sklearn/IInstance.py
  • nodes/src/nodes/ml_sklearn/README.md
  • nodes/src/nodes/ml_sklearn/__init__.py
  • nodes/src/nodes/ml_sklearn/code.py
  • nodes/src/nodes/ml_sklearn/requirements.txt
  • nodes/src/nodes/ml_sklearn/services.json
  • packages/ai/src/ai/modules/task/task_engine.py
  • packages/ai/src/ai/modules/task/task_logger.py

Comment on lines +16 to +28
```python
def __init__(self, config: dict):
    """
    Initialize the sklearn model.

    In a real deployment, you'd load a pickled model from a path
    specified in config. This stub returns text unchanged so the
    node is CI-safe without a pre-trained model artifact.
    """
    # Example: load a real model like this:
    # import joblib
    # model_path = config.get('model_path', '')
    # self._model = joblib.load(model_path)
    self._model = None  # Replace with actual model loading
```

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

config is not stored — silently drops runtime configuration; also missing -> None annotation.

Two related problems:

  1. Unused config (ARG002): The parameter is received and ignored — self._config = config is never set. The commented-out loading example works because config is in-scope within __init__, but if any future method (e.g., a reload helper) needs it, it won't be accessible. At minimum, prefix the parameter _config to signal intentional non-use in the stub, or store it.
  2. Missing -> None annotation (ANN204): Ruff flags this; per the project's ruff lint requirement for nodes/**/*.py, __init__ should be annotated with -> None.
🛠️ Proposed fix

```diff
-    def __init__(self, config: dict):
+    def __init__(self, config: dict) -> None:
         """
         Initialize the sklearn model.
         ...
         """
+        self._config = config  # Retained for real model loading
         # Example: load a real model like this:
         # import joblib
-        # model_path = config.get('model_path', '')
+        # model_path = self._config.get('model_path', '')
         # self._model = joblib.load(model_path)
         self._model = None  # Replace with actual model loading
```
🧰 Tools
🪛 Ruff (0.15.12)

[warning] 16-16: Missing return type annotation for special method __init__

Add return type annotation: None

(ANN204)


[warning] 16-16: Unused method argument: config

(ARG002)


Comment on lines +22 to +28
```python
try:
    from depends import depends

    requirements = os.path.dirname(os.path.realpath(__file__)) + '/requirements.txt'
    depends(requirements)
except Exception as e:  # noqa: BLE001
    warning(str(e))
```

🧹 Nitpick | 🔵 Trivial | ⚡ Quick win

Requirements path is duplicated across validateConfig and beginGlobal.

The expression os.path.dirname(os.path.realpath(__file__)) + '/requirements.txt' appears verbatim in both methods. Extract it to a class-level constant to avoid divergence on future renames.

♻️ Proposed refactor

```diff
 class IGlobal(IGlobalBase):
     """Global state for the ml_sklearn node — holds the loaded sklearn model."""

     preprocessor: object = None  # The sklearn model/pipeline instance
+    _REQUIREMENTS = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt')

     def validateConfig(self):
         """Validate that scikit-learn and numpy are available."""
         try:
             from depends import depends
-            requirements = os.path.dirname(os.path.realpath(__file__)) + '/requirements.txt'
-            depends(requirements)
+            depends(self._REQUIREMENTS)
         except Exception as e:  # noqa: BLE001
             warning(str(e))

     def beginGlobal(self):
         """Load the sklearn model at runtime startup."""
         if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG:
             pass
         else:
             from depends import depends
-            requirements = os.path.dirname(os.path.realpath(__file__)) + '/requirements.txt'
-            depends(requirements)
+            depends(self._REQUIREMENTS)
```

Also applies to: 39-40



Comment on line +7:

> ## Input
>
> - text (number as string)

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Test fixture in services.json contradicts the documented input format.

The README specifies the input as text (number as string), but the test fixture in services.json supplies "hello world" — a non-numeric string. These should be consistent so the test fixture exercises the documented contract. Consider aligning the fixture to a numeric string (e.g., "250") to match the stated input type.

Also applies to: 44-49


Comment on lines +9 to +16
```json
"pipe": {
    "lanes": {
        "answers": {
            "in": true,
            "out": true
        }
    }
},
```

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

```shell
#!/bin/bash
# Description: Check what lane schema is used by other existing nodes to confirm
# whether ml_sklearn's pipe.lanes schema matches or deviates from the pattern.

rg -l '"pipe"' nodes/src/nodes --include='services.json' -n

echo "--- agent_langchain services.json ---"
fd 'services.json' nodes/src/nodes/agent_langchain --exec cat {}

echo "--- search_exa services.json ---"
fd 'services.json' nodes/src/nodes/search_exa --exec cat {}

echo "--- ml_sklearn services.json ---"
fd 'services.json' nodes/src/nodes/ml_sklearn --exec cat {}
```

Repository: rocketride-org/rocketride-server

Length of output: 7550


🏁 Script executed:

```shell
# Check if any other nodes use the "pipe" pattern
echo "=== Checking for 'pipe' usage in all services.json files ==="
find nodes/src/nodes -name 'services.json' -type f | while read file; do
  if grep -q '"pipe"' "$file"; then
    echo "Found 'pipe' in: $file"
  fi
done

echo ""
echo "=== Total services.json files in codebase ==="
find nodes/src/nodes -name 'services.json' -type f | wc -l

echo ""
echo "=== List all services.json file locations ==="
find nodes/src/nodes -name 'services.json' -type f
```

Repository: rocketride-org/rocketride-server

Length of output: 3314


🏁 Script executed:

```shell
# Search for service registry or parser code that validates/processes lanes
echo "=== Searching for service registry or parser related code ==="
fd -t f '(registry|parser|service)' --type f | grep -E '\.(ts|js|py)$' | head -20

echo ""
echo "=== Searching in src directory for registry/parser patterns ==="
find src -type f \( -name '*registry*' -o -name '*parser*' -o -name '*service*' \) 2>/dev/null | head -20
```

Repository: rocketride-org/rocketride-server

Length of output: 796


🏁 Script executed:

```shell
# Examine pipelineParser.ts to understand how it processes lanes
echo "=== Checking pipelineParser.ts ==="
wc -l apps/vscode/src/shared/util/pipelineParser.ts

# Check first portion to understand structure
head -100 apps/vscode/src/shared/util/pipelineParser.ts
```

Repository: rocketride-org/rocketride-server

Length of output: 3312


🏁 Script executed:

```shell
# Check registry.js for schema handling
echo "=== Checking registry.js ==="
cat scripts/lib/registry.js | head -200
```

Repository: rocketride-org/rocketride-server

Length of output: 4861


🏁 Script executed:

```shell
# Sample a standard node to confirm pattern
echo "=== Standard node: accessibility_describe ==="
cat nodes/src/nodes/accessibility_describe/services.json | head -50
```

Repository: rocketride-org/rocketride-server

Length of output: 1884


🏁 Script executed:

```shell
# Search for Python code that loads/parses services.json
echo "=== Searching for services.json loading in Python ==="
rg 'services\.json' packages/client-python packages/common --type py -B 2 -A 2 | head -100

echo ""
echo "=== Searching for lane/pipe parsing logic ==="
rg '(lane|pipe)' packages/ai/src/ai/modules/services/ --type py -B 2 -A 2 | head -150
```

Repository: rocketride-org/rocketride-server

Length of output: 839


🏁 Script executed:

```shell
# Broader search for services.json loading
echo "=== Searching for services.json file operations ==="
rg 'services\.json' --type py --type ts --type js -B 3 -A 3 | head -200

echo ""
echo "=== Searching for 'lanes' key handling ==="
rg '"lanes"' --type py --type ts --type js -B 2 -A 2 | head -150
```

Repository: rocketride-org/rocketride-server

Length of output: 12607


🏁 Script executed:

```shell
# Check the actual service loader in packages/ai
echo "=== Checking services.py in packages/ai ==="
find packages/ai -name 'services.py' -o -name '*service*.py' | head -10
```

Repository: rocketride-org/rocketride-server

Length of output: 168


🏁 Script executed:

```shell
# Examine the actual services module that loads and processes lanes
echo "=== Checking packages/ai/src/ai/modules/services/services.py ==="
wc -l packages/ai/src/ai/modules/services/services.py

# Check first 150 lines for import and lane handling
head -150 packages/ai/src/ai/modules/services/services.py

echo ""
echo "=== Look for 'lanes' key access ==="
grep -n 'lanes' packages/ai/src/ai/modules/services/services.py | head -20
```

Repository: rocketride-org/rocketride-server

Length of output: 1432


🏁 Script executed:

```shell
# Search for ml_sklearn usage in tests or code
echo "=== Searching for ml_sklearn references ==="
rg 'ml_sklearn' --type py --type ts --type js -B 2 -A 2 | head -100

echo ""
echo "=== Checking if there's lane schema validation ==="
rg '(lane.*schema|schema.*lane|validate.*lane)' --type py --type ts --type js -i | head -50

echo ""
echo "=== Look for any documentation on lane format ==="
find . -name '*.md' -type f -exec grep -l 'lane' {} \; | head -10
```

Repository: rocketride-org/rocketride-server

Length of output: 3505


🏁 Script executed:

```shell
# Check if ml_sklearn has any test or implementation files
echo "=== ml_sklearn implementation files ==="
find nodes/src/nodes/ml_sklearn -type f

echo ""
echo "=== Check test files for ml_sklearn ==="
find . -name '*test*' -type f | xargs grep -l 'ml_sklearn' 2>/dev/null | head -5
```

Repository: rocketride-org/rocketride-server

Length of output: 431


🏁 Script executed:

```shell
# Examine ml_sklearn's implementation to understand how it expects to receive input
echo "=== ml_sklearn code.py ==="
cat nodes/src/nodes/ml_sklearn/code.py

echo ""
echo "=== ml_sklearn README ==="
cat nodes/src/nodes/ml_sklearn/README.md
```

Repository: rocketride-org/rocketride-server

Length of output: 2089


🏁 Script executed:

```shell
# Check documentation on lane format and expected schema
echo "=== Checking ROCKETRIDE_COMPONENT_REFERENCE for lane schema ==="
grep -A 20 -B 5 'lane' docs/agents/ROCKETRIDE_COMPONENT_REFERENCE.md | head -100
```

Repository: rocketride-org/rocketride-server

Length of output: 3931


Lanes schema must follow the established pattern — replace nested pipe structure with top-level lanes object.

The ml_sklearn node is the only one (out of 71) that nests lanes under "pipe" and uses boolean in/out properties. All other nodes, including agent_langchain and search_exa, declare lanes at the top level with array values:

```json
"lanes": {
  "answers": ["output_lane"]
}
```

Additionally, all other nodes include an "input" array for routing configuration, which ml_sklearn lacks entirely. According to RocketRide documentation, the lanes object is "the definitive reference for data flow" and is expected at the top level with keys mapping to output lane arrays. The current structure will not be parsed correctly by the service registry, causing pipeline wiring failures at runtime.

Diff showing required changes

Current:

```json
"pipe": {
    "lanes": {
        "answers": {
            "in": true,
            "out": true
        }
    }
},
```

Should be:

```json
"lanes": {
    "answers": ["answers"]
},
"input": [
    {
        "lane": "answers",
        "output": [{ "lane": "answers" }]
    }
],
```

Comment on lines 1688 to 1696
```python
except Exception as e:
    await self._terminated()
    _logger.error(
        'Task startup failed',
        extra={'task_id': self.id, 'step': 'error', 'error': str(e)},
        exc_info=True,
    )
    self.debug_message(f'Task startup failed: {e}')
    raise
```

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Error log fires after _terminated() — "Task terminated" will always precede "Task startup failed" in the log stream, inverting the causal order.

Inside _terminated() (line 850), the termination event is logged unconditionally. Because await self._terminated() runs first (line 1689), every startup failure will produce log entries in this order:

```
{"step": "termination", "message": "Task terminated", ...}     ← logged inside _terminated()
{"step": "error",       "message": "Task startup failed", ...} ← logged after _terminated() returns
```

A developer querying for the root cause of a startup failure will see the termination message first and have to work backwards. Moving the error log before the _terminated() call restores the expected causal sequence.

Additionally, Ruff G201 flags the exc_info=True form — _logger.exception() is the idiomatic spelling and drops the redundant kwarg.

🐛 Proposed fix — log the error first, then clean up, use exception()

```diff
         except Exception as e:
-            await self._terminated()
-            _logger.error(
+            _logger.exception(
                 'Task startup failed',
-                extra={'task_id': self.id, 'step': 'error', 'error': str(e)},
-                exc_info=True,
+                extra={'task_id': self.id, 'step': 'error', 'error': str(e)},
             )
+            await self._terminated()
             self.debug_message(f'Task startup failed: {e}')
             raise
```
🧰 Tools
🪛 Ruff (0.15.12)

[warning] 1690-1690: Logging .exception(...) should be used instead of .error(..., exc_info=True)

(G201)


Comment on lines +36 to +45
```python
def get_task_logger(name: str) -> logging.Logger:
    logger = logging.getLogger(name)

    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(_StructuredFormatter())
        logger.addHandler(handler)
        logger.propagate = False

    return logger
```

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

get_task_logger never sets a log level — all _logger.info() calls will be silently dropped in default configurations.

Python's effective-level resolution always traverses the logger hierarchy regardless of propagate. With propagate = False, records don't bubble up to parent handlers, but level filtering still uses getEffectiveLevel(), which walks up to the root until it finds a non-NOTSET level. The root logger's default level is WARNING, so every _logger.info(...) call added by this PR will be filtered out before it ever reaches the StreamHandler.
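This behavior is easy to confirm with the standard library alone, assuming no prior configuration has changed the root logger's default WARNING level (the logger name below is arbitrary):

```python
import logging

child = logging.getLogger('tasklog.demo')  # fresh logger, level NOTSET
child.propagate = False                    # blocks record propagation only

# Level filtering still walks the hierarchy: root defaults to WARNING,
# so INFO records are dropped before any handler sees them.
assert child.getEffectiveLevel() == logging.WARNING
assert not child.isEnabledFor(logging.INFO)

child.setLevel(logging.INFO)               # the step get_task_logger omits
assert child.isEnabledFor(logging.INFO)
```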

🐛 Proposed fix — set level so records reach the handler

```diff
 def get_task_logger(name: str) -> logging.Logger:
     logger = logging.getLogger(name)

     if not logger.handlers:
         handler = logging.StreamHandler()
         handler.setFormatter(_StructuredFormatter())
         logger.addHandler(handler)
-        logger.propagate = False

+    logger.setLevel(logging.DEBUG)   # pass everything through; let the handler/app filter
+    logger.propagate = False         # always disable propagation, not just on first setup

     return logger
```

Moving propagate = False outside the guard is also necessary: if the logger is already configured (e.g. by a test framework or an early basicConfig call that names this logger), the guard body is skipped and propagate stays True, causing duplicate output to parent handlers.



Labels

docs (Documentation), module:ai (AI/ML modules), module:nodes (Python pipeline nodes)


2 participants