diff --git a/ductor_bot/config.py b/ductor_bot/config.py
index 4dfd4a1f..56b1a7ff 100644
--- a/ductor_bot/config.py
+++ b/ductor_bot/config.py
@@ -578,8 +578,16 @@ class ModelRegistry:
 
     @staticmethod
     def provider_for(model_id: str) -> str:
-        """Return the provider for a model ID."""
-        if model_id in CLAUDE_MODELS:
+        """Return the provider for a model ID.
+
+        Claude Code accepts both short aliases (opus/sonnet/haiku) and full
+        Claude model IDs such as ``claude-opus-4-7`` /
+        ``claude-sonnet-4-6`` / ``claude-haiku-4-5-20251001``. The registry
+        previously matched only the short aliases, so a full Claude model
+        ID fell through to the codex branch and broke Claude routing for
+        clients passing the canonical model name.
+        """
+        if model_id in CLAUDE_MODELS or model_id.startswith("claude-"):
             return "claude"
         if (
             model_id in _GEMINI_ALIASES
diff --git a/ductor_bot/wake_slo_responder.py b/ductor_bot/wake_slo_responder.py
new file mode 100644
index 00000000..55990e09
--- /dev/null
+++ b/ductor_bot/wake_slo_responder.py
@@ -0,0 +1,216 @@
+"""Recipient-side Wake SLO responder.
+
+This module short-circuits AgentComm wake-push payloads that match the
+Wake SLO probe protocol, so they get an ack + pong reply via the local
+Qoopia MCP endpoint **without** spawning a Claude session. The point is
+to measure the wake-push transport and the recipient-side dispatch
+machinery, not the LLM session liveness.
+
+It is the recipient-side counterpart to the sender-side probe at
+``/srv/qoopia/code/scripts/wake_slo_probe.ts``. A sender-side responder
+would synthesize the pong locally and fake L2/L3/L4 proof — see
+:doc:`wake-slo-responder-contract.md` for the architecture rationale.
+
+Auth: the recipient agent's Qoopia API key is resolved, in order, from
+the ``WAKE_SLO_RESPONDER_API_KEY`` environment variable (raw value), the
+file named by ``WAKE_SLO_RESPONDER_API_KEY_FILE``, the file named by
+``QOOPIA_API_KEY_FILE``, and finally
+``~/.ductor-corsairmain/.secrets/qoopia_api_key``. The MCP endpoint
+defaults to ``http://127.0.0.1:3738/mcp`` and is overridable via
+``QOOPIA_PUBLIC_URL``.
+
+Network: stdlib ``urllib.request`` only — no extra dependencies. The
+Qoopia MCP server is configured in stateless mode (no session-id), so a
+single JSON-RPC POST per call is enough.
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import re
+from pathlib import Path
+from typing import Any
+from urllib import error as urllib_error
+from urllib import request as urllib_request
+
+logger = logging.getLogger(__name__)
+
+TOPIC_RE = re.compile(r"^WAKE_SLO_PROBE_(C2L|L2C)_\d+$")
+BODY_RE = re.compile(
+    r"^WAKE_SLO_PING (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z)$"
+)
+
+DEFAULT_API_KEY_FILE = "~/.ductor-corsairmain/.secrets/qoopia_api_key"
+DEFAULT_MCP_URL = "http://127.0.0.1:3738/mcp"
+MCP_TIMEOUT_SECONDS = 5.0
+
+
+def is_probe_payload(payload: dict[str, Any]) -> bool:
+    """Return True iff this webhook payload is a Wake SLO probe.
+
+    A probe is a dict whose ``event_type`` is ``agentcomm_wake`` and whose
+    string ``topic`` and ``body`` match ``TOPIC_RE`` and ``BODY_RE``.
+    """
+    if not isinstance(payload, dict):
+        return False
+    if payload.get("event_type") != "agentcomm_wake":
+        return False
+    topic = payload.get("topic")
+    body = payload.get("body")
+    if not isinstance(topic, str) or not isinstance(body, str):
+        return False
+    return bool(TOPIC_RE.match(topic)) and bool(BODY_RE.match(body))
+
+
+def _read_api_key() -> str | None:
+    """Return the API key the responder authenticates with, or None."""
+    # Resolution order:
+    # 1. WAKE_SLO_RESPONDER_API_KEY env var (raw value) — explicit override
+    #    so deployments can name the key the responder uses without
+    #    hijacking other env vars.
+    # 2. WAKE_SLO_RESPONDER_API_KEY_FILE env var (path to a key file).
+    # 3. QOOPIA_API_KEY_FILE env var (path to a key file).
+    # 4. Default file path (steward-style key, may not have ack rights
+    #    on messages addressed to the local recipient agent — see the
+    #    contract doc note on per-agent key sourcing).
+    # A blank or whitespace-only override counts as unset, so resolution
+    # continues with the key files below.
+    env_value = os.environ.get("WAKE_SLO_RESPONDER_API_KEY", "").strip()
+    if env_value:
+        return env_value
+    path_str = os.environ.get(
+        "WAKE_SLO_RESPONDER_API_KEY_FILE",
+        os.environ.get("QOOPIA_API_KEY_FILE", DEFAULT_API_KEY_FILE),
+    )
+    try:
+        return Path(path_str).expanduser().read_text(encoding="utf-8").strip() or None
+    except OSError as e:
+        logger.warning("wake_slo_responder: api_key unreadable at %s: %s", path_str, e)
+        return None
+
+
+def _mcp_url() -> str:
+    """Return the MCP endpoint: ``$QOOPIA_PUBLIC_URL/mcp`` or the local default."""
+    base = os.environ.get("QOOPIA_PUBLIC_URL", "").rstrip("/")
+    return f"{base}/mcp" if base else DEFAULT_MCP_URL
+
+
+def _mcp_call(method: str, name: str, arguments: dict[str, Any], api_key: str) -> dict[str, Any]:
+    """Send one JSON-RPC POST to Qoopia MCP (stateless) and return the reply.
+
+    The endpoint emits SSE-framed ``event: message\\ndata: {json}\\n``.
+    We strip the framing and JSON-parse the first data line; a response
+    without any data line is parsed as plain JSON. The value returned is
+    the whole JSON-RPC envelope, not just its ``result`` member.
+
+    Transport failures propagate as raised by ``urlopen``/``read``; a
+    body that is not UTF-8 encoded JSON raises ``ValueError``.
+    """
+    payload = {
+        "jsonrpc": "2.0",
+        "id": 1,
+        "method": method,
+        "params": {"name": name, "arguments": arguments},
+    }
+    req = urllib_request.Request(
+        _mcp_url(),
+        data=json.dumps(payload).encode("utf-8"),
+        method="POST",
+        headers={
+            "Content-Type": "application/json",
+            "Accept": "application/json, text/event-stream",
+            "Authorization": f"Bearer {api_key}",
+        },
+    )
+    with urllib_request.urlopen(req, timeout=MCP_TIMEOUT_SECONDS) as resp:
+        raw = resp.read().decode("utf-8")
+    for line in raw.splitlines():
+        # SSE allows ``data:`` with or without the single space after it.
+        if line.startswith("data:"):
+            return json.loads(line[len("data:"):])
+    return json.loads(raw)
+
+
+def _tool_failure(resp: Any) -> dict[str, Any] | None:
+    """Return ``{"detail": ...}`` if an MCP reply reports failure, else None.
+
+    A ``tools/call`` can fail at the JSON-RPC level (an ``error`` member) or
+    at the tool level (a ``result`` carrying ``isError: true``); a reply that
+    is not a JSON object at all is treated as a failure too.
+    """
+    if not isinstance(resp, dict):
+        return {"detail": f"malformed MCP response: {resp!r}"}
+    if "error" in resp:
+        return {"detail": resp["error"]}
+    result = resp.get("result")
+    if isinstance(result, dict) and result.get("isError"):
+        return {"detail": result.get("content", result)}
+    return None
+
+
+def respond(payload: dict[str, Any]) -> dict[str, Any]:
+    """Ack and pong-reply this probe via Qoopia MCP.
+
+    Returns a dict suitable for logging. Always returns; never raises out
+    to the caller. On error, ``status`` will be a string starting with
+    ``error:`` so the dispatch can fall through to the normal Claude
+    session as a safety net.
+
+    This performs blocking network I/O: up to two sequential requests,
+    each using ``MCP_TIMEOUT_SECONDS`` as its socket timeout.
+    """
+    api_key = _read_api_key()
+    if not api_key:
+        return {"status": "error:no_api_key"}
+
+    # ``respond`` may be called without ``is_probe_payload`` having vetted
+    # the payload, so a missing or non-string body is rejected here
+    # instead of raising from the regex match.
+    body = payload.get("body") if isinstance(payload, dict) else None
+    m = BODY_RE.match(body) if isinstance(body, str) else None
+    if not m:
+        return {"status": "error:body_regex_mismatch"}
+    pong_ts = m.group(1)
+
+    message_id = payload.get("message_id")
+    session_id = payload.get("session_id")
+    if not isinstance(message_id, str) or not isinstance(session_id, str):
+        return {"status": "error:missing_ids"}
+
+    try:
+        ack_resp = _mcp_call(
+            "tools/call",
+            "agent_ack",
+            {"message_id": message_id, "status": "auto_pong"},
+            api_key,
+        )
+        failure = _tool_failure(ack_resp)
+        if failure is not None:
+            return {"status": "error:ack_failed", **failure}
+
+        reply_resp = _mcp_call(
+            "tools/call",
+            "agent_reply",
+            {
+                "session_id": session_id,
+                "body": f"WAKE_SLO_PONG {pong_ts}",
+                "close": True,
+            },
+            api_key,
+        )
+        failure = _tool_failure(reply_resp)
+        if failure is not None:
+            return {"status": "error:reply_failed", **failure}
+    except urllib_error.URLError as e:
+        return {"status": "error:network", "detail": str(e)}
+    except Exception as e:  # noqa: BLE001 — never raise out of the responder
+        return {"status": "error:exception", "detail": str(e)}
+
+    return {
+        "status": "ok",
+        "message_id": message_id,
+        "session_id": session_id,
+        "pong_ts": pong_ts,
+    }
diff --git a/ductor_bot/webhook/observer.py b/ductor_bot/webhook/observer.py
index 8fb63521..b8647963 100644
--- a/ductor_bot/webhook/observer.py
+++ b/ductor_bot/webhook/observer.py
@@ -13,6 +13,7 @@
 from ductor_bot.infra.file_watcher import FileWatcher
 from ductor_bot.infra.task_runner import execute_in_task_folder
 from ductor_bot.utils.quiet_hours import check_quiet_hour
+from ductor_bot.wake_slo_responder import is_probe_payload, respond as wake_slo_respond
 from ductor_bot.webhook.models import WebhookResult, render_template
 from ductor_bot.webhook.server import WebhookServer
 
@@ -139,6 +140,36 @@ async def _dispatch(self, hook_id: str, payload: dict[str, Any]) -> WebhookResul
                 status="error:not_found",
             )
 
+        # Wake SLO recipient-side responder: short-circuit probe payloads
+        # before they cost a Claude session. The responder acks + replies
+        # via the local Qoopia MCP endpoint. On any error it falls through
+        # to the normal dispatch path below.
+        # NOTE(review): wake_slo_respond does blocking urllib I/O (up to two
+        # requests, each with its own socket timeout) inside this coroutine
+        # — consider running it in a worker thread.
+        if hook.mode == "wake" and is_probe_payload(payload):
+            slo_result = wake_slo_respond(payload)
+            status = slo_result.get("status", "error:no_status")
+            if status == "ok":
+                logger.info(
+                    "Wake SLO probe handled hook=%s message_id=%s",
+                    hook_id,
+                    slo_result.get("message_id"),
+                )
+                self._manager.record_trigger(hook_id, error=None)
+                return WebhookResult(
+                    hook_id=hook_id,
+                    hook_title=hook.title,
+                    mode="wake",
+                    result_text=f"WAKE_SLO_PONG {slo_result.get('pong_ts')}",
+                    status="success:wake_slo_probe",
+                )
+            logger.warning(
+                "Wake SLO responder errored (%s); falling through to Claude dispatch hook=%s",
+                status,
+                hook_id,
+            )
+
         rendered = render_template(hook.prompt_template, payload)
         safe_prompt = f"{_SAFETY_START}\n{rendered}\n{_SAFETY_END}"
 
diff --git a/tests/test_provider_for_claude_full_ids.py b/tests/test_provider_for_claude_full_ids.py
new file mode 100644
index 00000000..b8b46ac6
--- /dev/null
+++ b/tests/test_provider_for_claude_full_ids.py
@@ -0,0 +1,37 @@
+"""Regression tests for ModelRegistry.provider_for full Claude model IDs.
+
+Claude Code accepts both short aliases (opus/sonnet/haiku) and full Claude
+model IDs (claude-opus-4-7, claude-sonnet-4-6, claude-haiku-4-5-20251001).
+The registry historically matched only the short aliases, so a full ID fell
+through to the codex branch and broke Claude routing for clients that pass
+the canonical model name.
+"""
+
+from __future__ import annotations
+
+import pytest
+
+from ductor_bot.config import ModelRegistry
+
+
+@pytest.mark.parametrize(
+    "model_id",
+    [
+        "claude-opus-4-7",
+        "claude-sonnet-4-6",
+        "claude-haiku-4-5-20251001",
+    ],
+)
+def test_provider_for_full_claude_model_ids(model_id: str) -> None:
+    assert ModelRegistry.provider_for(model_id) == "claude"
+
+
+def test_provider_for_short_aliases_still_claude() -> None:
+    assert ModelRegistry.provider_for("opus") == "claude"
+    assert ModelRegistry.provider_for("sonnet") == "claude"
+    assert ModelRegistry.provider_for("haiku") == "claude"
+
+
+def test_provider_for_non_claude_unchanged() -> None:
+    assert ModelRegistry.provider_for("gpt-5.2-codex") == "codex"
+    assert ModelRegistry.provider_for("gemini-2.5-pro") == "gemini"
diff --git a/tests/test_wake_slo_responder.py b/tests/test_wake_slo_responder.py
new file mode 100644
index 00000000..4c46c884
--- /dev/null
+++ b/tests/test_wake_slo_responder.py
@@ -0,0 +1,231 @@
+"""Tests for the recipient-side Wake SLO responder.
+
+Network calls are mocked; the tests exercise the regex gates, the
+fall-through behavior, and the JSON-RPC payload shaping.
+"""
+
+from __future__ import annotations
+
+import json
+from typing import Any
+from unittest import mock
+
+import pytest
+
+from ductor_bot import wake_slo_responder
+
+
+def _probe_payload(topic: str, body: str, **extra: Any) -> dict[str, Any]:
+    return {
+        "event_type": "agentcomm_wake",
+        "from_agent": "corsair-main",
+        "to_agent": "corsair-main",
+        "session_id": "01KSBN258GN4PSMA0679ST6CZV",
+        "message_id": "01KSBN258H6CM8Z92FWBY4D9AA",
+        "topic": topic,
+        "body": body,
+        "created_at": "2026-05-24T00:30:00Z",
+        **extra,
+    }
+
+
+def test_is_probe_payload_matches_canonical() -> None:
+    p = _probe_payload(
+        "WAKE_SLO_PROBE_C2L_1779582600000",
+        "WAKE_SLO_PING 2026-05-24T00:30:00.000Z",
+    )
+    assert wake_slo_responder.is_probe_payload(p) is True
+
+
+def test_is_probe_payload_rejects_non_wake_event_type() -> None:
+    p = _probe_payload(
+        "WAKE_SLO_PROBE_C2L_1779582600000",
+        "WAKE_SLO_PING 2026-05-24T00:30:00.000Z",
+    )
+    p["event_type"] = "something_else"
+    assert wake_slo_responder.is_probe_payload(p) is False
+
+
+def test_is_probe_payload_rejects_non_probe_topic() -> None:
+    p = _probe_payload(
+        "SMOKE_unrelated",
+        "WAKE_SLO_PING 2026-05-24T00:30:00.000Z",
+    )
+    assert wake_slo_responder.is_probe_payload(p) is False
+
+
+def test_is_probe_payload_rejects_non_probe_body() -> None:
+    p = _probe_payload(
+        "WAKE_SLO_PROBE_C2L_1779582600000",
+        "HELLO",
+    )
+    assert wake_slo_responder.is_probe_payload(p) is False
+
+
+def test_respond_no_api_key_returns_error(monkeypatch: pytest.MonkeyPatch) -> None:
+    monkeypatch.setattr(wake_slo_responder, "_read_api_key", lambda: None)
+    out = wake_slo_responder.respond(
+        _probe_payload(
+            "WAKE_SLO_PROBE_C2L_1779582600000",
+            "WAKE_SLO_PING 2026-05-24T00:30:00.000Z",
+        )
+    )
+    assert out == {"status": "error:no_api_key"}
+
+
+def test_respond_calls_ack_then_reply(monkeypatch: pytest.MonkeyPatch) -> None:
+    monkeypatch.setattr(wake_slo_responder, "_read_api_key", lambda: "fake-key")
+
+    calls: list[tuple[str, str, dict[str, Any], str]] = []
+
+    def fake_mcp_call(
+        method: str, name: str, arguments: dict[str, Any], api_key: str
+    ) -> dict[str, Any]:
+        calls.append((method, name, arguments, api_key))
+        if name == "agent_ack":
+            return {"jsonrpc": "2.0", "id": 1, "result": {"acked_at": "..."}}
+        if name == "agent_reply":
+            return {"jsonrpc": "2.0", "id": 1, "result": {"closed": True}}
+        raise AssertionError(f"unexpected tool call: {name}")
+
+    monkeypatch.setattr(wake_slo_responder, "_mcp_call", fake_mcp_call)
+
+    out = wake_slo_responder.respond(
+        _probe_payload(
+            "WAKE_SLO_PROBE_L2C_1779582600000",
+            "WAKE_SLO_PING 2026-05-24T00:30:00.000Z",
+        )
+    )
+    assert out["status"] == "ok"
+    assert out["pong_ts"] == "2026-05-24T00:30:00.000Z"
+    assert len(calls) == 2
+    assert calls[0][1] == "agent_ack"
+    assert calls[0][2] == {
+        "message_id": "01KSBN258H6CM8Z92FWBY4D9AA",
+        "status": "auto_pong",
+    }
+    assert calls[1][1] == "agent_reply"
+    assert calls[1][2] == {
+        "session_id": "01KSBN258GN4PSMA0679ST6CZV",
+        "body": "WAKE_SLO_PONG 2026-05-24T00:30:00.000Z",
+        "close": True,
+    }
+
+
+def test_respond_ack_failure_short_circuits(monkeypatch: pytest.MonkeyPatch) -> None:
+    monkeypatch.setattr(wake_slo_responder, "_read_api_key", lambda: "fake-key")
+
+    def fake_mcp_call(
+        method: str, name: str, arguments: dict[str, Any], api_key: str
+    ) -> dict[str, Any]:
+        if name == "agent_ack":
+            return {"error": {"code": -32603, "message": "boom"}}
+        raise AssertionError(f"should not reach: {name}")
+
+    monkeypatch.setattr(wake_slo_responder, "_mcp_call", fake_mcp_call)
+
+    out = wake_slo_responder.respond(
+        _probe_payload(
+            "WAKE_SLO_PROBE_C2L_1779582600000",
+            "WAKE_SLO_PING 2026-05-24T00:30:00.000Z",
+        )
+    )
+    assert out["status"] == "error:ack_failed"
+
+
+def test_respond_tool_level_error_is_not_ok(monkeypatch: pytest.MonkeyPatch) -> None:
+    monkeypatch.setattr(wake_slo_responder, "_read_api_key", lambda: "fake-key")
+
+    def fake_mcp_call(
+        method: str, name: str, arguments: dict[str, Any], api_key: str
+    ) -> dict[str, Any]:
+        # MCP reports tool execution failures inside ``result``, not ``error``.
+        return {
+            "jsonrpc": "2.0",
+            "id": 1,
+            "result": {
+                "isError": True,
+                "content": [{"type": "text", "text": "denied"}],
+            },
+        }
+
+    monkeypatch.setattr(wake_slo_responder, "_mcp_call", fake_mcp_call)
+
+    out = wake_slo_responder.respond(
+        _probe_payload(
+            "WAKE_SLO_PROBE_C2L_1779582600000",
+            "WAKE_SLO_PING 2026-05-24T00:30:00.000Z",
+        )
+    )
+    assert out["status"] == "error:ack_failed"
+
+
+def test_respond_non_string_body_returns_error(
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    monkeypatch.setattr(wake_slo_responder, "_read_api_key", lambda: "fake-key")
+    payload = _probe_payload(
+        "WAKE_SLO_PROBE_C2L_1779582600000",
+        "WAKE_SLO_PING 2026-05-24T00:30:00.000Z",
+    )
+    payload["body"] = None
+    assert wake_slo_responder.respond(payload) == {
+        "status": "error:body_regex_mismatch"
+    }
+
+
+def test_respond_network_error_returns_error_status(
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    monkeypatch.setattr(wake_slo_responder, "_read_api_key", lambda: "fake-key")
+
+    def fake_mcp_call(*args: Any, **kwargs: Any) -> dict[str, Any]:
+        from urllib import error as urllib_error
+
+        raise urllib_error.URLError("connection refused")
+
+    monkeypatch.setattr(wake_slo_responder, "_mcp_call", fake_mcp_call)
+
+    out = wake_slo_responder.respond(
+        _probe_payload(
+            "WAKE_SLO_PROBE_C2L_1779582600000",
+            "WAKE_SLO_PING 2026-05-24T00:30:00.000Z",
+        )
+    )
+    assert out["status"] == "error:network"
+    assert "connection refused" in out["detail"]
+
+
+def test_mcp_call_parses_sse_framed_response(monkeypatch: pytest.MonkeyPatch) -> None:
+    sse_body = (
+        b"event: message\n"
+        b'data: {"jsonrpc":"2.0","id":1,"result":{"ok":true}}\n'
+        b"\n"
+    )
+
+    class FakeResponse:
+        def __init__(self, body: bytes) -> None:
+            self._body = body
+
+        def read(self) -> bytes:
+            return self._body
+
+        def __enter__(self) -> "FakeResponse":
+            return self
+
+        def __exit__(self, *args: Any) -> None:
+            pass
+
+    fake_urlopen = mock.Mock(return_value=FakeResponse(sse_body))
+    monkeypatch.setattr(wake_slo_responder.urllib_request, "urlopen", fake_urlopen)
+
+    parsed = wake_slo_responder._mcp_call(
+        "tools/call", "agent_status", {}, "fake-key"
+    )
+    assert parsed["result"] == {"ok": True}
+    fake_urlopen.assert_called_once()
+    req = fake_urlopen.call_args.args[0]
+    assert req.method == "POST"
+    sent = json.loads(req.data.decode())
+    assert sent["method"] == "tools/call"
+    assert sent["params"]["name"] == "agent_status"