-
Notifications
You must be signed in to change notification settings - Fork 148
feat(ai): add TaskTracker for task lifecycle state observability #746
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from 45 commits
f4f7aa5
b22e844
319fca1
7ce10e3
90fac4d
b0598a2
ca2b870
5862bd8
b33eff4
be13299
4d88f21
48a784f
7e6963d
5952843
c667348
12c00e2
c587ada
bb482f0
8e676e5
50a0bd4
96cf781
92593be
fe1f418
89504f9
dc7f679
4c9d406
c39113d
47ad4f9
3e8e46b
5399865
43c725d
5602c71
9eaba7e
efa13f6
66e54c3
cc8840e
1f33c2f
4d29752
9ad5458
282a102
718b5ce
46d6428
83abe32
3d91916
73a4331
009373c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,50 @@ | ||
| # ============================================================================= | ||
| # MIT License | ||
| # Copyright (c) 2026 RocketRide Contributors | ||
| # ============================================================================= | ||
|
|
||
| # ------------------------------------------------------------------------------ | ||
| # This class controls the data shared between all threads for the task | ||
| # ------------------------------------------------------------------------------ | ||
| import os | ||
|
|
||
| from rocketlib import IGlobalBase, OPEN_MODE, warning | ||
| from ai.common.config import Config | ||
|
|
||
|
|
||
class IGlobal(IGlobalBase):
    """Global state for the ml_sklearn node — holds the loaded sklearn model."""

    # The sklearn model/pipeline instance; None until beginGlobal() loads it.
    preprocessor: object = None

    def _installRequirements(self):
        """Install this node's requirements.txt via depends().

        Shared by validateConfig() and beginGlobal(); previously the same
        path-building + depends() call was duplicated in both methods.
        """
        from depends import depends

        # os.path.join instead of string concatenation for portability.
        requirements = os.path.join(
            os.path.dirname(os.path.realpath(__file__)), 'requirements.txt'
        )
        depends(requirements)

    def validateConfig(self):
        """Validate that scikit-learn and numpy are available.

        Failures are reported as warnings rather than raised, so a missing
        dependency does not abort config validation.
        """
        try:
            self._installRequirements()
        except Exception as e:  # noqa: BLE001
            warning(str(e))

    def beginGlobal(self):
        """Load the sklearn model at runtime startup.

        In CONFIG open mode the model is deliberately not loaded — the node
        is only asked to describe its service definition.
        """
        if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG:
            # Config mode: don't load the model, we'll only be called
            # to configure the service definition.
            return

        self._installRequirements()

        # Deferred import — only after deps are installed
        from .code import PreProcessor

        config = Config.getNodeConfig(self.glb.logicalType, self.glb.connConfig)
        self.preprocessor = PreProcessor(config)

    def endGlobal(self):
        """Release the sklearn model so it can be garbage-collected."""
        self.preprocessor = None
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| # ============================================================================= | ||
| # MIT License | ||
| # Copyright (c) 2026 RocketRide Contributors | ||
| # ============================================================================= | ||
|
|
||
| # ------------------------------------------------------------------------------ | ||
| # This class controls the data for each thread of the task | ||
| # ------------------------------------------------------------------------------ | ||
| import copy | ||
|
|
||
| from rocketlib import IInstanceBase, Entry | ||
|
|
||
| from .IGlobal import IGlobal | ||
|
|
||
|
|
||
class IInstance(IInstanceBase):
    """Per-thread instance for the ml_sklearn node."""

    IGlobal: IGlobal

    def open(self, obj: Entry):
        """Called before each new pipeline object — nothing to reset for this node."""
        pass

    def writeAnswers(self, question):
        """
        Receive a question from upstream, run sklearn inference on its text,
        and forward the result to the answers output lane.

        The question is deep-copied to prevent mutation in fan-out pipelines.
        """
        preprocessor = self.IGlobal.preprocessor
        if preprocessor is None:
            raise RuntimeError('sklearn PreProcessor not initialized')

        # Private copy: siblings in a fan-out must never observe our mutation.
        question = copy.deepcopy(question)

        # Extract the text payload, falling back to the object's string form.
        if hasattr(question, 'text'):
            text = question.text
        else:
            text = str(question)

        # Run inference, write the result back, and forward downstream.
        question.text = preprocessor.process(text)
        self.instance.writeAnswers(question)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| # ML Sklearn Prediction Node | ||
|
|
||
| This node performs predictions using a trained scikit-learn model. | ||
|
|
||
| ## Input | ||
|
|
||
| - text (string) | ||
|
|
||
| ## Output | ||
|
|
||
| - text (predicted value as string) | ||
|
|
||
| ## Example | ||
|
|
||
| Input: | ||
| 250 | ||
|
|
||
| Output: | ||
| 3.5 | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| from .IGlobal import IGlobal | ||
| from .IInstance import IInstance | ||
|
|
||
| __all__ = ['IGlobal', 'IInstance'] |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,47 @@ | ||||||
| # ============================================================================= | ||||||
| # MIT License | ||||||
| # Copyright (c) 2026 RocketRide Contributors | ||||||
| # ============================================================================= | ||||||
|
|
||||||
| # ------------------------------------------------------------------------------ | ||||||
| # PreProcessor: sklearn-based text inference class | ||||||
| # All heavy imports are deferred — this file is imported only after | ||||||
| # depends() has installed requirements.txt in beginGlobal(). | ||||||
| # ------------------------------------------------------------------------------ | ||||||
|
|
||||||
|
|
||||||
class PreProcessor:
    """Wraps a scikit-learn model/pipeline for text inference."""

    def __init__(self, config: dict) -> None:
        """
        Initialize the sklearn model.

        Args:
            config: Node configuration; in a real deployment this would
                carry e.g. a ``model_path`` pointing at a pickled model.

        This stub returns text unchanged so the node is CI-safe without
        a pre-trained model artifact.
        """
        # Retain the config for real model loading later; also resolves
        # the unused-argument lint warning (Ruff ARG002) without renaming
        # the public parameter.
        self._config = config
        # Example: load a real model like this:
        #   import joblib
        #   model_path = config.get('model_path', '')
        #   self._model = joblib.load(model_path)
        self._model = None  # Replace with actual model loading

    def process(self, text: str) -> str:
        """
        Run sklearn inference on input text and return processed text.

        Args:
            text: The input string to process.

        Returns:
            The processed string. Currently passes through unchanged.
        """
        if self._model is None:
            # Pass-through when no model is loaded (safe for CI)
            return text

        # Example with a real model:
        #   prediction = self._model.predict([text])
        #   return str(prediction[0])
        return text
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| scikit-learn>=1.0.0,<2.0.0 | ||
| numpy>=1.21.0,<3.0.0 |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,52 @@ | ||||||||||||||||||||||||||||||||||||||
{
  "ml_sklearn": {
    "name": "ML sklearn",
    "description": "Applies a trained scikit-learn model to process text through the pipeline.",
    "icon": "python.svg",
    "group": "preprocessor",
    "color": "#f97316",
    "runtime": "python",
    "lanes": {
      "answers": []
    },
    "input": [
      {
        "lane": "answers",
        "output": []
      }
    ],
    "preconfig": {
      "default": "default",
      "profiles": {
        "default": {}
      }
    },
    "fields": [],
    "shape": [],
    "test": {
      "answers": [
        {
          "text": "hello world"
        }
      ]
    }
  }
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -61,6 +61,11 @@ | |
| from .types import LAUNCH_TYPE | ||
| from .task_conn import TaskConn | ||
| from .task_metrics import TaskMetrics | ||
| from .task_logger import get_task_logger | ||
| from .task_tracker import tracker | ||
|
|
||
| # Module-level structured logger for task lifecycle events | ||
| _logger = get_task_logger(__name__) | ||
|
|
||
|
|
||
| if TYPE_CHECKING: | ||
|
|
@@ -843,6 +848,19 @@ async def _terminated(self) -> None: | |
| apikey=task_apikey, | ||
| ) | ||
|
|
||
| _logger.info( | ||
| 'Task terminated', | ||
| extra={ | ||
| 'task_id': self.id, | ||
| 'step': 'termination', | ||
| 'exit_code': self._status.exitCode, | ||
| 'final_state': self._status.state, | ||
| }, | ||
| ) | ||
| if self._stop_requested or (self._status.exitCode and self._status.exitCode != 0): | ||
| tracker.on_failed(self.id) | ||
| else: | ||
| tracker.on_completed(self.id) | ||
|
Comment on lines
+860
to
+863
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The condition on line 860: if self._stop_requested or (self._status.exitCode and self._status.exitCode != 0):treats The explicit The fix is to default to 🐛 Proposed fix- if self._stop_requested or (self._status.exitCode and self._status.exitCode != 0):
- tracker.on_failed(self.id)
- else:
- tracker.on_completed(self.id)
+ if self._status.exitCode == 0 and not self._stop_requested:
+ tracker.on_completed(self.id)
+ else:
+ tracker.on_failed(self.id)With this change, 🤖 Prompt for AI Agents |
||
| self.debug_message('Resource cleanup completed successfully') | ||
|
|
||
| def _on_metrics_updated(self) -> None: | ||
|
|
@@ -1470,6 +1488,11 @@ async def start_task(self) -> None: | |
|
|
||
| # Set our current state | ||
| self._status.state = TASK_STATE.STARTING.value | ||
| _logger.info( | ||
| 'Task starting', | ||
| extra={'task_id': self.id, 'step': 'start'}, | ||
| ) | ||
| tracker.on_starting(self.id) | ||
|
|
||
| # Resolve any ${...} in the pipeline | ||
| self._pipeline = self._resolve_pipeline(self._pipeline) | ||
|
|
@@ -1586,6 +1609,16 @@ async def start_task(self) -> None: | |
| env=subprocess_env, | ||
| ) | ||
|
|
||
| _logger.info( | ||
| 'Subprocess created', | ||
| extra={ | ||
| 'task_id': self.id, | ||
| 'step': 'subprocess', | ||
| 'pid': self._engine_process.pid, | ||
| }, | ||
| ) | ||
| tracker.on_running(self.id) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
In 🐛 Proposed fix — swap the two tracker calls- tracker.on_running(self.id)
+ tracker.on_initializing(self.id)- tracker.on_initializing(self.id)
+ tracker.on_running(self.id)Also applies to: 1651-1651 🤖 Prompt for AI Agents |
||
|
|
||
| # Initialize stdio interface | ||
| try: | ||
| self._debug_stdio = Task.TaskDbgStdio( | ||
|
|
@@ -1615,6 +1648,7 @@ async def start_task(self) -> None: | |
|
|
||
| # Setup to initializing | ||
| self._status.state = TASK_STATE.INITIALIZING.value | ||
| tracker.on_initializing(self.id) | ||
|
|
||
| # Create the periodic status update task | ||
| self._status_update_task = asyncio.create_task(self._status_update_loop()) | ||
|
|
@@ -1661,6 +1695,12 @@ async def start_task(self) -> None: | |
|
|
||
| except Exception as e: | ||
| await self._terminated() | ||
| tracker.on_failed(self.id) | ||
| _logger.error( | ||
| 'Task startup failed', | ||
| extra={'task_id': self.id, 'step': 'error', 'error': str(e)}, | ||
| exc_info=True, | ||
| ) | ||
|
Comment on lines
+1699
to
+1703
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use Ruff G201 flags this. ♻️ Proposed fix- _logger.error(
- 'Task startup failed',
- extra={'task_id': self.id, 'step': 'error', 'error': str(e)},
- exc_info=True,
- )
+ _logger.exception(
+ 'Task startup failed',
+ extra={'task_id': self.id, 'step': 'error', 'error': str(e)},
+ )🧰 Tools🪛 Ruff (0.15.12)[warning] 1699-1699: Logging (G201) 🤖 Prompt for AI Agents |
||
| self.debug_message(f'Task startup failed: {e}') | ||
| raise | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Input description is inconsistent with the
testfixture inservices.json.The README states the input is a "number as string" (e.g.,
250), butservices.jsonuses"hello world"as the test payload. Either update the test fixture to a numeric string (e.g.,"250") or broaden the description to "text (string)" if arbitrary strings are valid.🤖 Prompt for AI Agents