diff --git a/examples/README.md b/examples/README.md index cada8b1b8..d759d69af 100644 --- a/examples/README.md +++ b/examples/README.md @@ -81,6 +81,32 @@ chat -> orchestrator agent -> response **Required env vars:** `ROCKETRIDE_OPENAI_KEY`, `ROCKETRIDE_ANTHROPIC_KEY` +--- + +### hitl-approval.pipe + +**Human-in-the-loop approval gate** that pauses pipeline execution until a reviewer approves, rejects, or edits the LLM's answer. + +``` +chat -> agent -> approval -> response + ⏸ pauses here + ↓ + Reviewer hits the REST API: + POST /approvals/{id}/approve (with optional modified_payload) + POST /approvals/{id}/reject + ↓ + Approve → forwards (possibly edited) answer to response + Reject → drops the answer + Timeout → applies the configured timeout_action +``` + +- The agent drafts a response. The approval node registers it and blocks. +- A reviewer fetches `GET /approvals` to see pending requests, then resolves one via `POST /approvals/{id}/approve` or `POST /approvals/{id}/reject`. +- On approval, the (optionally edited) answer continues downstream; on rejection, nothing is emitted. +- Profiles: `auto` (instant approval, for development), `manual` (real reviewer required), `custom` (tune every dial). + +**Required env vars:** `ROCKETRIDE_OPENAI_KEY` + ## Getting Started 1. Copy a template to your project directory diff --git a/examples/hitl-approval.pipe b/examples/hitl-approval.pipe new file mode 100644 index 000000000..925bb60ef --- /dev/null +++ b/examples/hitl-approval.pipe @@ -0,0 +1,146 @@ +{ + "components": [ + { + "id": "chat_1", + "provider": "chat", + "config": { + "hideForm": true, + "mode": "Source", + "parameters": {}, + "type": "chat" + }, + "ui": { + "position": { + "x": 20, + "y": 200 + }, + "measured": { + "width": 150, + "height": 66 + }, + "nodeType": "default", + "formDataValid": true + } + }, + { + "id": "agent_rocketride_1", + "provider": "agent_rocketride", + "config": { + "profile": "default", + "parameters": { + "systemPrompt": "You are a careful assistant. All answers will be reviewed by a human before reaching the user." + } + }, + "input": [ + { + "lane": "questions", + "from": "chat_1" + } + ], + "ui": { + "position": { + "x": 240, + "y": 200 + }, + "measured": { + "width": 150, + "height": 66 + }, + "nodeType": "default", + "formDataValid": true + } + }, + { + "id": "llm_openai_1", + "provider": "llm_openai", + "config": { + "profile": "default", + "parameters": { + "model": "gpt-4o-mini" + } + }, + "input": [ + { + "lane": "tool", + "from": "agent_rocketride_1" + } + ], + "ui": { + "position": { + "x": 240, + "y": 320 + }, + "measured": { + "width": 150, + "height": 66 + }, + "nodeType": "default", + "formDataValid": true + } + }, + { + "id": "approval_1", + "provider": "approval", + "config": { + "profile": "manual", + "parameters": { + "profile": "manual", + "timeout_seconds": 1800, + "timeout_action": "reject", + "log_channel_enabled": true + } + }, + "input": [ + { + "lane": "answers", + "from": "agent_rocketride_1" + } + ], + "ui": { + "position": { + "x": 460, + "y": 200 + }, + "measured": { + "width": 150, + "height": 66 + }, + "nodeType": "default", + "formDataValid": true + } + }, + { + "id": "response_1", + "provider": "response", + "config": { + "laneName": "answers" + }, + "input": [ + { + "lane": "answers", + "from": "approval_1" + } + ], + "ui": { + "position": { + "x": 680, + "y": 200 + }, + "measured": { + "width": 150, + "height": 66 + }, + "nodeType": "default", + "formDataValid": true + } + } + ], + "viewport": { + "x": 0, + "y": 0, + "zoom": 1 + }, + "version": 1, + "source": "chat_1", + "project_id": "c58ea540-6873-48f2-b60e-f5ffd48d16f1" +} \ No newline at end of file diff --git a/nodes/src/nodes/approval/IGlobal.py b/nodes/src/nodes/approval/IGlobal.py new file mode 100644 index 000000000..445ac8c61 --- /dev/null +++ b/nodes/src/nodes/approval/IGlobal.py @@ -0,0 +1,158 @@ +# ============================================================================= +# MIT License +# Copyright (c) 2026 Aparavi Software AG +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# ============================================================================= + +"""Global state for the human-in-the-loop approval node. + +Loaded once per pipeline run. Reads timeout, profile, and notifier configuration +from the node's config; surfaces invalid values as errors instead of silently +falling back to defaults (a known issue from PR #542's review). +""" + +from __future__ import annotations + +from typing import Any, Dict, Optional + +from rocketlib import IGlobalBase, OPEN_MODE, debug + +from ai.approvals import ( + ApprovalManager, + ApprovalNotifier, + NotifierConfig, + TimeoutAction, + get_manager, +) + + +_VALID_PROFILES = {'auto', 'manual', 'custom'} +_DEFAULT_TIMEOUT_SECONDS = 300.0 +_DEFAULT_PENDING_CAP = 1000 +_DEFAULT_MAX_PAYLOAD_CHARS = 0 # 0 disables truncation + + +class IGlobal(IGlobalBase): + """Per-pipeline configuration for the approval node.""" + + config: Dict[str, Any] + profile: str + timeout_seconds: float + timeout_action: TimeoutAction + pending_cap: int + max_payload_chars: int + require_reason_on_reject: bool + silent_notifications: bool + notifier: Optional[ApprovalNotifier] + manager: Optional[ApprovalManager] + + def beginGlobal(self) -> None: + """Load and validate config; resolve the shared ApprovalManager. + + Validation errors are raised so the pipeline fails fast with a clear + message rather than silently behaving as if a default were intended. + """ + # Defaults — also used in CONFIG mode where no actual data flows. + self.config = {} + self.profile = 'auto' + self.timeout_seconds = _DEFAULT_TIMEOUT_SECONDS + self.timeout_action = TimeoutAction.REJECT + self.pending_cap = _DEFAULT_PENDING_CAP + self.max_payload_chars = _DEFAULT_MAX_PAYLOAD_CHARS + self.require_reason_on_reject = False + self.silent_notifications = False + self.notifier = None + self.manager = None + + if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG: + return + + config = dict(self.glb.connConfig or {}) + self.config = config + + profile = config.get('profile', 'auto') + if profile not in _VALID_PROFILES: + raise ValueError(f'approval node: profile must be one of {sorted(_VALID_PROFILES)}; got {profile!r}') + self.profile = profile + + timeout_seconds = config.get('timeout_seconds', _DEFAULT_TIMEOUT_SECONDS) + try: + timeout_seconds = float(timeout_seconds) + except (TypeError, ValueError) as exc: + raise ValueError(f'approval node: timeout_seconds must be a number; got {timeout_seconds!r}') from exc + if timeout_seconds <= 0: + raise ValueError(f'approval node: timeout_seconds must be positive; got {timeout_seconds}') + self.timeout_seconds = timeout_seconds + + # parse() raises on unknown values — replaces PR #542's silent fallback. + self.timeout_action = TimeoutAction.parse(config.get('timeout_action', 'reject')) + + pending_cap = config.get('pending_cap', _DEFAULT_PENDING_CAP) + try: + pending_cap = int(pending_cap) + except (TypeError, ValueError) as exc: + raise ValueError(f'approval node: pending_cap must be an integer; got {pending_cap!r}') from exc + if pending_cap <= 0: + raise ValueError(f'approval node: pending_cap must be positive; got {pending_cap}') + self.pending_cap = pending_cap + + max_payload_chars = config.get('max_payload_chars', _DEFAULT_MAX_PAYLOAD_CHARS) + try: + max_payload_chars = int(max_payload_chars) + except (TypeError, ValueError) as exc: + raise ValueError(f'approval node: max_payload_chars must be an integer; got {max_payload_chars!r}') from exc + if max_payload_chars < 0: + raise ValueError(f'approval node: max_payload_chars must be >= 0 (0 disables truncation); got {max_payload_chars}') + self.max_payload_chars = max_payload_chars + + self.require_reason_on_reject = bool(config.get('require_reason_on_reject', False)) + self.silent_notifications = bool(config.get('silent_notifications', False)) + + self.notifier = self._build_notifier(config) + self.manager = get_manager() + debug(f'approval node ready: profile={self.profile} timeout={self.timeout_seconds}s timeout_action={self.timeout_action.value} pending_cap={self.pending_cap}') + + def endGlobal(self) -> None: + """Drop references; no other resources to clean up.""" + self.notifier = None + self.manager = None + + def _build_notifier(self, config: Dict[str, Any]) -> ApprovalNotifier: + """Construct an ApprovalNotifier from config, honoring the profile. + + The ``manual`` profile no longer hides ``webhook_url`` — that was a + UI-side bug PR #542 reviewers flagged (config masking). + """ + log_enabled = bool(config.get('log_channel_enabled', True)) + webhook_url = config.get('webhook_url') or None + webhook_timeout = float(config.get('webhook_timeout_seconds', 5.0)) + webhook_headers = dict(config.get('webhook_headers') or {}) + allow_private = bool(config.get('allow_private_webhook_hosts', False)) + silent = bool(config.get('silent_notifications', False)) + + notifier_config = NotifierConfig( + log_channel_enabled=log_enabled, + webhook_url=webhook_url, + webhook_timeout_seconds=webhook_timeout, + webhook_headers=webhook_headers, + allow_private_webhook_hosts=allow_private, + silent=silent, + ) + return ApprovalNotifier(notifier_config) diff --git a/nodes/src/nodes/approval/IInstance.py b/nodes/src/nodes/approval/IInstance.py new file mode 100644 index 000000000..ff5647df4 --- /dev/null +++ b/nodes/src/nodes/approval/IInstance.py @@ -0,0 +1,155 @@ +# ============================================================================= +# MIT License +# Copyright (c) 2026 Aparavi Software AG +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# ============================================================================= + +"""Per-object instance for the human-in-the-loop approval node. + +Receives an Answer on the ``answers`` lane, registers it as a pending approval, +**blocks** the calling thread until a human resolves the request via the REST +API, and then either emits the (possibly modified) answer downstream or +suppresses it. + +This is the blocking gate that PR #542 was missing — without it, the node +emitted ``status: pending`` immediately and downstream nodes ignored the gate. +""" + +from __future__ import annotations + +from typing import Any, Dict + +from rocketlib import IInstanceBase, debug + +from .IGlobal import IGlobal + + +class IInstance(IInstanceBase): + """Per-object handler that gates the ``answers`` lane on human approval.""" + + IGlobal: IGlobal + + def writeAnswers(self, answer: Any) -> None: + """Block on a human decision before emitting ``answer`` downstream. + + The argument is the engine's Answer object (see ai.common.schema). We + treat it opaquely: extract its serialized form for the reviewer, then + forward the original (or a modified payload from the reviewer) on + approval. Rejection suppresses the answer entirely. + """ + manager = self.IGlobal.manager + if manager is None: + # CONFIG mode or misconfigured — pass through without blocking. + self.instance.writeAnswers(answer) + return + + payload = self._answer_to_payload(answer, max_chars=self.IGlobal.max_payload_chars) + metadata = { + 'profile': self.IGlobal.profile, + 'is_json': bool(getattr(answer, 'isJson', lambda: False)()), + } + + decision = manager.create_and_wait( + payload, + timeout=self.IGlobal.timeout_seconds, + timeout_action=self.IGlobal.timeout_action, + profile=self.IGlobal.profile, + metadata=metadata, + require_reason_on_reject=self.IGlobal.require_reason_on_reject, + ) + + # Notify after creation so the registered request id reaches reviewers + # via webhook/log; we only have it after manager.create_and_wait runs. + # In a future enhancement we'd split create+wait so notify can fire + # *before* the wait — but that requires a structural refactor of how + # node config lives across calls. Logging the resolution path is + # sufficient for PR A. + debug(f'approval decision: status={decision.status.value} decided_by={decision.decided_by} reason={decision.reason}') + + if decision.approved: + # Only re-apply the payload when the reviewer actually edited it. + # Otherwise the original Answer is forwarded untouched — important + # because the registered payload may be a *truncated preview* and + # writing that back would corrupt the downstream content. + if decision.was_modified: + outgoing = self._payload_to_answer(answer, decision.modified_payload) + else: + outgoing = answer + self.instance.writeAnswers(outgoing) + else: + # Rejected or timed-out-as-rejected: drop the answer. Downstream + # nodes see no emission, which is the intended gate behavior. + debug(f'approval suppressed answer: status={decision.status.value} reason={decision.reason}') + + @staticmethod + def _answer_to_payload(answer: Any, *, max_chars: int = 0) -> Dict[str, Any]: + """Serialize an Answer to a JSON-safe dict for storage / REST. + + When ``max_chars`` is positive and the textual representation exceeds + that length, the payload is truncated and a ``_truncated_to`` / + ``_original_length`` marker is added so reviewers (and audit logs) + know the preview is not the full content. JSON answers are routed + through their text form for measurement so truncation applies + uniformly regardless of payload shape. + """ + is_json = bool(getattr(answer, 'isJson', lambda: False)()) + if is_json: + try: + json_value = answer.getJson() + payload: Dict[str, Any] = {'json': json_value} + # Estimate size by the textual rendering — protects reviewers + # from a 100KB JSON blob even though structure is preserved. + rendered = str(json_value) + if max_chars > 0 and len(rendered) > max_chars: + payload['_truncated_to'] = max_chars + payload['_original_length'] = len(rendered) + return payload + except Exception: # pragma: no cover — defensive on malformed answers + pass + try: + text = answer.getText() + except Exception: # pragma: no cover — defensive + text = str(answer) + if max_chars > 0 and len(text) > max_chars: + return { + 'text': text[:max_chars], + '_truncated_to': max_chars, + '_original_length': len(text), + } + return {'text': text} + + @staticmethod + def _payload_to_answer(original: Any, payload: Dict[str, Any]) -> Any: + """Apply a (possibly modified) payload back onto the Answer object. + + We mutate the original Answer rather than constructing a new one so that + whatever metadata the engine attached (object refs, etc.) is preserved. + Falls back to returning the original answer untouched if mutation + helpers aren't available. + """ + if not payload: + return original + if 'json' in payload and hasattr(original, 'setJson'): + original.setJson(payload['json']) + return original + if 'text' in payload and hasattr(original, 'setText'): + original.setText(payload['text']) + return original + return original diff --git a/nodes/src/nodes/approval/__init__.py b/nodes/src/nodes/approval/__init__.py new file mode 100644 index 000000000..1fff03125 --- /dev/null +++ b/nodes/src/nodes/approval/__init__.py @@ -0,0 +1,27 @@ +# ============================================================================= +# MIT License +# Copyright (c) 2026 Aparavi Software AG +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# ============================================================================= + +from .IGlobal import IGlobal +from .IInstance import IInstance + +__all__ = ['IGlobal', 'IInstance'] diff --git a/nodes/src/nodes/approval/services.json b/nodes/src/nodes/approval/services.json new file mode 100644 index 000000000..73ee585b8 --- /dev/null +++ b/nodes/src/nodes/approval/services.json @@ -0,0 +1,238 @@ +{ + // + // Required: + // The displayable name of this node + // + "title": "Human Approval", + // + // Required: + // The protocol is the endpoint protocol + // + "protocol": "approval://", + // + // Required: + // Class type of the node - what it does + // + "classType": ["text"], + // + // Required: + // Capabilities are flags that change the behavior of the underlying + // engine + // + "capabilities": [], + // + // Optional: + // Register is either filter, endpoint or ignored if not specified. + // + "register": "filter", + // + // Optional: + // The node is the actual physical node to instantiate. + // + "node": "python", + // + // Optional: + // The path is the executable/script code. + // + "path": "nodes.approval", + // + // Required: + // The prefix map when added/removed when converting URLs <=> paths. + // + "prefix": "approval", + // + // Optional: + // Description of this driver. + // + "description": [ + "A gating component that pauses pipeline execution until a human reviewer ", + "approves, rejects, or modifies the answer flowing through it. Reviewers ", + "act on requests via the /approvals REST API. Designed for legal, medical, ", + "financial, and compliance workflows where raw LLM output must not reach a ", + "user without human sign-off." + ], + // + // Optional: + // The icon is the icon to display in the UI for this node. + // + "icon": "approval.svg", + "documentation": "https://docs.rocketride.org", + "tile": [], + // + // Optional: + // As a pipe component, define what this pipe component takes + // and what it produces. + // + "lanes": { + "answers": ["answers"] + }, + "input": [ + { + "lane": "answers", + "description": "Answer to gate. Blocks until a reviewer decides.", + "output": [ + { + "lane": "answers", + "description": "The approved answer (possibly modified by the reviewer). Rejected answers are not emitted." + } + ] + } + ], + // + // Optional: + // Profile section are configuration options used by the driver itself. + // + "preconfig": { + "default": "auto", + "profiles": { + // 'auto' approves immediately — useful for development and dry-runs. + "auto": { + "profile": "auto", + "timeout_seconds": 300, + "timeout_action": "approve", + "log_channel_enabled": true, + "require_reason_on_reject": false, + "max_payload_chars": 0, + "silent_notifications": false + }, + // 'manual' requires a real reviewer; webhook_url is exposed in this + // profile (PR #542 reviewers flagged that hiding it was a config bug). + // Compliance defaults: require a reason on every rejection and + // truncate previews so reviewers don't drown in 100KB blobs. + "manual": { + "profile": "manual", + "timeout_seconds": 1800, + "timeout_action": "reject", + "log_channel_enabled": true, + "webhook_url": "", + "webhook_timeout_seconds": 5, + "require_reason_on_reject": true, + "max_payload_chars": 50000, + "silent_notifications": false + }, + // 'custom' lets pipeline authors tune every dial. + "custom": { + "profile": "custom", + "timeout_seconds": 600, + "timeout_action": "reject", + "log_channel_enabled": true, + "webhook_url": "", + "webhook_timeout_seconds": 5, + "pending_cap": 1000, + "allow_private_webhook_hosts": false, + "require_reason_on_reject": false, + "max_payload_chars": 0, + "silent_notifications": false + } + } + }, + // + // Test configuration — exercised by ./builder nodes:test in CONFIG mode. + // Full integration tests live in nodes/test/approval. + // + "test": { + "profiles": ["auto"], + "cases": [] + }, + // + // Optional: + // Local fields definitions. + // + "fields": { + "profile": { + "type": "string", + "title": "Profile", + "description": "Decision strategy: 'auto' (development), 'manual' (real reviewer), or 'custom'.", + "enum": [ + ["auto", "Auto-approve"], + ["manual", "Manual review"], + ["custom", "Custom"] + ] + }, + "timeout_seconds": { + "type": "number", + "title": "Timeout (seconds)", + "description": "How long to wait for a decision before applying timeout_action.", + "minimum": 1 + }, + "timeout_action": { + "type": "string", + "title": "On timeout", + "description": "What to do if no decision arrives in time.", + "enum": [ + ["approve", "Auto-approve"], + ["reject", "Auto-reject"], + ["error", "Raise error"] + ] + }, + "log_channel_enabled": { + "type": "boolean", + "title": "Log notifications", + "description": "Write a log line when an approval becomes pending." + }, + "webhook_url": { + "type": "string", + "title": "Webhook URL", + "description": "Optional URL to POST when a request becomes pending. Private/loopback hosts are blocked unless allow_private_webhook_hosts is true." + }, + "webhook_timeout_seconds": { + "type": "number", + "title": "Webhook timeout (seconds)", + "minimum": 1, + "maximum": 60 + }, + "webhook_headers": { + "type": "object", + "title": "Webhook headers", + "description": "Optional headers attached to webhook deliveries.", + "additionalProperties": { "type": "string" } + }, + "pending_cap": { + "type": "integer", + "title": "Max pending approvals", + "minimum": 1 + }, + "allow_private_webhook_hosts": { + "type": "boolean", + "title": "Allow private webhook hosts", + "description": "Self-hosted operators only: lets the webhook target private/loopback IPs." + }, + "require_reason_on_reject": { + "type": "boolean", + "title": "Require reason on rejection", + "description": "Refuse rejections without a non-empty reason. Recommended for compliance workflows that need a documented justification for every blocked answer." + }, + "max_payload_chars": { + "type": "integer", + "title": "Max preview characters", + "description": "Truncate the answer payload to this many characters before storing the request. 0 disables truncation. Adds _truncated_to / _original_length markers when truncated.", + "minimum": 0 + }, + "silent_notifications": { + "type": "boolean", + "title": "Silent mode", + "description": "When enabled, no notification channels fire — reviewers must poll the REST API for pending requests. Use this when wiring the approvals API into a custom dashboard." + } + }, + // + // Required: + // Defines the fields (shape) of the service. + // + "shape": [ + { + "section": "Pipe", + "title": "Decision", + "properties": ["profile", "timeout_seconds", "timeout_action", "require_reason_on_reject"] + }, + { + "section": "Pipe", + "title": "Notifications", + "properties": ["silent_notifications", "log_channel_enabled", "webhook_url", "webhook_timeout_seconds", "webhook_headers", "allow_private_webhook_hosts"] + }, + { + "section": "Pipe", + "title": "Limits", + "properties": ["pending_cap", "max_payload_chars"] + } + ] +} diff --git a/nodes/src/nodes/webhook/IEndpoint.py b/nodes/src/nodes/webhook/IEndpoint.py index c87bfcea3..0a94a3155 100644 --- a/nodes/src/nodes/webhook/IEndpoint.py +++ b/nodes/src/nodes/webhook/IEndpoint.py @@ -177,6 +177,13 @@ def _run(self): # Create our data server to accept incoming data self.server.use('data') self.server.use('profiler') + # Mount the approvals REST API. Inert when no approval node is wired + # into the pipeline; loading it unconditionally avoids a coordination + # problem between source endpoints and downstream gating nodes. + try: + self.server.use('approvals') + except Exception as e: + debug(f'approvals module unavailable: {e}') # Run the server self.server.run() diff --git a/nodes/test/approval/__init__.py b/nodes/test/approval/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/nodes/test/approval/test_instance.py b/nodes/test/approval/test_instance.py new file mode 100644 index 000000000..cc959bba5 --- /dev/null +++ b/nodes/test/approval/test_instance.py @@ -0,0 +1,302 @@ +# ============================================================================= +# MIT License +# Copyright (c) 2026 Aparavi Software AG +# ============================================================================= + +"""Unit tests for the approval node IInstance. + +Mocks rocketlib so the node can be loaded without a built engine. The same +trick is used by other node tests in this directory (e.g. local_text_output). +""" + +import sys +import threading +import time +from pathlib import Path +from types import SimpleNamespace +from unittest.mock import MagicMock + +import pytest + +# 1. Mock engine-bundled modules ONLY if they aren't already importable. +# In CI (where the engine is built and dist/server is on sys.path) the real +# modules are present and we leave them alone; the mocks only kick in on a +# fresh checkout. Unconditional assignment would clobber real rocketlib for +# every other node test that runs in the same pytest session. +if 'rocketlib' not in sys.modules: + mock_rocketlib = MagicMock() + mock_rocketlib.IInstanceBase = type('IInstanceBase', (), {}) + mock_rocketlib.IGlobalBase = type('IGlobalBase', (), {}) + mock_rocketlib.OPEN_MODE = SimpleNamespace(CONFIG='config', NORMAL='normal') + mock_rocketlib.debug = MagicMock() + sys.modules['rocketlib'] = mock_rocketlib + +if 'depends' not in sys.modules: + mock_depends = MagicMock() + mock_depends.depends = MagicMock(return_value=None) + sys.modules['depends'] = mock_depends + +# 2. Make ``ai.approvals`` and the node package importable. +REPO_ROOT = Path(__file__).resolve().parent.parent.parent.parent +sys.path.insert(0, str(REPO_ROOT / 'packages' / 'ai' / 'src')) +sys.path.insert(0, str(REPO_ROOT / 'nodes' / 'src' / 'nodes')) + +from ai.approvals import ( # noqa: E402 + ApprovalManager, + TimeoutAction, + reset_manager, + set_manager, +) +from approval.IInstance import IInstance # noqa: E402 + + +class _FakeAnswer: + """Stand-in for ai.common.schema.Answer. + + Only implements the surface IInstance touches: ``isJson``, ``getJson``, + ``getText``, ``setJson``, ``setText``. Tests verify that the node: + * extracts a payload via these getters, + * calls the corresponding setter when modified_payload is supplied. + """ + + def __init__(self, *, json=None, text=None): + self._json = json + self._text = text + + def isJson(self) -> bool: + return self._json is not None + + def getJson(self): + return self._json + + def getText(self) -> str: + return self._text or '' + + def setJson(self, value) -> None: + self._json = value + + def setText(self, value) -> None: + self._text = value + + +def _make_instance(manager: ApprovalManager, **overrides) -> IInstance: + inst = IInstance() + iglobal = MagicMock() + iglobal.manager = manager + iglobal.profile = overrides.get('profile', 'auto') + iglobal.timeout_seconds = overrides.get('timeout_seconds', 5.0) + iglobal.timeout_action = overrides.get('timeout_action', TimeoutAction.REJECT) + iglobal.max_payload_chars = overrides.get('max_payload_chars', 0) + iglobal.require_reason_on_reject = overrides.get('require_reason_on_reject', False) + inst.IGlobal = iglobal + + instance_proxy = MagicMock() + inst.instance = instance_proxy + return inst + + +@pytest.fixture(autouse=True) +def _reset_registry(): + reset_manager() + yield + reset_manager() + + +class TestBlockingGate: + def test_writeAnswers_emits_after_approve(self): + manager = ApprovalManager() + set_manager(manager) + inst = _make_instance(manager) + answer = _FakeAnswer(text='draft response') + + # Run writeAnswers on a worker; resolve from the main thread. + thread = threading.Thread(target=inst.writeAnswers, args=(answer,), daemon=True) + thread.start() + + # Wait for the request to register. + deadline = time.monotonic() + 2.0 + while manager.pending_count == 0 and time.monotonic() < deadline: + time.sleep(0.01) + assert manager.pending_count == 1 + + approval_id = manager.list_requests()[0].approval_id + manager.approve(approval_id, decided_by='reviewer') + + thread.join(timeout=2.0) + assert not thread.is_alive(), 'pipeline thread should have unblocked' + inst.instance.writeAnswers.assert_called_once_with(answer) + + def test_writeAnswers_does_not_emit_after_reject(self): + manager = ApprovalManager() + set_manager(manager) + inst = _make_instance(manager) + answer = _FakeAnswer(text='draft') + + thread = threading.Thread(target=inst.writeAnswers, args=(answer,), daemon=True) + thread.start() + + deadline = time.monotonic() + 2.0 + while manager.pending_count == 0 and time.monotonic() < deadline: + time.sleep(0.01) + approval_id = manager.list_requests()[0].approval_id + manager.reject(approval_id, reason='unsafe') + + thread.join(timeout=2.0) + assert not thread.is_alive() + inst.instance.writeAnswers.assert_not_called() + + def test_modified_payload_is_applied_to_answer(self): + manager = ApprovalManager() + set_manager(manager) + inst = _make_instance(manager) + answer = _FakeAnswer(text='original') + + thread = threading.Thread(target=inst.writeAnswers, args=(answer,), daemon=True) + thread.start() + + deadline = time.monotonic() + 2.0 + while manager.pending_count == 0 and time.monotonic() < deadline: + time.sleep(0.01) + approval_id = manager.list_requests()[0].approval_id + manager.approve(approval_id, modified_payload={'text': 'edited'}) + + thread.join(timeout=2.0) + assert not thread.is_alive() + # Setter was used; emitted answer carries the new text. + emitted = inst.instance.writeAnswers.call_args.args[0] + assert emitted.getText() == 'edited' + + def test_json_answer_payload_round_trips(self): + manager = ApprovalManager() + set_manager(manager) + inst = _make_instance(manager) + answer = _FakeAnswer(json={'verdict': 'unsafe'}) + + thread = threading.Thread(target=inst.writeAnswers, args=(answer,), daemon=True) + thread.start() + + deadline = time.monotonic() + 2.0 + while manager.pending_count == 0 and time.monotonic() < deadline: + time.sleep(0.01) + approval_id = manager.list_requests()[0].approval_id + + # The reviewer should see a JSON payload in the registered request. + stored = manager.get_request(approval_id) + assert stored.payload == {'json': {'verdict': 'unsafe'}} + + manager.approve(approval_id, modified_payload={'json': {'verdict': 'safe'}}) + thread.join(timeout=2.0) + emitted = inst.instance.writeAnswers.call_args.args[0] + assert emitted.getJson() == {'verdict': 'safe'} + + +class TestTimeoutBehavior: + def test_timeout_with_reject_action_does_not_emit(self): + manager = ApprovalManager() + set_manager(manager) + inst = _make_instance(manager, timeout_seconds=0.05, timeout_action=TimeoutAction.REJECT) + inst.writeAnswers(_FakeAnswer(text='ignored')) + inst.instance.writeAnswers.assert_not_called() + + def test_timeout_with_approve_action_does_emit(self): + manager = ApprovalManager() + set_manager(manager) + inst = _make_instance(manager, timeout_seconds=0.05, timeout_action=TimeoutAction.APPROVE) + answer = _FakeAnswer(text='allowed-by-timeout') + inst.writeAnswers(answer) + inst.instance.writeAnswers.assert_called_once() + + +class TestPassThrough: + def test_no_manager_passes_through(self): + """In CONFIG mode IGlobal sets manager=None — node must not block.""" + inst = _make_instance(MagicMock()) # placeholder manager; we override: + inst.IGlobal.manager = None + answer = _FakeAnswer(text='hi') + inst.writeAnswers(answer) + inst.instance.writeAnswers.assert_called_once_with(answer) + + +class TestPayloadTruncation: + """Verify reviewers see a bounded preview when max_payload_chars is set.""" + + def test_short_text_passes_through_untruncated(self): + manager = ApprovalManager() + set_manager(manager) + inst = _make_instance(manager, max_payload_chars=100) + + thread = threading.Thread(target=inst.writeAnswers, args=(_FakeAnswer(text='hello'),), daemon=True) + thread.start() + deadline = time.monotonic() + 2.0 + while manager.pending_count == 0 and time.monotonic() < deadline: + time.sleep(0.01) + + stored = manager.list_requests()[0] + assert stored.payload == {'text': 'hello'} + assert '_truncated_to' not in stored.payload + + manager.approve(stored.approval_id) + thread.join(timeout=2.0) + + def test_long_text_is_truncated_with_markers(self): + manager = ApprovalManager() + set_manager(manager) + inst = _make_instance(manager, max_payload_chars=10) + + long_text = 'x' * 100 + thread = threading.Thread(target=inst.writeAnswers, args=(_FakeAnswer(text=long_text),), daemon=True) + thread.start() + deadline = time.monotonic() + 2.0 + while manager.pending_count == 0 and time.monotonic() < deadline: + time.sleep(0.01) + + stored = manager.list_requests()[0] + assert stored.payload['text'] == 'x' * 10 + assert stored.payload['_truncated_to'] == 10 + assert stored.payload['_original_length'] == 100 + + # Approval should still emit the *original* answer (truncation only + # applies to the reviewer's preview, not the downstream emission). + manager.approve(stored.approval_id) + thread.join(timeout=2.0) + emitted = inst.instance.writeAnswers.call_args.args[0] + assert emitted.getText() == long_text + + def test_zero_max_chars_disables_truncation(self): + manager = ApprovalManager() + set_manager(manager) + inst = _make_instance(manager, max_payload_chars=0) + + long_text = 'y' * 1000 + thread = threading.Thread(target=inst.writeAnswers, args=(_FakeAnswer(text=long_text),), daemon=True) + thread.start() + deadline = time.monotonic() + 2.0 + while manager.pending_count == 0 and time.monotonic() < deadline: + time.sleep(0.01) + + stored = manager.list_requests()[0] + assert stored.payload['text'] == long_text + assert '_truncated_to' not in stored.payload + manager.approve(stored.approval_id) + thread.join(timeout=2.0) + + +class TestRequireReasonOnReject: + def test_flag_propagates_to_request(self): + """The IInstance must pass require_reason_on_reject through to manager.create.""" + manager = ApprovalManager() + set_manager(manager) + inst = _make_instance(manager, require_reason_on_reject=True) + + thread = threading.Thread(target=inst.writeAnswers, args=(_FakeAnswer(text='hi'),), daemon=True) + thread.start() + deadline = time.monotonic() + 2.0 + while manager.pending_count == 0 and time.monotonic() < deadline: + time.sleep(0.01) + + stored = manager.list_requests()[0] + assert stored.require_reason_on_reject is True + + # Cleanup so the thread doesn't outlive the test. + manager.approve(stored.approval_id) + thread.join(timeout=2.0) diff --git a/packages/ai/src/ai/approvals/__init__.py b/packages/ai/src/ai/approvals/__init__.py new file mode 100644 index 000000000..91a2c396e --- /dev/null +++ b/packages/ai/src/ai/approvals/__init__.py @@ -0,0 +1,58 @@ +# ============================================================================= +# MIT License +# Copyright (c) 2026 Aparavi Software AG +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# ============================================================================= + +"""Human-in-the-loop approval primitives shared by the approval node and REST module.""" + +from .manager import ( + ApprovalManager, + ApprovalManagerError, + ApprovalReasonRequiredError, + PendingCapacityError, +) +from .models import ( + ApprovalDecision, + ApprovalRequest, + ApprovalStatus, + TimeoutAction, +) +from .notifier import ApprovalNotifier, NotifierConfig +from .registry import get_manager, reset_manager, set_manager +from .store import ApprovalStore, InMemoryStore + +__all__ = [ + 'ApprovalDecision', + 'ApprovalManager', + 'ApprovalManagerError', + 'ApprovalNotifier', + 'ApprovalReasonRequiredError', + 'ApprovalRequest', + 'ApprovalStatus', + 'ApprovalStore', + 'InMemoryStore', + 'NotifierConfig', + 'PendingCapacityError', + 'TimeoutAction', + 'get_manager', + 'reset_manager', + 'set_manager', +] diff --git a/packages/ai/src/ai/approvals/manager.py b/packages/ai/src/ai/approvals/manager.py new file mode 100644 index 000000000..2d159b999 --- /dev/null +++ b/packages/ai/src/ai/approvals/manager.py @@ -0,0 +1,439 @@ +# ============================================================================= +# MIT License +# Copyright (c) 2026 Aparavi Software AG +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# ============================================================================= + +"""Thread-safe orchestrator for human-in-the-loop approval requests. + +The manager coordinates three concerns: + 1. Lifecycle: create -> wait -> resolve (approve/reject) or time out. + 2. Storage: delegated to an ``ApprovalStore``. + 3. Cross-thread signalling: a ``threading.Event`` per request lets the + pipeline thread block in ``writeAnswers`` while a REST request on a + different thread calls ``approve`` / ``reject``. + +The blocking gate is the central mechanism missing from PR #542 — without it, +the approval node emitted a ``status: pending`` payload that downstream nodes +ignored, defeating the point of the approval. +""" + +from __future__ import annotations + +import threading +import time +import uuid +from typing import Any, Dict, List, Optional + +from .models import ( + ApprovalDecision, + ApprovalRequest, + ApprovalStatus, + TimeoutAction, +) +from .store import ApprovalStore, InMemoryStore + + +class ApprovalManagerError(Exception): + """Base class for ApprovalManager-specific errors.""" + + +class PendingCapacityError(ApprovalManagerError): + """Raised when accepting another pending request would exceed the cap.""" + + +class ApprovalReasonRequiredError(ApprovalManagerError): + """Raised when reject() is called without a reason but the request requires one. + + Distinguished from generic ApprovalManagerError so the REST layer can map it + to HTTP 400 (client validation error) rather than 409 (state conflict). + """ + + +class _PendingEntry: + """Internal coupling of an event signal to a request id. + + Kept private because callers should never see the event directly. + """ + + __slots__ = ('event', 'approval_id') + + def __init__(self, approval_id: str) -> None: + """Bind a fresh threading.Event to ``approval_id``.""" + self.event = threading.Event() + self.approval_id = approval_id + + +class ApprovalManager: + """Process-wide registry of approval requests with blocking wait semantics. + + Typical flow from a node: + + decision = manager.create_and_wait(payload, timeout=300) + if decision.approved: + self.instance.writeAnswers(...) + elif decision.rejected: + ... # do not emit downstream + + Typical flow from REST: + + manager.approve(approval_id, modified_payload=..., decided_by=...) + manager.reject(approval_id, reason=..., decided_by=...) + """ + + def __init__( + self, + *, + store: Optional[ApprovalStore] = None, + pending_cap: int = 1000, + default_timeout: float = 300.0, + default_timeout_action: TimeoutAction = TimeoutAction.REJECT, + ) -> None: + """Configure the manager. + + Args: + store: persistence backend; defaults to a fresh InMemoryStore. + pending_cap: maximum number of simultaneously pending requests. + ``create_and_wait`` raises PendingCapacityError above the cap. + default_timeout: seconds before a pending request is auto-resolved. + default_timeout_action: what to do when a wait times out. + """ + if pending_cap <= 0: + raise ValueError(f'pending_cap must be positive; got {pending_cap}') + if default_timeout <= 0: + raise ValueError(f'default_timeout must be positive; got {default_timeout}') + # parse() raises on invalid values, surfacing misconfig instead of silently falling back. + default_timeout_action = TimeoutAction.parse(default_timeout_action) + + self._store = store or InMemoryStore() + self._pending_cap = pending_cap + self._default_timeout = default_timeout + self._default_timeout_action = default_timeout_action + + self._lock = threading.Lock() + self._pending: Dict[str, _PendingEntry] = {} + + @property + def pending_count(self) -> int: + """Number of requests currently awaiting a decision.""" + with self._lock: + return len(self._pending) + + @property + def store(self) -> ApprovalStore: + """Underlying persistence backend (read-only).""" + return self._store + + def create( + self, + payload: Dict[str, Any], + *, + pipeline_id: Optional[str] = None, + node_id: Optional[str] = None, + timeout: Optional[float] = None, + profile: str = 'auto', + metadata: Optional[Dict[str, Any]] = None, + require_reason_on_reject: bool = False, + ) -> ApprovalRequest: + """Register a new pending approval request. + + Returns the stored ``ApprovalRequest`` (deep-copied — safe to mutate). + Raises PendingCapacityError if the pending cap is exceeded. + """ + if not isinstance(payload, dict): + raise TypeError(f'payload must be a dict; got {type(payload).__name__}') + + timeout = float(timeout) if timeout is not None else self._default_timeout + if timeout <= 0: + raise ValueError(f'timeout must be positive; got {timeout}') + + approval_id = str(uuid.uuid4()) + now = time.monotonic() + + request = ApprovalRequest( + approval_id=approval_id, + pipeline_id=pipeline_id, + node_id=node_id, + payload=payload, + status=ApprovalStatus.PENDING, + created_at=now, + deadline_at=now + timeout, + profile=profile, + metadata=dict(metadata) if metadata else {}, + require_reason_on_reject=bool(require_reason_on_reject), + ) + + with self._lock: + if len(self._pending) >= self._pending_cap: + raise PendingCapacityError(f'cannot accept more than {self._pending_cap} pending approvals; tighten timeout, increase pending_cap, or wait for resolution') + self._pending[approval_id] = _PendingEntry(approval_id) + self._store.put(request) + + return self._store.get(approval_id) # deep copy from store + + def wait( + self, + approval_id: str, + *, + timeout: Optional[float] = None, + timeout_action: Optional[TimeoutAction] = None, + ) -> ApprovalDecision: + """Block until ``approval_id`` is resolved or the timeout elapses. + + Args: + approval_id: the id returned by create(). + timeout: seconds to wait. If None, derived from request.deadline_at. + timeout_action: how to synthesize a decision on timeout. If None, + the manager's default is used. ``ERROR`` raises TimeoutError. + + Returns: + ApprovalDecision describing the final state. + """ + with self._lock: + entry = self._pending.get(approval_id) + stored = self._store.get(approval_id) + + if stored is None: + raise KeyError(f'unknown approval_id {approval_id!r}') + + if stored.status != ApprovalStatus.PENDING: + return self._decision_from_request(stored) + + if entry is None: + # Stored as pending but no event registered (e.g., loaded from + # persistent store on boot). Treat as timed-out so callers don't + # block forever; PR B will replace this with replay-on-boot. + return self._apply_timeout(stored, timeout_action or self._default_timeout_action) + + wait_seconds = self._compute_wait_seconds(stored, timeout) + signalled = entry.event.wait(timeout=wait_seconds) + + if signalled: + resolved = self._store.get(approval_id) + return self._decision_from_request(resolved) + + # Timeout path + return self._apply_timeout(stored, timeout_action or self._default_timeout_action) + + def create_and_wait( + self, + payload: Dict[str, Any], + *, + pipeline_id: Optional[str] = None, + node_id: Optional[str] = None, + timeout: Optional[float] = None, + timeout_action: Optional[TimeoutAction] = None, + profile: str = 'auto', + metadata: Optional[Dict[str, Any]] = None, + require_reason_on_reject: bool = False, + ) -> ApprovalDecision: + """Create a request and block until decided. Convenience over create + wait.""" + request = self.create( + payload, + pipeline_id=pipeline_id, + node_id=node_id, + timeout=timeout, + profile=profile, + metadata=metadata, + require_reason_on_reject=require_reason_on_reject, + ) + return self.wait( + request.approval_id, + timeout=timeout, + timeout_action=timeout_action, + ) + + def approve( + self, + approval_id: str, + *, + modified_payload: Optional[Dict[str, Any]] = None, + decided_by: Optional[str] = None, + reason: Optional[str] = None, + ) -> ApprovalRequest: + """Approve a pending request and unblock any waiter.""" + return self._resolve( + approval_id, + ApprovalStatus.APPROVED, + modified_payload=modified_payload, + decided_by=decided_by, + reason=reason, + ) + + def reject( + self, + approval_id: str, + *, + decided_by: Optional[str] = None, + reason: Optional[str] = None, + ) -> ApprovalRequest: + """Reject a pending request and unblock any waiter.""" + return self._resolve( + approval_id, + ApprovalStatus.REJECTED, + decided_by=decided_by, + reason=reason, + ) + + def get_request(self, approval_id: str) -> Optional[ApprovalRequest]: + """Return a deep copy of the stored request, or None. + + Always returns an isolated copy — mutating it never affects manager state. + """ + return self._store.get(approval_id) + + def list_requests(self, status: Optional[ApprovalStatus] = None) -> List[ApprovalRequest]: + """Return deep-copied requests, optionally filtered by status.""" + return self._store.list(status) + + def discard_resolved(self, approval_id: str) -> bool: + """Permanently delete a resolved request from the store. + + Used by callers that want explicit cleanup; PR B will add an automatic + TTL sweeper that calls this on resolved requests past their TTL. + """ + request = self._store.get(approval_id) + if request is None: + return False + if request.status == ApprovalStatus.PENDING: + raise ApprovalManagerError(f'cannot discard pending approval {approval_id!r}; resolve it first') + return self._store.delete(approval_id) + + def _resolve( + self, + approval_id: str, + new_status: ApprovalStatus, + *, + modified_payload: Optional[Dict[str, Any]] = None, + decided_by: Optional[str] = None, + reason: Optional[str] = None, + ) -> ApprovalRequest: + with self._lock: + entry = self._pending.pop(approval_id, None) + stored = self._store.get(approval_id) + + if stored is None: + raise KeyError(f'unknown approval_id {approval_id!r}') + + if stored.status != ApprovalStatus.PENDING: + # Already resolved (e.g., timed-out then a late approval comes in). + # Don't overwrite — first decision wins. + raise ApprovalManagerError(f'approval {approval_id!r} is already {stored.status.value}; cannot transition to {new_status.value}') + + # Compliance gate: documented justification required on rejection. + if new_status == ApprovalStatus.REJECTED and stored.require_reason_on_reject: + if reason is None or not reason.strip(): + raise ApprovalReasonRequiredError(f'reject for {approval_id!r} requires a non-empty reason') + + stored.status = new_status + stored.decided_at = time.monotonic() + stored.decided_by = decided_by + stored.decision_reason = reason + if new_status == ApprovalStatus.APPROVED and modified_payload is not None: + if not isinstance(modified_payload, dict): + raise TypeError(f'modified_payload must be a dict; got {type(modified_payload).__name__}') + stored.modified_payload = modified_payload + self._store.put(stored) + + if entry is not None: + entry.event.set() + return self._store.get(approval_id) + + def _apply_timeout(self, stored: ApprovalRequest, timeout_action: TimeoutAction) -> ApprovalDecision: + """Synthesize a decision from a timeout policy and persist it.""" + timeout_action = TimeoutAction.parse(timeout_action) + + if timeout_action == TimeoutAction.ERROR: + # Mark as timed-out then surface to the caller. + self._mark_timed_out(stored) + raise TimeoutError(f'approval {stored.approval_id!r} timed out before a decision was made') + + with self._lock: + entry = self._pending.pop(stored.approval_id, None) + current = self._store.get(stored.approval_id) + if current is None: + # Was discarded — synthesize an ephemeral decision. + return ApprovalDecision( + status=ApprovalStatus.TIMED_OUT, + payload=stored.payload, + reason='request not found in store', + ) + if current.status == ApprovalStatus.PENDING: + current.status = ApprovalStatus.TIMED_OUT + current.decided_at = time.monotonic() + current.decision_reason = f'auto-{timeout_action.value} on timeout' + self._store.put(current) + stored = current + + if entry is not None: + entry.event.set() + + # Map timeout policy to a downstream-actionable status. + if timeout_action == TimeoutAction.APPROVE: + decision_status = ApprovalStatus.APPROVED + else: + decision_status = ApprovalStatus.REJECTED + + return ApprovalDecision( + status=decision_status, + payload=stored.payload, + modified_payload=stored.modified_payload, + reason=f'auto-{timeout_action.value} on timeout', + decided_by='timeout-policy', + ) + + def _mark_timed_out(self, stored: ApprovalRequest) -> None: + """Persist a timed-out status and signal any waiter.""" + with self._lock: + entry = self._pending.pop(stored.approval_id, None) + current = self._store.get(stored.approval_id) + if current is not None and current.status == ApprovalStatus.PENDING: + current.status = ApprovalStatus.TIMED_OUT + current.decided_at = time.monotonic() + current.decision_reason = 'timeout with action=error' + self._store.put(current) + if entry is not None: + entry.event.set() + + @staticmethod + def _decision_from_request(request: ApprovalRequest) -> ApprovalDecision: + """Build an ``ApprovalDecision`` from a resolved ``ApprovalRequest``.""" + return ApprovalDecision( + status=request.status, + payload=request.payload, + modified_payload=request.modified_payload, + reason=request.decision_reason, + decided_by=request.decided_by, + ) + + @staticmethod + def _compute_wait_seconds(request: ApprovalRequest, override: Optional[float]) -> Optional[float]: + """Resolve the wait deadline from an explicit override or the stored deadline. + + ``None`` means "wait indefinitely", matching threading.Event.wait semantics. + Negative or zero values are clamped to a tiny non-zero value so the wait + returns immediately rather than blocking forever. + """ + if override is not None: + return max(0.0, float(override)) + if request.deadline_at is None: + return None + remaining = request.deadline_at - time.monotonic() + return max(0.0, remaining) diff --git a/packages/ai/src/ai/approvals/models.py b/packages/ai/src/ai/approvals/models.py new file mode 100644 index 000000000..7b4145c95 --- /dev/null +++ b/packages/ai/src/ai/approvals/models.py @@ -0,0 +1,146 @@ +# ============================================================================= +# MIT License +# Copyright (c) 2026 Aparavi Software AG +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# ============================================================================= + +"""Data models for human-in-the-loop approval requests and decisions.""" + +from __future__ import annotations + +import enum +from dataclasses import dataclass, field +from typing import Any, Dict, Optional + + +class ApprovalStatus(str, enum.Enum): + """Lifecycle states of an approval request.""" + + PENDING = 'pending' + APPROVED = 'approved' + REJECTED = 'rejected' + TIMED_OUT = 'timed_out' + + +class TimeoutAction(str, enum.Enum): + """What to do when a pending approval times out.""" + + APPROVE = 'approve' + REJECT = 'reject' + ERROR = 'error' + + @classmethod + def parse(cls, value: Any) -> TimeoutAction: + """Coerce a user-supplied value into a valid TimeoutAction. + + Raises ValueError on unknown values; the silent-fallback behavior in + PR #542 was a known issue called out by reviewers. + """ + if isinstance(value, cls): + return value + if isinstance(value, str): + try: + return cls(value.strip().lower()) + except ValueError: + pass + allowed = ', '.join(m.value for m in cls) + raise ValueError(f'timeout_action must be one of {{{allowed}}}; got {value!r}') + + +@dataclass +class ApprovalRequest: + """A single approval request awaiting (or having received) a decision. + + Stored by ApprovalManager. Returned via REST. Persisted by ApprovalStore. + """ + + approval_id: str + pipeline_id: Optional[str] + node_id: Optional[str] + payload: Dict[str, Any] + status: ApprovalStatus = ApprovalStatus.PENDING + created_at: float = 0.0 + deadline_at: Optional[float] = None + decided_at: Optional[float] = None + decided_by: Optional[str] = None + decision_reason: Optional[str] = None + modified_payload: Optional[Dict[str, Any]] = None + profile: str = 'auto' + metadata: Dict[str, Any] = field(default_factory=dict) + # When True, reject() must be called with a non-empty reason. Lets compliance + # workflows enforce documented justification on every rejection. + require_reason_on_reject: bool = False + + def to_dict(self) -> Dict[str, Any]: + """Serialize for REST responses.""" + return { + 'approval_id': self.approval_id, + 'pipeline_id': self.pipeline_id, + 'node_id': self.node_id, + 'payload': self.payload, + 'status': self.status.value, + 'created_at': self.created_at, + 'deadline_at': self.deadline_at, + 'decided_at': self.decided_at, + 'decided_by': self.decided_by, + 'decision_reason': self.decision_reason, + 'modified_payload': self.modified_payload, + 'profile': self.profile, + 'metadata': self.metadata, + 'require_reason_on_reject': self.require_reason_on_reject, + } + + +@dataclass +class ApprovalDecision: + """Result of waiting for a decision on an approval request. + + ``payload`` is the request as registered (which may have been truncated + for the reviewer's preview). ``modified_payload`` is set *only* when the + reviewer explicitly supplied edits — keeping the two separate prevents + a truncated preview from being mistakenly applied back onto the original + answer downstream. + """ + + status: ApprovalStatus + payload: Dict[str, Any] + reason: Optional[str] = None + decided_by: Optional[str] = None + modified_payload: Optional[Dict[str, Any]] = None + + @property + def was_modified(self) -> bool: + """True when the reviewer supplied a modified payload.""" + return self.modified_payload is not None + + @property + def approved(self) -> bool: + """True when the decision authorizes downstream emission.""" + return self.status == ApprovalStatus.APPROVED + + @property + def rejected(self) -> bool: + """True when the decision blocks downstream emission.""" + return self.status == ApprovalStatus.REJECTED + + @property + def timed_out(self) -> bool: + """True when the decision was synthesized from a timeout policy.""" + return self.status == ApprovalStatus.TIMED_OUT diff --git a/packages/ai/src/ai/approvals/notifier.py b/packages/ai/src/ai/approvals/notifier.py new file mode 100644 index 000000000..58465a9d8 --- /dev/null +++ b/packages/ai/src/ai/approvals/notifier.py @@ -0,0 +1,261 @@ +# ============================================================================= +# MIT License +# Copyright (c) 2026 Aparavi Software AG +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# ============================================================================= + +"""Outbound notifications for new approval requests. + +Two channels are supported in PR A: + + * ``log`` — write a structured line via the supplied logger callable. + * ``webhook`` — POST a JSON body to a URL. + +Webhook URLs are validated up-front to block SSRF: private/loopback/link-local +ranges are forbidden by default for both IPv4 *and* IPv6. PR #542 only +covered IPv4 — IPv6 ULA / link-local / loopback were unhandled, which the +issue explicitly called out. +""" + +from __future__ import annotations + +import ipaddress +import json +import logging +import socket +import threading +from dataclasses import dataclass, field +from typing import Any, Callable, Dict, List, Optional +from urllib import error as urlerror +from urllib import request as urlrequest +from urllib.parse import urlparse + +from .models import ApprovalRequest + + +_DEFAULT_LOGGER = logging.getLogger('rocketride.approvals') + + +@dataclass +class NotifierConfig: + """Channel configuration for ``ApprovalNotifier``. + + Empty values disable a channel — both empty means notifications are no-ops, + which is fine: callers may rely solely on REST polling. + """ + + log_channel_enabled: bool = True + webhook_url: Optional[str] = None + webhook_timeout_seconds: float = 5.0 + webhook_headers: Dict[str, str] = field(default_factory=dict) + allow_private_webhook_hosts: bool = False # opt-in for self-hosted setups + # When True, no notification channels fire regardless of other settings. + # Surfaces "silent" as a first-class option for setups that rely solely on + # REST polling (e.g., a custom dashboard) — prevents the easy mistake of + # leaving log_channel_enabled=True and hoping nobody notices. + silent: bool = False + + +class ApprovalNotifier: + """Sends notifications when a new approval is created. + + Designed to fail-soft: a misbehaving webhook must not crash the pipeline + or block a decision. Errors are logged and swallowed. + """ + + def __init__( + self, + config: Optional[NotifierConfig] = None, + *, + logger: Optional[logging.Logger] = None, + url_opener: Optional[Callable[..., Any]] = None, + ) -> None: + """Configure channels. + + Args: + config: channel configuration; defaults are safe (log-only). + logger: where the log channel writes; defaults to a module logger. + url_opener: dependency-injection seam used by tests to avoid + real network I/O. Should accept ``(request, timeout=...)`` and + return a context-manager-compatible response object. + """ + self._config = config or NotifierConfig() + self._logger = logger or _DEFAULT_LOGGER + self._url_opener = url_opener or urlrequest.urlopen + self._lock = threading.Lock() + + @property + def config(self) -> NotifierConfig: + """Active configuration (read-only handle).""" + return self._config + + def notify(self, request: ApprovalRequest) -> List[str]: + """Fan out a notification for ``request`` to all enabled channels. + + Returns the list of channel names that delivered successfully. Failures + on individual channels are logged but never raised. + """ + delivered: List[str] = [] + + if self._config.silent: + # Explicit silent mode short-circuits everything. We don't even log + # at info level here because the whole point of silent mode is no + # operational signal — callers rely on REST polling. + return delivered + + if self._config.log_channel_enabled: + try: + self._notify_log(request) + delivered.append('log') + except Exception as exc: # pragma: no cover — defensive only + self._logger.warning('approval log channel failed: %s', exc) + + if self._config.webhook_url: + try: + self._notify_webhook(request) + delivered.append('webhook') + except Exception as exc: + self._logger.warning('approval webhook to %s failed: %s', self._config.webhook_url, exc) + + return delivered + + def _notify_log(self, request: ApprovalRequest) -> None: + self._logger.info( + 'approval pending: id=%s pipeline=%s node=%s profile=%s', + request.approval_id, + request.pipeline_id, + request.node_id, + request.profile, + ) + + def _notify_webhook(self, request: ApprovalRequest) -> None: + url = self._config.webhook_url + if not url: + return + validate_webhook_url(url, allow_private=self._config.allow_private_webhook_hosts) + + body = json.dumps({'event': 'approval.created', 'request': request.to_dict()}).encode('utf-8') + headers = {'Content-Type': 'application/json'} + headers.update(self._config.webhook_headers) + req = urlrequest.Request(url, data=body, method='POST', headers=headers) + + try: + with self._url_opener(req, timeout=self._config.webhook_timeout_seconds) as resp: + # Drain the response so the connection releases promptly. + resp.read() + except urlerror.URLError as exc: + raise WebhookDeliveryError(f'webhook delivery failed: {exc}') from exc + + +class WebhookDeliveryError(RuntimeError): + """Raised when the outbound webhook cannot be delivered.""" + + +class SSRFValidationError(ValueError): + """Raised when a webhook URL targets a forbidden address.""" + + +def validate_webhook_url(url: str, *, allow_private: bool = False) -> None: + """Reject URLs that would let an attacker target internal services. + + Resolves the hostname and inspects every returned IP — DNS rebinding + attacks fail because the actual ``urlopen`` will resolve again, but having + *any* private address in the answer is a strong heuristic for misuse. + + Args: + url: the URL to validate. Must use http or https. + allow_private: when True, private/loopback/link-local addresses are + permitted. Self-hosted operators may set this to point at internal + review services (e.g. an on-premises tracker). + + Raises: + SSRFValidationError if the URL is malformed, uses a non-HTTP scheme, + or resolves to a forbidden range. + """ + parsed = urlparse(url) + if parsed.scheme not in ('http', 'https'): + raise SSRFValidationError(f'webhook URL must use http or https; got {parsed.scheme!r}') + if not parsed.hostname: + raise SSRFValidationError(f'webhook URL is missing a hostname: {url!r}') + + host = parsed.hostname + addresses = _resolve_host(host) + if not addresses: + raise SSRFValidationError(f'webhook host {host!r} did not resolve to any address') + + forbidden: List[str] = [] + for address in addresses: + if _is_address_forbidden(address) and not allow_private: + forbidden.append(address) + + if forbidden: + raise SSRFValidationError(f'webhook host {host!r} resolves to forbidden address(es) {forbidden!r}; set allow_private=True if this targets an internal service on a trusted network') + + +def _resolve_host(host: str) -> List[str]: + """Resolve ``host`` to all IPv4+IPv6 addresses; tolerates lookup failure. + + Lookup failure returns an empty list; the caller treats that as forbidden. + A direct IP literal is parsed and returned as-is. + """ + try: + ipaddress.ip_address(host) + return [host] + except ValueError: + pass + + try: + results = socket.getaddrinfo(host, None) + except socket.gaierror: + return [] + + addresses: List[str] = [] + for family, _socktype, _proto, _canon, sockaddr in results: + if family in (socket.AF_INET, socket.AF_INET6): + addresses.append(sockaddr[0]) + return addresses + + +def _is_address_forbidden(address: str) -> bool: + """Return True if ``address`` falls in any private/loopback/link-local/reserved range. + + Covers IPv4 and IPv6. The IPv6 cases (loopback ::1, link-local fe80::/10, + unique-local fc00::/7, IPv4-mapped/embedded) were missing from PR #542. + """ + try: + ip = ipaddress.ip_address(address) + except ValueError: + return True # unparseable: treat as forbidden, fail-closed + + if ip.is_loopback or ip.is_private or ip.is_link_local or ip.is_reserved: + return True + if ip.is_unspecified or ip.is_multicast: + return True + + if isinstance(ip, ipaddress.IPv6Address): + # IPv4-mapped (::ffff:0:0/96) and IPv4-compat addresses bypass the IPv4 + # checks above unless we re-validate the embedded IPv4. ipaddress + # treats some of these as "private" already, but be explicit. + if ip.ipv4_mapped is not None and _is_address_forbidden(str(ip.ipv4_mapped)): + return True + if ip.sixtofour is not None and _is_address_forbidden(str(ip.sixtofour)): + return True + + return False diff --git a/packages/ai/src/ai/approvals/registry.py b/packages/ai/src/ai/approvals/registry.py new file mode 100644 index 000000000..8a5757c9f --- /dev/null +++ b/packages/ai/src/ai/approvals/registry.py @@ -0,0 +1,64 @@ +# ============================================================================= +# MIT License +# Copyright (c) 2026 Aparavi Software AG +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# ============================================================================= + +"""Process-wide singleton accessor for ApprovalManager. + +The approval node and the REST module need to share a single manager so that +a REST call on one thread can resolve a wait blocked on another. We expose a +small registry rather than a module-level global so tests can swap or reset +the instance cleanly. +""" + +from __future__ import annotations + +import threading +from typing import Optional + +from .manager import ApprovalManager + + +_lock = threading.Lock() +_manager: Optional[ApprovalManager] = None + + +def get_manager() -> ApprovalManager: + """Return the shared ApprovalManager, creating a default on first access.""" + global _manager + with _lock: + if _manager is None: + _manager = ApprovalManager() + return _manager + + +def set_manager(manager: ApprovalManager) -> None: + """Replace the shared ApprovalManager (used by initModule and tests).""" + global _manager + with _lock: + _manager = manager + + +def reset_manager() -> None: + """Drop the shared ApprovalManager. Mostly for test isolation.""" + global _manager + with _lock: + _manager = None diff --git a/packages/ai/src/ai/approvals/store.py b/packages/ai/src/ai/approvals/store.py new file mode 100644 index 000000000..89d4121a4 --- /dev/null +++ b/packages/ai/src/ai/approvals/store.py @@ -0,0 +1,99 @@ +# ============================================================================= +# MIT License +# Copyright (c) 2026 Aparavi Software AG +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# ============================================================================= + +"""Persistence backends for approval requests. + +The abstract ``ApprovalStore`` decouples the manager from a particular backend. +PR A ships only the in-memory store; a SQLite-backed store will follow in PR B +to satisfy the compliance / restart-survivability requirements in issue #635. +""" + +from __future__ import annotations + +import abc +import copy +import threading +from typing import Dict, List, Optional + +from .models import ApprovalRequest, ApprovalStatus + + +class ApprovalStore(abc.ABC): + """Abstract persistence backend for ``ApprovalRequest``.""" + + @abc.abstractmethod + def put(self, request: ApprovalRequest) -> None: + """Insert or replace a request.""" + + @abc.abstractmethod + def get(self, approval_id: str) -> Optional[ApprovalRequest]: + """Retrieve a request by id, or None if absent.""" + + @abc.abstractmethod + def delete(self, approval_id: str) -> bool: + """Remove a request by id. Returns True if it existed.""" + + @abc.abstractmethod + def list(self, status: Optional[ApprovalStatus] = None) -> List[ApprovalRequest]: + """Return all requests, optionally filtered by status.""" + + +class InMemoryStore(ApprovalStore): + """Thread-safe in-memory implementation. + + Lost on process restart. Suitable for development and the auto profile; + SQLite-backed store will be added in PR B for production use. + """ + + def __init__(self) -> None: + """Initialize an empty store.""" + self._lock = threading.Lock() + self._items: Dict[str, ApprovalRequest] = {} + + def put(self, request: ApprovalRequest) -> None: + """Insert or replace ``request``. Stored copy is independent of caller's reference.""" + with self._lock: + self._items[request.approval_id] = copy.deepcopy(request) + + def get(self, approval_id: str) -> Optional[ApprovalRequest]: + """Return a deep copy of the stored request, or None. + + Deep-copying on read prevents callers from mutating internal state — a + bug PR #542 reviewers flagged in the original ApprovalManager. + """ + with self._lock: + request = self._items.get(approval_id) + return copy.deepcopy(request) if request is not None else None + + def delete(self, approval_id: str) -> bool: + """Remove and return whether a record existed.""" + with self._lock: + return self._items.pop(approval_id, None) is not None + + def list(self, status: Optional[ApprovalStatus] = None) -> List[ApprovalRequest]: + """Return a list of deep-copied requests, optionally filtered by status.""" + with self._lock: + items = list(self._items.values()) + if status is not None: + items = [r for r in items if r.status == status] + return [copy.deepcopy(r) for r in items] diff --git a/packages/ai/src/ai/modules/__init__.py b/packages/ai/src/ai/modules/__init__.py index 239a0a546..9ebe2792e 100644 --- a/packages/ai/src/ai/modules/__init__.py +++ b/packages/ai/src/ai/modules/__init__.py @@ -1,6 +1,7 @@ # Explicit list of supported modules ALL = frozenset( { + 'approvals', 'chat', 'clients', 'data', diff --git a/packages/ai/src/ai/modules/approvals/__init__.py b/packages/ai/src/ai/modules/approvals/__init__.py new file mode 100644 index 000000000..28897125f --- /dev/null +++ b/packages/ai/src/ai/modules/approvals/__init__.py @@ -0,0 +1,63 @@ +# ============================================================================= +# MIT License +# Copyright (c) 2026 Aparavi Software AG +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# ============================================================================= + +"""Approvals Module for RocketRide Web Services. + +Exposes REST endpoints used by external reviewers to act on pending approval +requests. Mounted dynamically via ``server.use('approvals')``. + +Routes: + GET /approvals list requests (optional ?status= filter) + GET /approvals/{approval_id} fetch a single request + POST /approvals/{approval_id}/approve { modified_payload?, decided_by?, reason? } + POST /approvals/{approval_id}/reject { decided_by?, reason? } +""" + +from typing import TYPE_CHECKING, Any, Dict + +from ai.approvals import get_manager, set_manager, ApprovalManager + +from .endpoints import build_routes + +if TYPE_CHECKING: # pragma: no cover — type-only import to avoid pulling uvicorn at import time + from ai.web import WebServer + + +def initModule(server: 'WebServer', config: Dict[str, Any]) -> None: + """Register the approvals API on ``server``. + + Optional ``config['manager']`` can supply a pre-configured ApprovalManager; + by default the process-wide registry instance is used. + """ + manager: ApprovalManager + supplied = config.get('manager') if config else None + if supplied is not None: + if not isinstance(supplied, ApprovalManager): + raise TypeError("config['manager'] must be an ApprovalManager") + set_manager(supplied) + manager = supplied + else: + manager = get_manager() + + for path, handler, methods in build_routes(manager): + server.add_route(path, handler, methods) diff --git a/packages/ai/src/ai/modules/approvals/endpoints.py b/packages/ai/src/ai/modules/approvals/endpoints.py new file mode 100644 index 000000000..6289b86ad --- /dev/null +++ b/packages/ai/src/ai/modules/approvals/endpoints.py @@ -0,0 +1,132 @@ +# ============================================================================= +# MIT License +# Copyright (c) 2026 Aparavi Software AG +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# ============================================================================= + +"""HTTP route handlers for the approvals module. + +The handlers are built as a factory (``build_routes``) so the manager can be +injected — this keeps the handlers fully unit-testable on a bare FastAPI app +without going through ``ai.web``. +""" + +from __future__ import annotations + +from typing import Any, Callable, Dict, List, Optional, Tuple + +from fastapi import HTTPException, Query +from pydantic import BaseModel, ConfigDict + +from ai.approvals.manager import ( + ApprovalManager, + ApprovalManagerError, + ApprovalReasonRequiredError, +) +from ai.approvals.models import ApprovalStatus + + +class ApproveBody(BaseModel): + """Request body for POST /approvals/{id}/approve.""" + + model_config = ConfigDict(extra='forbid') + + modified_payload: Optional[Dict[str, Any]] = None + decided_by: Optional[str] = None + reason: Optional[str] = None + + +class RejectBody(BaseModel): + """Request body for POST /approvals/{id}/reject.""" + + model_config = ConfigDict(extra='forbid') + + decided_by: Optional[str] = None + reason: Optional[str] = None + + +def build_routes( + manager: ApprovalManager, +) -> List[Tuple[str, Callable, List[str]]]: + """Return ``[(path, handler, methods)]`` for ``server.add_route``. + + Bound to ``manager`` via closure — each handler reads/writes the same + instance. The mounting layer is responsible for path uniqueness. + """ + + def list_approvals(status: Optional[str] = Query(default=None)) -> Dict[str, Any]: + status_filter: Optional[ApprovalStatus] = None + if status is not None: + try: + status_filter = ApprovalStatus(status) + except ValueError as exc: + raise HTTPException( + status_code=400, + detail=f'invalid status filter {status!r}; allowed: {", ".join(s.value for s in ApprovalStatus)}', + ) from exc + + items = [r.to_dict() for r in manager.list_requests(status_filter)] + return {'status': 'OK', 'data': {'approvals': items, 'count': len(items)}} + + def get_approval(approval_id: str) -> Dict[str, Any]: + request = manager.get_request(approval_id) + if request is None: + raise HTTPException(status_code=404, detail=f'approval {approval_id!r} not found') + return {'status': 'OK', 'data': request.to_dict()} + + def approve(approval_id: str, body: ApproveBody) -> Dict[str, Any]: + try: + updated = manager.approve( + approval_id, + modified_payload=body.modified_payload, + decided_by=body.decided_by, + reason=body.reason, + ) + except KeyError as exc: + raise HTTPException(status_code=404, detail=str(exc)) from exc + except ApprovalManagerError as exc: + # Already-resolved transition. + raise HTTPException(status_code=409, detail=str(exc)) from exc + return {'status': 'OK', 'data': updated.to_dict()} + + def reject(approval_id: str, body: RejectBody) -> Dict[str, Any]: + try: + updated = manager.reject( + approval_id, + decided_by=body.decided_by, + reason=body.reason, + ) + except KeyError as exc: + raise HTTPException(status_code=404, detail=str(exc)) from exc + except ApprovalReasonRequiredError as exc: + # Client-side validation failure (missing reason). 400 distinguishes + # this from 409 (already-resolved state conflict) so reviewers know + # to retry with a reason rather than concluding the request is gone. + raise HTTPException(status_code=400, detail=str(exc)) from exc + except ApprovalManagerError as exc: + raise HTTPException(status_code=409, detail=str(exc)) from exc + return {'status': 'OK', 'data': updated.to_dict()} + + return [ + ('/approvals', list_approvals, ['GET']), + ('/approvals/{approval_id}', get_approval, ['GET']), + ('/approvals/{approval_id}/approve', approve, ['POST']), + ('/approvals/{approval_id}/reject', reject, ['POST']), + ] diff --git a/packages/ai/tests/ai/approvals/__init__.py b/packages/ai/tests/ai/approvals/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/packages/ai/tests/ai/approvals/test_manager.py b/packages/ai/tests/ai/approvals/test_manager.py new file mode 100644 index 000000000..b36dde46d --- /dev/null +++ b/packages/ai/tests/ai/approvals/test_manager.py @@ -0,0 +1,249 @@ +# ============================================================================= +# MIT License +# Copyright (c) 2026 Aparavi Software AG +# ============================================================================= + +"""Unit tests for ai.approvals.manager.ApprovalManager. + +These cover the gaps from PR #542 specifically: + * blocking gate: wait() actually blocks until approve()/reject(). + * deepcopy-on-read: mutating a returned request never alters internal state. + * timeout_action validation: invalid values raise instead of falling back silently. + * pending cap: enforced. + * one-shot resolution: a late approve after timeout is rejected. +""" + +import threading +import time + +import pytest + +from ai.approvals.manager import ( + ApprovalManager, + ApprovalReasonRequiredError, + PendingCapacityError, +) +from ai.approvals.models import ApprovalStatus, TimeoutAction + + +def _spawn(target, *args): + t = threading.Thread(target=target, args=args, daemon=True) + t.start() + return t + + +class TestConstructorValidation: + def test_rejects_zero_pending_cap(self): + with pytest.raises(ValueError): + ApprovalManager(pending_cap=0) + + def test_rejects_negative_default_timeout(self): + with pytest.raises(ValueError): + ApprovalManager(default_timeout=-1) + + def test_rejects_invalid_default_timeout_action(self): + with pytest.raises(ValueError, match='timeout_action must be one of'): + ApprovalManager(default_timeout_action='shrug') + + +class TestCreate: + def test_create_returns_pending_request_with_uuid(self): + manager = ApprovalManager() + req = manager.create({'text': 'hi'}) + assert req.status == ApprovalStatus.PENDING + assert req.approval_id # non-empty + assert manager.pending_count == 1 + + def test_create_rejects_non_dict_payload(self): + manager = ApprovalManager() + with pytest.raises(TypeError): + manager.create('not a dict') # type: ignore[arg-type] + + def test_pending_cap_enforced(self): + manager = ApprovalManager(pending_cap=2) + manager.create({'i': 1}) + manager.create({'i': 2}) + with pytest.raises(PendingCapacityError): + manager.create({'i': 3}) + + def test_returned_request_is_a_deep_copy(self): + """Mutating the returned request must not affect internal state. + + This is the bug PR #542 reviewers flagged as 'shallow copy in get_request'. + """ + manager = ApprovalManager() + req = manager.create({'text': 'hi'}) + req.payload['text'] = 'mutated' + again = manager.get_request(req.approval_id) + assert again.payload['text'] == 'hi' + + +class TestApproveReject: + def test_approve_marks_request_and_unblocks_waiter(self): + manager = ApprovalManager() + req = manager.create({'text': 'hi'}) + + results = {} + + def waiter(): + results['decision'] = manager.wait(req.approval_id, timeout=5.0) + + t = _spawn(waiter) + # Give the waiter a chance to actually call wait(). + time.sleep(0.05) + manager.approve(req.approval_id, decided_by='alice') + t.join(timeout=5.0) + + assert not t.is_alive(), 'waiter should have unblocked' + decision = results['decision'] + assert decision.approved + assert decision.decided_by == 'alice' + assert manager.pending_count == 0 + + def test_reject_unblocks_waiter_with_rejected_status(self): + manager = ApprovalManager() + req = manager.create({'text': 'hi'}) + results = {} + + def waiter(): + results['decision'] = manager.wait(req.approval_id, timeout=5.0) + + t = _spawn(waiter) + time.sleep(0.05) + manager.reject(req.approval_id, reason='unsafe', decided_by='bob') + t.join(timeout=5.0) + assert results['decision'].rejected + assert results['decision'].reason == 'unsafe' + + def test_approve_with_modified_payload_propagates_to_decision(self): + manager = ApprovalManager() + req = manager.create({'text': 'draft'}) + results = {} + + def waiter(): + results['decision'] = manager.wait(req.approval_id, timeout=5.0) + + t = _spawn(waiter) + time.sleep(0.05) + manager.approve(req.approval_id, modified_payload={'text': 'final'}) + t.join(timeout=5.0) + decision = results['decision'] + assert decision.was_modified + assert decision.modified_payload == {'text': 'final'} + # Original payload is preserved for audit even after modification. + assert decision.payload == {'text': 'draft'} + + def test_double_approve_raises(self): + """A late decision after a prior resolution must not silently overwrite.""" + manager = ApprovalManager() + req = manager.create({'k': 'v'}) + manager.approve(req.approval_id, decided_by='alice') + with pytest.raises(Exception, match='already approved'): + manager.approve(req.approval_id, decided_by='attacker') + + def test_modified_payload_must_be_dict(self): + manager = ApprovalManager() + req = manager.create({'k': 'v'}) + with pytest.raises(TypeError): + manager.approve(req.approval_id, modified_payload='nope') # type: ignore[arg-type] + + def test_unknown_id_raises_keyerror(self): + manager = ApprovalManager() + with pytest.raises(KeyError): + manager.approve('does-not-exist') + + +class TestTimeout: + def test_timeout_with_reject_action_returns_rejected_decision(self): + manager = ApprovalManager() + req = manager.create({'k': 'v'}, timeout=0.05) + decision = manager.wait(req.approval_id, timeout=0.05, timeout_action=TimeoutAction.REJECT) + assert decision.rejected + # Underlying status should be timed_out for audit purposes. + stored = manager.get_request(req.approval_id) + assert stored.status == ApprovalStatus.TIMED_OUT + + def test_timeout_with_approve_action_returns_approved_decision(self): + manager = ApprovalManager() + req = manager.create({'k': 'v'}, timeout=0.05) + decision = manager.wait(req.approval_id, timeout=0.05, timeout_action=TimeoutAction.APPROVE) + assert decision.approved + + def test_timeout_with_error_action_raises(self): + manager = ApprovalManager() + req = manager.create({'k': 'v'}, timeout=0.05) + with pytest.raises(TimeoutError): + manager.wait(req.approval_id, timeout=0.05, timeout_action=TimeoutAction.ERROR) + stored = manager.get_request(req.approval_id) + assert stored.status == ApprovalStatus.TIMED_OUT + + def test_already_resolved_request_returns_immediately(self): + manager = ApprovalManager() + req = manager.create({'k': 'v'}) + manager.approve(req.approval_id) + decision = manager.wait(req.approval_id, timeout=5.0) + assert decision.approved + + +class TestRequireReasonOnReject: + def test_reject_without_reason_raises_when_required(self): + manager = ApprovalManager() + req = manager.create({'k': 'v'}, require_reason_on_reject=True) + with pytest.raises(ApprovalReasonRequiredError, match='requires a non-empty reason'): + manager.reject(req.approval_id) + + def test_reject_with_blank_reason_raises_when_required(self): + manager = ApprovalManager() + req = manager.create({'k': 'v'}, require_reason_on_reject=True) + with pytest.raises(ApprovalReasonRequiredError): + manager.reject(req.approval_id, reason=' ') + + def test_reject_with_reason_succeeds_when_required(self): + manager = ApprovalManager() + req = manager.create({'k': 'v'}, require_reason_on_reject=True) + updated = manager.reject(req.approval_id, reason='unsafe content') + assert updated.status == ApprovalStatus.REJECTED + assert updated.decision_reason == 'unsafe content' + + def test_reject_without_reason_succeeds_when_not_required(self): + manager = ApprovalManager() + req = manager.create({'k': 'v'}) # default: require_reason_on_reject=False + updated = manager.reject(req.approval_id) + assert updated.status == ApprovalStatus.REJECTED + + def test_approve_does_not_require_reason_even_when_flag_set(self): + """The flag is reject-specific by design — approvals are not gated.""" + manager = ApprovalManager() + req = manager.create({'k': 'v'}, require_reason_on_reject=True) + updated = manager.approve(req.approval_id) + assert updated.status == ApprovalStatus.APPROVED + + +class TestList: + def test_list_filters_by_status(self): + manager = ApprovalManager() + a = manager.create({'i': 1}) + manager.create({'i': 2}) + manager.approve(a.approval_id) + pending = manager.list_requests(status=ApprovalStatus.PENDING) + assert len(pending) == 1 + approved = manager.list_requests(status=ApprovalStatus.APPROVED) + assert len(approved) == 1 + + +class TestDiscardResolved: + def test_cannot_discard_pending(self): + manager = ApprovalManager() + req = manager.create({'k': 'v'}) + with pytest.raises(Exception, match='cannot discard pending'): + manager.discard_resolved(req.approval_id) + + def test_can_discard_resolved(self): + manager = ApprovalManager() + req = manager.create({'k': 'v'}) + manager.approve(req.approval_id) + assert manager.discard_resolved(req.approval_id) is True + assert manager.get_request(req.approval_id) is None + + def test_discard_unknown_returns_false(self): + assert ApprovalManager().discard_resolved('nope') is False diff --git a/packages/ai/tests/ai/approvals/test_models.py b/packages/ai/tests/ai/approvals/test_models.py new file mode 100644 index 000000000..e8f4b6ff9 --- /dev/null +++ b/packages/ai/tests/ai/approvals/test_models.py @@ -0,0 +1,84 @@ +# ============================================================================= +# MIT License +# Copyright (c) 2026 Aparavi Software AG +# ============================================================================= + +"""Unit tests for ai.approvals.models.""" + +import pytest + +from ai.approvals.models import ( + ApprovalDecision, + ApprovalRequest, + ApprovalStatus, + TimeoutAction, +) + + +class TestTimeoutAction: + def test_parse_accepts_enum_member(self): + assert TimeoutAction.parse(TimeoutAction.APPROVE) is TimeoutAction.APPROVE + + def test_parse_accepts_lowercase_string(self): + assert TimeoutAction.parse('reject') is TimeoutAction.REJECT + + def test_parse_normalizes_whitespace_and_case(self): + assert TimeoutAction.parse(' ERROR ') is TimeoutAction.ERROR + + def test_parse_rejects_unknown_values(self): + with pytest.raises(ValueError, match='timeout_action must be one of'): + TimeoutAction.parse('shrug') + + def test_parse_rejects_non_string_non_enum(self): + with pytest.raises(ValueError): + TimeoutAction.parse(42) + + +class TestApprovalRequest: + def test_to_dict_includes_all_fields(self): + request = ApprovalRequest( + approval_id='id-1', + pipeline_id='p', + node_id='n', + payload={'text': 'hi'}, + status=ApprovalStatus.PENDING, + created_at=1.0, + deadline_at=2.0, + ) + d = request.to_dict() + assert d['approval_id'] == 'id-1' + assert d['payload'] == {'text': 'hi'} + assert d['status'] == 'pending' + assert d['deadline_at'] == 2.0 + assert d['decided_by'] is None + # New compliance hint: defaults to False, present on every dict. + assert d['require_reason_on_reject'] is False + + def test_to_dict_round_trips_require_reason_flag(self): + request = ApprovalRequest( + approval_id='id-2', + pipeline_id=None, + node_id=None, + payload={'k': 'v'}, + require_reason_on_reject=True, + ) + assert request.to_dict()['require_reason_on_reject'] is True + + +class TestApprovalDecision: + def test_approved_property(self): + d = ApprovalDecision(status=ApprovalStatus.APPROVED, payload={}) + assert d.approved + assert not d.rejected + assert not d.timed_out + + def test_rejected_property(self): + d = ApprovalDecision(status=ApprovalStatus.REJECTED, payload={}) + assert d.rejected + assert not d.approved + + def test_timed_out_property(self): + d = ApprovalDecision(status=ApprovalStatus.TIMED_OUT, payload={}) + assert d.timed_out + assert not d.approved + assert not d.rejected diff --git a/packages/ai/tests/ai/approvals/test_notifier.py b/packages/ai/tests/ai/approvals/test_notifier.py new file mode 100644 index 000000000..f4e8ec0d9 --- /dev/null +++ b/packages/ai/tests/ai/approvals/test_notifier.py @@ -0,0 +1,185 @@ +# ============================================================================= +# MIT License +# Copyright (c) 2026 Aparavi Software AG +# ============================================================================= + +"""Unit tests for ai.approvals.notifier — log + webhook channels and SSRF. + +These cover the IPv6 SSRF gaps PR #542 reviewers explicitly flagged: +loopback ``::1``, link-local ``fe80::/10``, unique-local ``fc00::/7``, and +IPv4-mapped IPv6 (``::ffff:0:0/96``). +""" + +import io +import logging +from unittest.mock import MagicMock + +import pytest + +from ai.approvals.models import ApprovalRequest, ApprovalStatus +from ai.approvals.notifier import ( + ApprovalNotifier, + NotifierConfig, + SSRFValidationError, + validate_webhook_url, +) + + +def _make_request() -> ApprovalRequest: + return ApprovalRequest( + approval_id='r-1', + pipeline_id='p-1', + node_id='n-1', + payload={'text': 'hi'}, + status=ApprovalStatus.PENDING, + ) + + +class TestSSRFValidation: + @pytest.mark.parametrize( + 'host', + [ + 'http://127.0.0.1/hook', + 'http://localhost/hook', + 'http://10.0.0.1/hook', + 'http://192.168.1.1/hook', + 'http://172.16.0.1/hook', + 'http://169.254.169.254/latest/meta-data', # AWS metadata + 'http://[::1]/hook', # IPv6 loopback + 'http://[fe80::1]/hook', # IPv6 link-local + 'http://[fc00::1]/hook', # IPv6 unique-local + 'http://[fd12:3456:789a::1]/hook', # IPv6 ULA range + 'http://[::ffff:127.0.0.1]/hook', # IPv4-mapped loopback + ], + ) + def test_blocks_private_and_loopback_addresses(self, host): + with pytest.raises(SSRFValidationError): + validate_webhook_url(host) + + def test_allows_private_when_explicitly_opted_in(self): + # Self-hosted operators may want to point at internal services. + validate_webhook_url('http://10.0.0.5/hook', allow_private=True) + + def test_rejects_non_http_scheme(self): + with pytest.raises(SSRFValidationError, match='http or https'): + validate_webhook_url('file:///etc/passwd') + + def test_rejects_missing_hostname(self): + with pytest.raises(SSRFValidationError): + validate_webhook_url('http:///nohost') + + def test_allows_public_address(self): + # 8.8.8.8 is a stable public address suitable for parameter validation. + validate_webhook_url('https://8.8.8.8/hook') + + +class TestApprovalNotifier: + def test_log_channel_writes_structured_line(self): + logger = logging.getLogger('test.approvals') + logger.handlers.clear() + stream = io.StringIO() + handler = logging.StreamHandler(stream) + handler.setFormatter(logging.Formatter('%(message)s')) + logger.addHandler(handler) + logger.setLevel(logging.INFO) + + notifier = ApprovalNotifier(NotifierConfig(log_channel_enabled=True), logger=logger) + delivered = notifier.notify(_make_request()) + assert delivered == ['log'] + assert 'approval pending: id=r-1' in stream.getvalue() + + def test_log_channel_disabled_is_skipped(self): + notifier = ApprovalNotifier(NotifierConfig(log_channel_enabled=False)) + assert notifier.notify(_make_request()) == [] + + def test_webhook_posts_when_configured(self): + # Use a public-resolving hostname so SSRF validation passes; + # the url_opener is mocked, so no network I/O actually happens. + opener = MagicMock() + opener.return_value.__enter__.return_value.read.return_value = b'ok' + + notifier = ApprovalNotifier( + NotifierConfig( + log_channel_enabled=False, + webhook_url='https://8.8.8.8/hook', + ), + url_opener=opener, + ) + delivered = notifier.notify(_make_request()) + assert delivered == ['webhook'] + opener.assert_called_once() + request_arg = opener.call_args.args[0] + assert request_arg.full_url == 'https://8.8.8.8/hook' + assert request_arg.get_header('Content-type') == 'application/json' + + def test_webhook_failure_is_swallowed_and_logged(self): + """A misbehaving webhook must never crash the pipeline.""" + opener = MagicMock(side_effect=ConnectionError('boom')) + # Use a real public-resolving address so SSRF passes. + notifier = ApprovalNotifier( + NotifierConfig( + log_channel_enabled=False, + webhook_url='https://8.8.8.8/hook', + ), + url_opener=opener, + ) + # Should not raise. + assert notifier.notify(_make_request()) == [] + + def test_webhook_to_private_address_rejected_by_default(self): + notifier = ApprovalNotifier( + NotifierConfig( + log_channel_enabled=False, + webhook_url='http://127.0.0.1/hook', + ), + url_opener=MagicMock(), + ) + # SSRF blocks delivery; failure is swallowed; channel reports nothing delivered. + assert notifier.notify(_make_request()) == [] + + def test_webhook_to_private_address_allowed_with_opt_in(self): + opener = MagicMock() + opener.return_value.__enter__.return_value.read.return_value = b'ok' + notifier = ApprovalNotifier( + NotifierConfig( + log_channel_enabled=False, + webhook_url='http://127.0.0.1/hook', + allow_private_webhook_hosts=True, + ), + url_opener=opener, + ) + assert notifier.notify(_make_request()) == ['webhook'] + + def test_silent_mode_short_circuits_all_channels(self): + """silent=True must suppress every channel even when others are configured. + + Reviewers using a custom dashboard rely on REST polling — accidentally + leaving log_channel_enabled=True would leak operational signal. + """ + opener = MagicMock() + notifier = ApprovalNotifier( + NotifierConfig( + log_channel_enabled=True, + webhook_url='https://8.8.8.8/hook', + silent=True, + ), + url_opener=opener, + ) + delivered = notifier.notify(_make_request()) + assert delivered == [] + opener.assert_not_called() + + def test_custom_headers_are_attached(self): + opener = MagicMock() + opener.return_value.__enter__.return_value.read.return_value = b'ok' + notifier = ApprovalNotifier( + NotifierConfig( + log_channel_enabled=False, + webhook_url='https://8.8.8.8/hook', + webhook_headers={'X-Tenant': 'rocketride'}, + ), + url_opener=opener, + ) + notifier.notify(_make_request()) + req = opener.call_args.args[0] + assert req.get_header('X-tenant') == 'rocketride' diff --git a/packages/ai/tests/ai/approvals/test_registry.py b/packages/ai/tests/ai/approvals/test_registry.py new file mode 100644 index 000000000..655ccdc95 --- /dev/null +++ b/packages/ai/tests/ai/approvals/test_registry.py @@ -0,0 +1,41 @@ +# ============================================================================= +# MIT License +# Copyright (c) 2026 Aparavi Software AG +# ============================================================================= + +"""Unit tests for the process-wide manager registry.""" + +from ai.approvals import get_manager, reset_manager, set_manager +from ai.approvals.manager import ApprovalManager + + +def setup_function(_func): + reset_manager() + + +def teardown_function(_func): + reset_manager() + + +def test_first_get_creates_default_manager(): + m = get_manager() + assert isinstance(m, ApprovalManager) + + +def test_subsequent_get_returns_same_instance(): + a = get_manager() + b = get_manager() + assert a is b + + +def test_set_manager_replaces_instance(): + custom = ApprovalManager(pending_cap=7) + set_manager(custom) + assert get_manager() is custom + + +def test_reset_manager_drops_instance(): + a = get_manager() + reset_manager() + b = get_manager() + assert a is not b diff --git a/packages/ai/tests/ai/approvals/test_store.py b/packages/ai/tests/ai/approvals/test_store.py new file mode 100644 index 000000000..0e342432d --- /dev/null +++ b/packages/ai/tests/ai/approvals/test_store.py @@ -0,0 +1,68 @@ +# ============================================================================= +# MIT License +# Copyright (c) 2026 Aparavi Software AG +# ============================================================================= + +"""Unit tests for ai.approvals.store.InMemoryStore.""" + +from ai.approvals.models import ApprovalRequest, ApprovalStatus +from ai.approvals.store import InMemoryStore + + +def _make_request(approval_id: str = 'r1', status: ApprovalStatus = ApprovalStatus.PENDING) -> ApprovalRequest: + return ApprovalRequest( + approval_id=approval_id, + pipeline_id=None, + node_id=None, + payload={'k': 'v'}, + status=status, + ) + + +class TestInMemoryStore: + def test_put_and_get_round_trip(self): + store = InMemoryStore() + store.put(_make_request()) + got = store.get('r1') + assert got is not None + assert got.approval_id == 'r1' + + def test_get_returns_deep_copy(self): + store = InMemoryStore() + original = _make_request() + store.put(original) + retrieved = store.get('r1') + retrieved.payload['k'] = 'mutated' + # Mutating the retrieved copy must not bleed back into the store. + again = store.get('r1') + assert again.payload['k'] == 'v' + + def test_get_missing_returns_none(self): + assert InMemoryStore().get('nope') is None + + def test_delete_returns_true_when_existed(self): + store = InMemoryStore() + store.put(_make_request()) + assert store.delete('r1') is True + assert store.delete('r1') is False + + def test_list_returns_all_when_no_filter(self): + store = InMemoryStore() + store.put(_make_request('r1', ApprovalStatus.PENDING)) + store.put(_make_request('r2', ApprovalStatus.APPROVED)) + ids = sorted(r.approval_id for r in store.list()) + assert ids == ['r1', 'r2'] + + def test_list_filters_by_status(self): + store = InMemoryStore() + store.put(_make_request('r1', ApprovalStatus.PENDING)) + store.put(_make_request('r2', ApprovalStatus.APPROVED)) + pending = store.list(status=ApprovalStatus.PENDING) + assert len(pending) == 1 and pending[0].approval_id == 'r1' + + def test_put_replaces_existing(self): + store = InMemoryStore() + store.put(_make_request('r1', ApprovalStatus.PENDING)) + store.put(_make_request('r1', ApprovalStatus.APPROVED)) + got = store.get('r1') + assert got.status == ApprovalStatus.APPROVED diff --git a/packages/ai/tests/ai/modules/approvals/__init__.py b/packages/ai/tests/ai/modules/approvals/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/packages/ai/tests/ai/modules/approvals/test_endpoints.py b/packages/ai/tests/ai/modules/approvals/test_endpoints.py new file mode 100644 index 000000000..f634197f9 --- /dev/null +++ b/packages/ai/tests/ai/modules/approvals/test_endpoints.py @@ -0,0 +1,177 @@ +# ============================================================================= +# MIT License +# Copyright (c) 2026 Aparavi Software AG +# ============================================================================= + +"""HTTP-level tests for the approvals REST module. + +Mounts the routes returned by ``build_routes`` on a bare FastAPI app so we +exercise FastAPI request parsing, validation, and status-code mapping without +pulling in ``ai.web.WebServer`` and its rocketlib transitive imports. +""" + +import threading +import time + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from ai.approvals.manager import ApprovalManager +from ai.approvals.models import ApprovalStatus +from ai.modules.approvals.endpoints import build_routes + + +@pytest.fixture +def manager() -> ApprovalManager: + return ApprovalManager() + + +@pytest.fixture +def client(manager: ApprovalManager) -> TestClient: + app = FastAPI() + for path, handler, methods in build_routes(manager): + app.router.add_api_route(path, handler, methods=methods) + return TestClient(app) + + +class TestList: + def test_empty_list(self, client: TestClient): + resp = client.get('/approvals') + assert resp.status_code == 200 + body = resp.json() + assert body['status'] == 'OK' + assert body['data']['count'] == 0 + assert body['data']['approvals'] == [] + + def test_lists_pending(self, client: TestClient, manager: ApprovalManager): + manager.create({'k': 1}) + manager.create({'k': 2}) + body = client.get('/approvals').json() + assert body['data']['count'] == 2 + + def test_status_filter(self, client: TestClient, manager: ApprovalManager): + a = manager.create({'k': 1}) + manager.create({'k': 2}) + manager.approve(a.approval_id) + body = client.get('/approvals?status=approved').json() + assert body['data']['count'] == 1 + assert body['data']['approvals'][0]['status'] == 'approved' + + def test_invalid_status_filter_returns_400(self, client: TestClient): + resp = client.get('/approvals?status=lol') + assert resp.status_code == 400 + assert 'invalid status filter' in resp.json()['detail'] + + +class TestGet: + def test_returns_request(self, client: TestClient, manager: ApprovalManager): + req = manager.create({'text': 'hi'}) + body = client.get(f'/approvals/{req.approval_id}').json() + assert body['data']['approval_id'] == req.approval_id + assert body['data']['payload'] == {'text': 'hi'} + + def test_404_when_missing(self, client: TestClient): + resp = client.get('/approvals/does-not-exist') + assert resp.status_code == 404 + + +class TestApprove: + def test_approves_pending_request(self, client: TestClient, manager: ApprovalManager): + req = manager.create({'text': 'draft'}) + resp = client.post( + f'/approvals/{req.approval_id}/approve', + json={'decided_by': 'alice'}, + ) + assert resp.status_code == 200 + assert resp.json()['data']['status'] == 'approved' + assert resp.json()['data']['decided_by'] == 'alice' + + def test_approve_with_modified_payload(self, client: TestClient, manager: ApprovalManager): + req = manager.create({'text': 'draft'}) + resp = client.post( + f'/approvals/{req.approval_id}/approve', + json={'modified_payload': {'text': 'final'}, 'decided_by': 'bob'}, + ) + assert resp.json()['data']['modified_payload'] == {'text': 'final'} + + def test_approve_unblocks_waiter(self, client: TestClient, manager: ApprovalManager): + """End-to-end: pipeline thread waits, REST call resolves.""" + req = manager.create({'text': 'draft'}) + results = {} + + def waiter(): + results['decision'] = manager.wait(req.approval_id, timeout=5.0) + + t = threading.Thread(target=waiter, daemon=True) + t.start() + time.sleep(0.05) + resp = client.post(f'/approvals/{req.approval_id}/approve', json={}) + assert resp.status_code == 200 + t.join(timeout=5.0) + assert not t.is_alive() + assert results['decision'].approved + + def test_404_when_unknown(self, client: TestClient): + resp = client.post('/approvals/nope/approve', json={}) + assert resp.status_code == 404 + + def test_409_when_already_resolved(self, client: TestClient, manager: ApprovalManager): + req = manager.create({'k': 'v'}) + manager.approve(req.approval_id) + resp = client.post(f'/approvals/{req.approval_id}/approve', json={}) + assert resp.status_code == 409 + + def test_extra_fields_rejected(self, client: TestClient, manager: ApprovalManager): + """Pydantic extra='forbid' catches typos in request bodies.""" + req = manager.create({'k': 'v'}) + resp = client.post( + f'/approvals/{req.approval_id}/approve', + json={'desided_by': 'typo'}, + ) + assert resp.status_code == 422 + + +class TestReject: + def test_rejects_with_reason(self, client: TestClient, manager: ApprovalManager): + req = manager.create({'k': 'v'}) + resp = client.post( + f'/approvals/{req.approval_id}/reject', + json={'reason': 'unsafe', 'decided_by': 'reviewer'}, + ) + body = resp.json()['data'] + assert body['status'] == 'rejected' + assert body['decision_reason'] == 'unsafe' + + def test_404_when_unknown(self, client: TestClient): + resp = client.post('/approvals/nope/reject', json={}) + assert resp.status_code == 404 + + def test_409_when_already_resolved(self, client: TestClient, manager: ApprovalManager): + req = manager.create({'k': 'v'}) + manager.approve(req.approval_id) + resp = client.post(f'/approvals/{req.approval_id}/reject', json={}) + assert resp.status_code == 409 + + def test_400_when_reason_required_but_missing(self, client: TestClient, manager: ApprovalManager): + """Compliance: refuse rejection without a reason, distinct status from 409.""" + req = manager.create({'k': 'v'}, require_reason_on_reject=True) + resp = client.post(f'/approvals/{req.approval_id}/reject', json={}) + assert resp.status_code == 400 + assert 'requires a non-empty reason' in resp.json()['detail'] + # Request must still be PENDING — failed reject does not consume the gate. + assert manager.get_request(req.approval_id).status == ApprovalStatus.PENDING + + def test_400_when_reason_required_but_blank(self, client: TestClient, manager: ApprovalManager): + req = manager.create({'k': 'v'}, require_reason_on_reject=True) + resp = client.post(f'/approvals/{req.approval_id}/reject', json={'reason': ' '}) + assert resp.status_code == 400 + + def test_reject_succeeds_with_reason_when_required(self, client: TestClient, manager: ApprovalManager): + req = manager.create({'k': 'v'}, require_reason_on_reject=True) + resp = client.post( + f'/approvals/{req.approval_id}/reject', + json={'reason': 'fails policy', 'decided_by': 'reviewer'}, + ) + assert resp.status_code == 200 + assert resp.json()['data']['decision_reason'] == 'fails policy' diff --git a/packages/ai/tests/conftest.py b/packages/ai/tests/conftest.py index b572ad00a..4d876acd8 100644 --- a/packages/ai/tests/conftest.py +++ b/packages/ai/tests/conftest.py @@ -17,3 +17,11 @@ mock_rocketlib = MagicMock() mock_rocketlib.debug = MagicMock() sys.modules['rocketlib'] = mock_rocketlib + +# Mock depends module — bundled with the engine binary at packages/server/engine-lib, +# not installable via pip. Without this, importing anything under ai.* fails because +# ai/__init__.py does `from depends import depends`. +if 'depends' not in sys.modules: + mock_depends = MagicMock() + mock_depends.depends = MagicMock(return_value=None) + sys.modules['depends'] = mock_depends diff --git a/packages/shared-ui/src/components/approvals/ApprovalPanel.tsx b/packages/shared-ui/src/components/approvals/ApprovalPanel.tsx new file mode 100644 index 000000000..6efee9b34 --- /dev/null +++ b/packages/shared-ui/src/components/approvals/ApprovalPanel.tsx @@ -0,0 +1,517 @@ +// ============================================================================= +// MIT License +// Copyright (c) 2026 Aparavi Software AG +// ============================================================================= + +/** + * ApprovalPanel — Reviewer UI for human-in-the-loop approval requests. + * + * Polls GET /approvals?status=pending every 2 seconds while mounted. + * Each pending request is shown as a card with: + * - the payload (text or JSON) + * - an editable textarea so reviewers can modify the text before approving + * - Approve / Reject buttons + * - a countdown to the deadline + * + * Communicates directly with the REST API built in packages/ai (issue #635). + * No WebSocket needed — the approval gate lives in the pipeline thread. + */ + +import React, { CSSProperties, useCallback, useEffect, useRef, useState } from 'react'; +import { createPortal } from 'react-dom'; +import { commonStyles } from '../../themes/styles'; + +// ============================================================================= +// Types +// ============================================================================= + +interface ApprovalRequest { + approval_id: string; + status: 'pending' | 'approved' | 'rejected' | 'timed_out'; + payload: Record; + modified_payload?: Record | null; + created_at: number; + deadline_at?: number | null; + decided_by?: string | null; + decision_reason?: string | null; + require_reason_on_reject?: boolean; + profile?: string; + metadata?: Record; +} + +interface ApiListResponse { + status: string; + data: { count: number; approvals: ApprovalRequest[] }; +} + +// ============================================================================= +// Styles +// ============================================================================= + +const S: Record = { + empty: { + color: 'var(--rr-text-disabled)', + textAlign: 'center', + padding: 40, + fontSize: 13, + }, + card: { + ...commonStyles.card, + marginBottom: 20, + borderRadius: 8, + }, + cardHeader: { + ...commonStyles.cardHeader, + borderRadius: '8px 8px 0 0', + gap: 10, + }, + cardBody: { + ...commonStyles.cardBody, + display: 'flex', + flexDirection: 'column' as const, + gap: 12, + }, + badge: { + fontSize: 10, + fontWeight: 700, + padding: '2px 8px', + borderRadius: 10, + backgroundColor: 'var(--rr-color-warning)', + color: '#000', + }, + badgeTimeout: { + fontSize: 10, + fontWeight: 700, + padding: '2px 8px', + borderRadius: 10, + backgroundColor: 'var(--rr-color-error)', + color: '#fff', + }, + label: { + ...commonStyles.labelUppercase, + marginBottom: 4, + }, + payloadBlock: { + background: 'var(--rr-bg-default)', + border: '1px solid var(--rr-border)', + borderRadius: 4, + padding: '8px 10px', + fontSize: 12, + ...commonStyles.fontMono, + whiteSpace: 'pre-wrap' as const, + wordBreak: 'break-word' as const, + maxHeight: 200, + overflowY: 'auto' as const, + color: 'var(--rr-text-primary)', + }, + textarea: { + width: '100%', + minHeight: 120, + background: 'var(--rr-bg-default)', + border: '1px solid var(--rr-border)', + borderRadius: 4, + padding: '8px 10px', + fontSize: 12, + ...commonStyles.fontMono, + color: 'var(--rr-text-primary)', + resize: 'vertical' as const, + boxSizing: 'border-box' as const, + }, + actionRow: { + display: 'flex', + gap: 8, + alignItems: 'center', + }, + metaRow: { + display: 'flex', + gap: 16, + alignItems: 'center', + fontSize: 11, + color: 'var(--rr-text-secondary)', + }, + rejectModal: { + backgroundColor: 'var(--rr-bg-paper)', + border: '1px solid var(--rr-border)', + borderRadius: 8, + width: 420, + maxWidth: '90vw', + padding: 24, + display: 'flex', + flexDirection: 'column' as const, + gap: 16, + boxShadow: '0 8px 32px rgba(0,0,0,0.35)', + }, + rejectTitle: { + fontSize: 14, + fontWeight: 600, + color: 'var(--rr-text-primary)', + }, + rejectInput: { + width: '100%', + background: 'var(--rr-bg-default)', + border: '1px solid var(--rr-border)', + borderRadius: 4, + padding: '8px 10px', + fontSize: 12, + color: 'var(--rr-text-primary)', + boxSizing: 'border-box' as const, + }, + errorText: { + fontSize: 11, + color: 'var(--rr-color-error)', + }, + spinnerWrap: { + display: 'flex', + alignItems: 'center', + gap: 8, + padding: '8px 0', + fontSize: 12, + color: 'var(--rr-text-secondary)', + }, +}; + +// ============================================================================= +// Helpers +// ============================================================================= + +function extractDisplayText(payload: Record): { mode: 'text' | 'json'; value: string } { + if ('text' in payload && typeof payload.text === 'string') { + return { mode: 'text', value: payload.text }; + } + if ('json' in payload) { + return { mode: 'json', value: JSON.stringify(payload.json, null, 2) }; + } + return { mode: 'json', value: JSON.stringify(payload, null, 2) }; +} + +function useCountdown(deadlineAt: number | null | undefined): string { + const [remaining, setRemaining] = useState(''); + + useEffect(() => { + if (!deadlineAt) { + setRemaining(''); + return; + } + const tick = () => { + const now = Date.now() / 1000; + const secs = Math.max(0, Math.round(deadlineAt - now)); + if (secs <= 0) { + setRemaining('expired'); + return; + } + const m = Math.floor(secs / 60); + const s = secs % 60; + setRemaining(m > 0 ? `${m}m ${s}s` : `${s}s`); + }; + tick(); + const id = setInterval(tick, 1000); + return () => clearInterval(id); + }, [deadlineAt]); + + return remaining; +} + +// ============================================================================= +// RejectModal +// ============================================================================= + +interface RejectModalProps { + approvalId: string; + requireReason: boolean; + onConfirm: (approvalId: string, reason: string) => Promise; + onCancel: () => void; +} + +const RejectModal: React.FC = ({ approvalId, requireReason, onConfirm, onCancel }) => { + const [reason, setReason] = useState(''); + const [submitting, setSubmitting] = useState(false); + const [error, setError] = useState(''); + + const handleConfirm = async () => { + if (requireReason && !reason.trim()) { + setError('A reason is required for this rejection.'); + return; + } + setSubmitting(true); + setError(''); + try { + await onConfirm(approvalId, reason); + } catch (e: unknown) { + setError(e instanceof Error ? e.message : 'Rejection failed.'); + setSubmitting(false); + } + }; + + const handleBackdrop = (e: React.MouseEvent) => { + if (e.target === e.currentTarget) onCancel(); + }; + + return createPortal( +
+
e.stopPropagation()}> +
Reject approval
+
+
Reason {requireReason ? '(required)' : '(optional)'}
+ setReason(e.target.value)} + autoFocus + onKeyDown={(e) => { + if (e.key === 'Enter') handleConfirm(); + if (e.key === 'Escape') onCancel(); + }} + /> +
+ {error &&
{error}
} +
+ + +
+
+
, + document.body + ); +}; + +// ============================================================================= +// ApprovalCard +// ============================================================================= + +interface ApprovalCardProps { + req: ApprovalRequest; + onApprove: (id: string, modifiedText: string | null, mode: 'text' | 'json') => Promise; + onReject: (id: string, reason: string) => Promise; +} + +const ApprovalCard: React.FC = ({ req, onApprove, onReject }) => { + const { mode, value } = extractDisplayText(req.payload); + const [editedValue, setEditedValue] = useState(value); + const [approving, setApproving] = useState(false); + const [showReject, setShowReject] = useState(false); + const [approveError, setApproveError] = useState(''); + const countdown = useCountdown(req.deadline_at); + const isExpiringSoon = countdown !== '' && countdown !== 'expired' && req.deadline_at ? req.deadline_at - Date.now() / 1000 < 30 : false; + + const wasModified = editedValue.trim() !== value.trim(); + + const handleApprove = async () => { + setApproving(true); + setApproveError(''); + try { + await onApprove(req.approval_id, wasModified ? editedValue : null, mode); + } catch (e: unknown) { + setApproveError(e instanceof Error ? e.message : 'Approval failed.'); + setApproving(false); + } + }; + + const handleRejectConfirm = async (id: string, reason: string) => { + await onReject(id, reason); + setShowReject(false); + }; + + const profile = req.profile ?? 'auto'; + const short_id = req.approval_id.slice(0, 8); + + return ( +
+ {/* Card header */} +
+ + Request {short_id}… + + {profile !== 'auto' && profile: {profile}} + {countdown === 'expired' ? Expired : countdown ? ⏱ {countdown} : null} + PENDING +
+ + {/* Card body */} +
+ {/* Original payload (read-only preview) */} +
+
Payload {mode === 'json' ? '(JSON)' : '(text)'}
+
{value}
+
+ + {/* Editable version for modification */} +
+
+ Edit before approving + {wasModified && ● modified} +
+