diff --git a/Gradata/src/gradata/daemon.py b/Gradata/src/gradata/daemon.py index 0b441176..f9a3ac78 100644 --- a/Gradata/src/gradata/daemon.py +++ b/Gradata/src/gradata/daemon.py @@ -16,6 +16,10 @@ POST /tag-delta — semantic tagging of file changes POST /checkpoint — save learning state before context compaction POST /maintain — brain maintenance tasks + GET /mcp/tools — list MCP tool schemas (for stdio bridge) + POST /mcp/tool-call — dispatch a single MCP tool call against the + daemon's in-memory Brain (no extra flock — the + stdio MCP server uses this to avoid contention) Usage: python -m gradata.daemon --brain-dir ./my-brain @@ -128,6 +132,8 @@ def log_message(self, format: str, *args: object) -> None: def do_GET(self) -> None: if self.path == "/health": self._handle_health() + elif self.path == "/mcp/tools": + self._handle_mcp_tools_list() else: self._not_found() @@ -143,6 +149,7 @@ def do_POST(self) -> None: "/tag-delta": self._handle_tag_delta, "/checkpoint": self._handle_checkpoint, "/maintain": self._handle_maintain, + "/mcp/tool-call": self._handle_mcp_tool_call, } handler = routes.get(self.path) if handler: @@ -635,6 +642,42 @@ def _handle_maintain(self) -> None: } ) + # ── MCP bridge endpoints ──────────────────────────────────────────── + # + # These let `gradata.mcp_server` (the stdio MCP transport) act as a thin + # bridge to this daemon — same brain, same flock, just a different + # client-facing wire format. See gradata/mcp_server.py for the client side. + + def _handle_mcp_tools_list(self) -> None: + self.daemon._reset_idle_timer() + try: + from gradata.mcp_server import _TOOL_SCHEMAS + except Exception as exc: # noqa: BLE001 — surface to client + self._send_json({"error": f"mcp_server unavailable: {exc}"}, 500) + return + self._send_json({"tools": _TOOL_SCHEMAS}) + + def _handle_mcp_tool_call(self) -> None: + self.daemon._reset_idle_timer() + body = self._read_json() + tool_name = body.get("name", "") + arguments = body.get("arguments") or {} + if not isinstance(arguments, dict): + self._send_json({"error": "arguments must be an object"}, 400) + return + + try: + from gradata.mcp_server import _dispatch + except Exception as exc: # noqa: BLE001 + self._send_json({"error": f"mcp_server unavailable: {exc}"}, 500) + return + + d = self.daemon + # _dispatch() touches the Brain; serialize against other daemon work. + with d._brain_lock: + result = _dispatch(d._brain, tool_name, arguments) + self._send_json(result) + # ── Main daemon class ────────────────────────────────────────────────── @@ -752,6 +795,17 @@ def start(self) -> None: if self._pid_file: _write_pid_file(self._pid_file, actual_port, self._brain_dir, self._started_at) + # Always advertise the daemon inside the brain dir so the stdio + # MCP bridge (and any other local client) can discover us without + # needing an explicit --pid-file. Best-effort; failures are non-fatal. + with contextlib.suppress(OSError): + _write_pid_file( + self._brain_dir / ".daemon.json", + actual_port, + self._brain_dir, + self._started_at, + ) + # SIGTERM handler _register_signal_handler(self) @@ -805,6 +859,11 @@ def _cleanup(self) -> None: if self._pid_file and self._pid_file.exists(): with contextlib.suppress(OSError): self._pid_file.unlink() + # Best-effort: clear our auto-advertised daemon.json on shutdown. + advert = self._brain_dir / ".daemon.json" + if advert.exists(): + with contextlib.suppress(OSError): + advert.unlink() if self._process_lock_cm is not None: self._process_lock_cm.__exit__(None, None, None) self._process_lock_cm = None diff --git a/Gradata/src/gradata/mcp_server.py b/Gradata/src/gradata/mcp_server.py index b224d5ac..a9a749a6 100644 --- a/Gradata/src/gradata/mcp_server.py +++ b/Gradata/src/gradata/mcp_server.py @@ -33,6 +33,8 @@ import contextlib import json import sys +import urllib.error +import urllib.request from pathlib import Path from typing import TYPE_CHECKING, Any @@ -57,6 +59,116 @@ SERVER_VERSION = "0.1.0" PROTOCOL_VERSION = "2024-11-05" +# How long to wait on the daemon for a single tool call. MCP clients +# already have their own timeouts; keep this generous so slow brain +# operations (search, benchmark) don't get cut off. +_DAEMON_RPC_TIMEOUT = 60.0 +_DAEMON_PROBE_TIMEOUT = 1.0 + + +# --------------------------------------------------------------------------- +# Daemon bridge — when a local gradata daemon is running, the MCP stdio +# server delegates tool calls over HTTP instead of opening the brain itself. +# That keeps a single process (the daemon) as the sole flock holder. +# --------------------------------------------------------------------------- + + +class _DaemonClient: + """Thin HTTP client for the local gradata daemon's /mcp/* endpoints.""" + + def __init__(self, base_url: str) -> None: + self.base_url = base_url.rstrip("/") + + @classmethod + def discover(cls, brain_dir: Path | None) -> _DaemonClient | None: + """Locate a running daemon for *brain_dir*; return a client or None. + + Discovery order: + 1. $GRADATA_DAEMON_URL env var (full base URL, e.g. http://127.0.0.1:8765) + 2. $GRADATA_DAEMON_PORT env var on 127.0.0.1 + 3. /.daemon.json written by daemon.start() + 4. The conventional 127.0.0.1:8765 port (legacy / dashboard default) + + Returns a client only if /health responds OK within _DAEMON_PROBE_TIMEOUT. + """ + import os + + candidates: list[str] = [] + + env_url = os.environ.get("GRADATA_DAEMON_URL") + if env_url: + candidates.append(env_url) + + env_port = os.environ.get("GRADATA_DAEMON_PORT") + if env_port and env_port.isdigit(): + candidates.append(f"http://127.0.0.1:{env_port}") + + if brain_dir is not None: + advert = Path(brain_dir) / ".daemon.json" + if advert.exists(): + try: + info = json.loads(advert.read_text(encoding="utf-8")) + port = int(info.get("port", 0)) + if port: + candidates.append(f"http://127.0.0.1:{port}") + except (OSError, ValueError, json.JSONDecodeError): + pass + + # Last-resort: the documented HTTP-daemon port. Lets the bridge + # find an existing daemon even if .daemon.json hasn't been written + # yet (older daemon, fresh install, etc.). + candidates.append("http://127.0.0.1:8765") + + seen: set[str] = set() + for url in candidates: + url = url.rstrip("/") + if url in seen: + continue + seen.add(url) + if cls._probe(url): + _log.info("MCP bridge: connected to gradata daemon at %s", url) + return cls(url) + return None + + @staticmethod + def _probe(base_url: str) -> bool: + try: + req = urllib.request.Request(f"{base_url}/health", method="GET") + with urllib.request.urlopen(req, timeout=_DAEMON_PROBE_TIMEOUT) as resp: + return 200 <= resp.status < 300 + except (urllib.error.URLError, OSError, ValueError): + return False + + def call_tool(self, tool_name: str, arguments: dict[str, Any]) -> dict[str, Any]: + """POST a tool call to the daemon's /mcp/tool-call endpoint.""" + payload = json.dumps({"name": tool_name, "arguments": arguments}).encode("utf-8") + req = urllib.request.Request( + f"{self.base_url}/mcp/tool-call", + data=payload, + headers={"Content-Type": "application/json"}, + method="POST", + ) + try: + with urllib.request.urlopen(req, timeout=_DAEMON_RPC_TIMEOUT) as resp: + body = resp.read() + except urllib.error.HTTPError as exc: + # Daemon answered with a non-2xx — treat as a tool-level error. + try: + body = exc.read() + data = json.loads(body.decode("utf-8")) + except Exception: + data = {"error": f"daemon HTTP {exc.code}: {exc.reason}"} + return data if isinstance(data, dict) else {"error": str(data)} + except (urllib.error.URLError, OSError) as exc: + return {"error": f"daemon unreachable: {exc}"} + + try: + data = json.loads(body.decode("utf-8")) + except json.JSONDecodeError as exc: + return {"error": f"daemon returned invalid JSON: {exc}"} + return data if isinstance(data, dict) else {"error": "daemon returned non-object"} + + # --------------------------------------------------------------------------- # Framing helpers # --------------------------------------------------------------------------- @@ -528,14 +640,27 @@ def _handle_tools_list(req_id: Any) -> dict[str, Any]: return _ok(req_id, {"tools": _TOOL_SCHEMAS}) -def _handle_tools_call(req_id: Any, params: dict[str, Any], brain: Any) -> dict[str, Any]: - """Dispatch a tool call and wrap the result.""" +def _handle_tools_call( + req_id: Any, + params: dict[str, Any], + brain: Any, + daemon_client: _DaemonClient | None = None, +) -> dict[str, Any]: + """Dispatch a tool call and wrap the result. + + When *daemon_client* is provided, the tool call is forwarded to the + running daemon over HTTP (no local Brain access). Otherwise it falls + back to dispatching against the in-process *brain*. + """ tool_name = params.get("name", "") arguments = params.get("arguments") or {} if not isinstance(arguments, dict): return _err(req_id, INVALID_PARAMS, "arguments must be an object") - result = _dispatch(brain, tool_name, arguments) + if daemon_client is not None: + result = daemon_client.call_tool(tool_name, arguments) + else: + result = _dispatch(brain, tool_name, arguments) if "error" in result and "content" not in result: # Tool-level error — still a successful RPC, but isError=true per MCP spec return _ok( @@ -558,7 +683,14 @@ def _handle_ping(req_id: Any) -> dict[str, Any]: # --------------------------------------------------------------------------- -def run_server(brain_dir: str | Path | None, *, stdin=None, stdout=None) -> None: +def run_server( + brain_dir: str | Path | None, + *, + stdin=None, + stdout=None, + daemon_client: _DaemonClient | None = None, + use_daemon: bool = True, +) -> None: """Run the MCP stdio server until the client sends shutdown or EOF. Args: @@ -566,6 +698,11 @@ def run_server(brain_dir: str | Path | None, *, stdin=None, stdout=None) -> None without a brain and returns errors for tool calls. stdin: Readable binary stream (defaults to sys.stdin.buffer). stdout: Writable binary stream (defaults to sys.stdout.buffer). + daemon_client: Optional pre-built bridge client (mainly for tests). + use_daemon: When True (default), try to bridge to a local daemon + over HTTP before falling back to opening the Brain in + this process. Set False to force the legacy in-process + behaviour (e.g. tests that mock Brain directly). """ in_stream: io.RawIOBase = stdin or sys.stdin.buffer # type: ignore[assignment] out_stream: io.RawIOBase = stdout or sys.stdout.buffer # type: ignore[assignment] @@ -579,16 +716,22 @@ def run_server(brain_dir: str | Path | None, *, stdin=None, stdout=None) -> None # Default: ~/.gradata/brain brain_dir = str(Path.home() / ".gradata" / "brain") + brain_path = Path(brain_dir) if brain_dir is not None else None + + # Prefer the daemon-bridge transport: if a daemon is already running for + # this brain, talk to it over HTTP and never grab the flock ourselves. + if daemon_client is None and use_daemon: + daemon_client = _DaemonClient.discover(brain_path) + # Instantiate Brain from the module-level import (patchable in tests). # Auto-initialize if the directory doesn't exist (zero-friction first run). brain: Any = None lock_cm = None - if brain_dir is not None: + if daemon_client is None and brain_dir is not None: try: if Brain is None: raise ImportError("gradata.brain.Brain could not be imported") - brain_path = Path(brain_dir) - if not brain_path.exists(): + if brain_path is not None and not brain_path.exists(): _log.info("Auto-initializing brain at %s", brain_dir) brain = Brain.init(brain_dir, domain="General") else: @@ -633,7 +776,7 @@ def run_server(brain_dir: str | Path | None, *, stdin=None, stdout=None) -> None response = _handle_tools_list(req_id) elif method == "tools/call": - response = _handle_tools_call(req_id, params, brain) + response = _handle_tools_call(req_id, params, brain, daemon_client) elif method == "shutdown": if not is_notification: @@ -669,6 +812,15 @@ def main() -> None: metavar="PATH", help="Path to the brain directory (default: $BRAIN_DIR env var)", ) + parser.add_argument( + "--no-daemon", + action="store_true", + help=( + "Disable HTTP-bridge mode and always open the brain in-process. " + "By default the server delegates tool calls to a local gradata " + "daemon if one is running (recommended — avoids flock contention)." + ), + ) args = parser.parse_args() brain_dir: str | None = args.brain_dir @@ -677,7 +829,7 @@ def main() -> None: brain_dir = os.environ.get("BRAIN_DIR") - run_server(brain_dir) + run_server(brain_dir, use_daemon=not args.no_daemon) if __name__ == "__main__": diff --git a/Gradata/tests/test_mcp_daemon_bridge.py b/Gradata/tests/test_mcp_daemon_bridge.py new file mode 100644 index 00000000..7736d5f8 --- /dev/null +++ b/Gradata/tests/test_mcp_daemon_bridge.py @@ -0,0 +1,176 @@ +""" +Tests for the MCP stdio ⇄ HTTP-daemon bridge. + +Covers: +- `_DaemonClient.discover` finds the daemon via .daemon.json +- `_DaemonClient.discover` returns None when no daemon is reachable +- `run_server` in bridge mode does NOT open the Brain (no flock contention) +- New daemon endpoints /mcp/tools and /mcp/tool-call work end-to-end +""" + +from __future__ import annotations + +import io +import json +import threading +import urllib.request +from typing import TYPE_CHECKING, Any +from unittest.mock import patch + +import pytest + +from gradata.daemon import GradataDaemon +from gradata.mcp_server import _DaemonClient, run_server + +if TYPE_CHECKING: + from pathlib import Path + + +def _frame(obj: dict[str, Any]) -> bytes: + body = json.dumps(obj).encode("utf-8") + return f"Content-Length: {len(body)}\r\n\r\n".encode() + body + + +def _read_all(buf: io.BytesIO) -> list[dict[str, Any]]: + buf.seek(0) + raw = buf.read() + responses: list[dict[str, Any]] = [] + while raw: + if b"Content-Length:" not in raw: + break + header_end = raw.find(b"\r\n\r\n") + if header_end == -1: + break + headers = raw[:header_end].decode() + cl = 0 + for line in headers.split("\r\n"): + if line.lower().startswith("content-length:"): + cl = int(line.split(":", 1)[1].strip()) + body_start = header_end + 4 + body = raw[body_start : body_start + cl] + responses.append(json.loads(body)) + raw = raw[body_start + cl :] + return responses + + +@pytest.fixture +def live_daemon(brain_dir: Path): + """Spin up a real GradataDaemon in a thread, yield (daemon, base_url).""" + d = GradataDaemon(brain_dir, port=0) + d._try_bind(0) + assert d._server is not None + d._server._daemon = d # type: ignore[attr-defined] + port = d._server.server_address[1] + d._port = port + d._reset_idle_timer() + + # Mirror what start() writes so discovery via .daemon.json works. + from datetime import UTC, datetime + + from gradata.daemon import _write_pid_file + + _write_pid_file( + brain_dir / ".daemon.json", + port, + brain_dir, + datetime.now(UTC).isoformat(), + ) + + t = threading.Thread(target=d._server.serve_forever, daemon=True) + t.start() + try: + yield d, f"http://127.0.0.1:{port}" + finally: + d._server.shutdown() + advert = brain_dir / ".daemon.json" + if advert.exists(): + advert.unlink() + + +# ── _DaemonClient.discover ─────────────────────────────────────────────── + + +def test_discover_returns_none_when_no_daemon(tmp_path: Path) -> None: + """No .daemon.json and no listener — discovery must return None.""" + # We can't fully isolate from a stray 127.0.0.1:8765 listener on the host, + # but the explicit env vars + missing advert file means probe sequence is: + # only the legacy 8765 fallback. We'll skip if that happens to answer. + with patch.dict("os.environ", {}, clear=False): + for k in ("GRADATA_DAEMON_URL", "GRADATA_DAEMON_PORT"): + __import__("os").environ.pop(k, None) + client = _DaemonClient.discover(tmp_path / "no-such-brain") + if client is not None and client.base_url.endswith(":8765"): + pytest.skip("Host has a daemon listening on 8765; can't assert no-discovery") + assert client is None + + +def test_discover_finds_daemon_via_advert_file(live_daemon, brain_dir: Path) -> None: + """A daemon advertising itself in /.daemon.json must be discovered.""" + _d, base = live_daemon + client = _DaemonClient.discover(brain_dir) + assert client is not None + # The brain-dir-advertised port should win over the 8765 fallback. + assert client.base_url == base + + +# ── /mcp/tools and /mcp/tool-call endpoints ────────────────────────────── + + +def test_daemon_mcp_tools_endpoint_lists_schemas(live_daemon) -> None: + _d, base = live_daemon + with urllib.request.urlopen(f"{base}/mcp/tools", timeout=5) as resp: + data = json.loads(resp.read()) + assert "tools" in data + names = {t["name"] for t in data["tools"]} + assert "brain_health" in names + assert "brain_search" in names + + +def test_daemon_mcp_tool_call_endpoint_dispatches(live_daemon) -> None: + _d, base = live_daemon + payload = json.dumps({"name": "brain_health", "arguments": {}}).encode() + req = urllib.request.Request( + f"{base}/mcp/tool-call", + data=payload, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(req, timeout=10) as resp: + data = json.loads(resp.read()) + # Either tool produces content or surfaces an error; both are dicts. + assert isinstance(data, dict) + assert "content" in data or "error" in data + + +# ── Bridge mode: run_server delegates and never opens Brain ────────────── + + +def test_run_server_in_bridge_mode_does_not_open_brain(live_daemon, brain_dir: Path) -> None: + """When a daemon is discoverable, the stdio server must not instantiate Brain.""" + _d, _base = live_daemon + + in_buf = io.BytesIO( + _frame({"jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {}}) + + _frame( + { + "jsonrpc": "2.0", + "id": 2, + "method": "tools/call", + "params": {"name": "brain_health", "arguments": {}}, + } + ) + + _frame({"jsonrpc": "2.0", "id": 3, "method": "shutdown"}) + ) + out_buf = io.BytesIO() + + # If the bridge engages, gradata.mcp_server.Brain must not be invoked. + with patch("gradata.mcp_server.Brain") as MockBrain: + run_server(brain_dir, stdin=in_buf, stdout=out_buf) + MockBrain.assert_not_called() + + responses = _read_all(out_buf) + ids = [r.get("id") for r in responses] + assert 1 in ids and 2 in ids + call = next(r for r in responses if r.get("id") == 2) + # Tool call should have produced an MCP-shaped result via the daemon. + assert "result" in call diff --git a/Gradata/tests/test_mcp_server.py b/Gradata/tests/test_mcp_server.py index f0dc9a1e..c21f836e 100644 --- a/Gradata/tests/test_mcp_server.py +++ b/Gradata/tests/test_mcp_server.py @@ -347,7 +347,7 @@ def _run(self, *messages: dict[str, Any], brain_dir: Any = None) -> list[dict[st mock_brain.log_output.return_value = {"ts": "t", "type": "OUTPUT", "source": "s"} with patch("gradata.mcp_server.Brain", return_value=mock_brain): - run_server(brain_dir or "/fake/brain", stdin=in_buf, stdout=out_buf) + run_server(brain_dir or "/fake/brain", stdin=in_buf, stdout=out_buf, use_daemon=False) return _read_all_responses(out_buf) @@ -425,7 +425,7 @@ def test_no_brain_dir_returns_error_on_tool_call(self): ) out_buf = io.BytesIO() # Do NOT patch Brain — with brain_dir=None it won't try to instantiate - run_server(None, stdin=in_buf, stdout=out_buf) + run_server(None, stdin=in_buf, stdout=out_buf, use_daemon=False) responses = _read_all_responses(out_buf) call_resp = next(r for r in responses if r.get("id") == 2) # isError=true or the result content contains the error string