Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions Gradata/src/gradata/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
POST /tag-delta — semantic tagging of file changes
POST /checkpoint — save learning state before context compaction
POST /maintain — brain maintenance tasks
GET /mcp/tools — list MCP tool schemas (for stdio bridge)
POST /mcp/tool-call — dispatch a single MCP tool call against the
daemon's in-memory Brain (no extra flock — the
stdio MCP server uses this to avoid contention)

Usage:
python -m gradata.daemon --brain-dir ./my-brain
Expand Down Expand Up @@ -128,6 +132,8 @@ def log_message(self, format: str, *args: object) -> None:
def do_GET(self) -> None:
if self.path == "/health":
self._handle_health()
elif self.path == "/mcp/tools":
self._handle_mcp_tools_list()
else:
self._not_found()

Expand All @@ -143,6 +149,7 @@ def do_POST(self) -> None:
"/tag-delta": self._handle_tag_delta,
"/checkpoint": self._handle_checkpoint,
"/maintain": self._handle_maintain,
"/mcp/tool-call": self._handle_mcp_tool_call,
}
handler = routes.get(self.path)
if handler:
Expand Down Expand Up @@ -635,6 +642,42 @@ def _handle_maintain(self) -> None:
}
)

# ── MCP bridge endpoints ────────────────────────────────────────────
#
# These let `gradata.mcp_server` (the stdio MCP transport) act as a thin
# bridge to this daemon — same brain, same flock, just a different
# client-facing wire format. See gradata/mcp_server.py for the client side.

def _handle_mcp_tools_list(self) -> None:
self.daemon._reset_idle_timer()
try:
from gradata.mcp_server import _TOOL_SCHEMAS
except Exception as exc: # noqa: BLE001 — surface to client
self._send_json({"error": f"mcp_server unavailable: {exc}"}, 500)
return
self._send_json({"tools": _TOOL_SCHEMAS})

def _handle_mcp_tool_call(self) -> None:
self.daemon._reset_idle_timer()
body = self._read_json()
tool_name = body.get("name", "")
arguments = body.get("arguments") or {}
if not isinstance(arguments, dict):
self._send_json({"error": "arguments must be an object"}, 400)
Comment on lines +662 to +666
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Reject non-object request bodies before calling .get().

json.loads() can return a list/string/number. In that case Line 663 raises AttributeError, and this endpoint drops the request instead of returning a 400.

Suggested fix
     def _handle_mcp_tool_call(self) -> None:
         self.daemon._reset_idle_timer()
         body = self._read_json()
+        if not isinstance(body, dict):
+            self._send_json({"error": "request body must be an object"}, 400)
+            return
         tool_name = body.get("name", "")
         arguments = body.get("arguments") or {}
         if not isinstance(arguments, dict):
             self._send_json({"error": "arguments must be an object"}, 400)
             return
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@Gradata/src/gradata/daemon.py` around lines 662 - 666, The endpoint currently
assumes the parsed JSON from self._read_json() is a dict and calls
body.get(...), which raises AttributeError for non-object JSON; modify the
handler to first check that body is an instance of dict (e.g., if not
isinstance(body, dict): self._send_json({"error":"request body must be an
object"}, 400) and return) before accessing body.get("name") and
body.get("arguments"), and keep using _send_json to return the 400 error for
invalid bodies.

return

try:
from gradata.mcp_server import _dispatch
except Exception as exc: # noqa: BLE001
self._send_json({"error": f"mcp_server unavailable: {exc}"}, 500)
return

d = self.daemon
# _dispatch() touches the Brain; serialize against other daemon work.
with d._brain_lock:
result = _dispatch(d._brain, tool_name, arguments)
self._send_json(result)


# ── Main daemon class ──────────────────────────────────────────────────

Expand Down Expand Up @@ -752,6 +795,17 @@ def start(self) -> None:
if self._pid_file:
_write_pid_file(self._pid_file, actual_port, self._brain_dir, self._started_at)

# Always advertise the daemon inside the brain dir so the stdio
# MCP bridge (and any other local client) can discover us without
# needing an explicit --pid-file. Best-effort; failures are non-fatal.
with contextlib.suppress(OSError):
_write_pid_file(
self._brain_dir / ".daemon.json",
actual_port,
self._brain_dir,
self._started_at,
)
Comment on lines +798 to +807
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Write .daemon.json atomically.

This advert is now part of the discovery path. A mid-write crash can leave malformed JSON behind, causing the bridge to miss the right daemon or fall through to another candidate. Please route the .daemon.json write through the repo’s atomic JSON helper instead of _write_pid_file().

As per coding guidelines, "Use atomic-write helper when writing JSON files to prevent corruption from mid-write crashes".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@Gradata/src/gradata/daemon.py` around lines 798 - 807, Replace the direct
call to _write_pid_file(...) that writes self._brain_dir / ".daemon.json" with
the repository's atomic JSON write helper so the file is written atomically;
call the helper to write the same payload (port, pid/process info and
started_at) to self._brain_dir / ".daemon.json" instead of using
_write_pid_file, using the same inputs (actual_port and self._started_at) and
keep the contextlib.suppress(OSError) wrapper around the atomic write to
preserve best-effort, non-fatal behavior.


# SIGTERM handler
_register_signal_handler(self)

Expand Down Expand Up @@ -805,6 +859,11 @@ def _cleanup(self) -> None:
if self._pid_file and self._pid_file.exists():
with contextlib.suppress(OSError):
self._pid_file.unlink()
# Best-effort: clear our auto-advertised daemon.json on shutdown.
advert = self._brain_dir / ".daemon.json"
if advert.exists():
with contextlib.suppress(OSError):
advert.unlink()
if self._process_lock_cm is not None:
self._process_lock_cm.__exit__(None, None, None)
self._process_lock_cm = None
Expand Down
170 changes: 161 additions & 9 deletions Gradata/src/gradata/mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import contextlib
import json
import sys
import urllib.error
import urllib.request
from pathlib import Path
from typing import TYPE_CHECKING, Any

Expand All @@ -57,6 +59,116 @@
SERVER_VERSION = "0.1.0"
PROTOCOL_VERSION = "2024-11-05"

# How long to wait on the daemon for a single tool call. MCP clients
# already have their own timeouts; keep this generous so slow brain
# operations (search, benchmark) don't get cut off.
_DAEMON_RPC_TIMEOUT = 60.0
_DAEMON_PROBE_TIMEOUT = 1.0


# ---------------------------------------------------------------------------
# Daemon bridge — when a local gradata daemon is running, the MCP stdio
# server delegates tool calls over HTTP instead of opening the brain itself.
# That keeps a single process (the daemon) as the sole flock holder.
# ---------------------------------------------------------------------------


class _DaemonClient:
"""Thin HTTP client for the local gradata daemon's /mcp/* endpoints."""

def __init__(self, base_url: str) -> None:
self.base_url = base_url.rstrip("/")

@classmethod
def discover(cls, brain_dir: Path | None) -> _DaemonClient | None:
"""Locate a running daemon for *brain_dir*; return a client or None.

Discovery order:
1. $GRADATA_DAEMON_URL env var (full base URL, e.g. http://127.0.0.1:8765)
2. $GRADATA_DAEMON_PORT env var on 127.0.0.1
3. <brain_dir>/.daemon.json written by daemon.start()
4. The conventional 127.0.0.1:8765 port (legacy / dashboard default)

Returns a client only if /health responds OK within _DAEMON_PROBE_TIMEOUT.
"""
import os

candidates: list[str] = []

env_url = os.environ.get("GRADATA_DAEMON_URL")
if env_url:
candidates.append(env_url)

env_port = os.environ.get("GRADATA_DAEMON_PORT")
if env_port and env_port.isdigit():
candidates.append(f"http://127.0.0.1:{env_port}")

if brain_dir is not None:
advert = Path(brain_dir) / ".daemon.json"
if advert.exists():
try:
info = json.loads(advert.read_text(encoding="utf-8"))
port = int(info.get("port", 0))
if port:
candidates.append(f"http://127.0.0.1:{port}")
except (OSError, ValueError, json.JSONDecodeError):
pass

# Last-resort: the documented HTTP-daemon port. Lets the bridge
# find an existing daemon even if .daemon.json hasn't been written
# yet (older daemon, fresh install, etc.).
candidates.append("http://127.0.0.1:8765")

seen: set[str] = set()
for url in candidates:
url = url.rstrip("/")
if url in seen:
continue
seen.add(url)
if cls._probe(url):
_log.info("MCP bridge: connected to gradata daemon at %s", url)
return cls(url)
Comment on lines +123 to +130
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Validate health["brain_dir"] before accepting a daemon candidate.

Line 128 treats any daemon that answers /health as compatible. If the caller asked for brain B while brain A is listening via GRADATA_DAEMON_URL, GRADATA_DAEMON_PORT, or the :8765 fallback, mutating tools will be forwarded into the wrong brain. Please compare the daemon’s reported brain_dir with the requested brain_dir before returning a client, and only allow cross-brain forwarding via an explicit override.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@Gradata/src/gradata/mcp_server.py` around lines 123 - 130, The loop currently
accepts any daemon that answers /health; change the probe-and-accept logic to
verify the daemon’s reported brain_dir matches the requested brain before
returning a client: modify or overload cls._probe(url) to return the health dict
(or add a new method like cls._probe_health(url)), then in the candidates loop
compare health.get("brain_dir") to the requested_brain_dir parameter (or an
expected_brain_dir variable passed into the caller) and only do _log.info(...)
and return cls(url) when they match; allow returning a client for mismatched
brain_dir only when an explicit override flag (e.g., allow_cross_brain or an env
var like GRADATA_DAEMON_ALLOW_CROSS_BRAIN) is set and documented.

return None

@staticmethod
def _probe(base_url: str) -> bool:
try:
req = urllib.request.Request(f"{base_url}/health", method="GET")
with urllib.request.urlopen(req, timeout=_DAEMON_PROBE_TIMEOUT) as resp:
return 200 <= resp.status < 300
except (urllib.error.URLError, OSError, ValueError):
return False

def call_tool(self, tool_name: str, arguments: dict[str, Any]) -> dict[str, Any]:
"""POST a tool call to the daemon's /mcp/tool-call endpoint."""
payload = json.dumps({"name": tool_name, "arguments": arguments}).encode("utf-8")
req = urllib.request.Request(
f"{self.base_url}/mcp/tool-call",
data=payload,
headers={"Content-Type": "application/json"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=_DAEMON_RPC_TIMEOUT) as resp:
body = resp.read()
except urllib.error.HTTPError as exc:
# Daemon answered with a non-2xx — treat as a tool-level error.
try:
body = exc.read()
data = json.loads(body.decode("utf-8"))
except Exception:
data = {"error": f"daemon HTTP {exc.code}: {exc.reason}"}
return data if isinstance(data, dict) else {"error": str(data)}
except (urllib.error.URLError, OSError) as exc:
return {"error": f"daemon unreachable: {exc}"}

try:
data = json.loads(body.decode("utf-8"))
except json.JSONDecodeError as exc:
return {"error": f"daemon returned invalid JSON: {exc}"}
return data if isinstance(data, dict) else {"error": "daemon returned non-object"}


# ---------------------------------------------------------------------------
# Framing helpers
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -528,14 +640,27 @@ def _handle_tools_list(req_id: Any) -> dict[str, Any]:
return _ok(req_id, {"tools": _TOOL_SCHEMAS})


def _handle_tools_call(req_id: Any, params: dict[str, Any], brain: Any) -> dict[str, Any]:
"""Dispatch a tool call and wrap the result."""
def _handle_tools_call(
req_id: Any,
params: dict[str, Any],
brain: Any,
daemon_client: _DaemonClient | None = None,
) -> dict[str, Any]:
"""Dispatch a tool call and wrap the result.

When *daemon_client* is provided, the tool call is forwarded to the
running daemon over HTTP (no local Brain access). Otherwise it falls
back to dispatching against the in-process *brain*.
"""
tool_name = params.get("name", "")
arguments = params.get("arguments") or {}
if not isinstance(arguments, dict):
return _err(req_id, INVALID_PARAMS, "arguments must be an object")

result = _dispatch(brain, tool_name, arguments)
if daemon_client is not None:
result = daemon_client.call_tool(tool_name, arguments)
else:
result = _dispatch(brain, tool_name, arguments)
if "error" in result and "content" not in result:
# Tool-level error — still a successful RPC, but isError=true per MCP spec
return _ok(
Expand All @@ -558,14 +683,26 @@ def _handle_ping(req_id: Any) -> dict[str, Any]:
# ---------------------------------------------------------------------------


def run_server(brain_dir: str | Path | None, *, stdin=None, stdout=None) -> None:
def run_server(
brain_dir: str | Path | None,
*,
stdin=None,
stdout=None,
daemon_client: _DaemonClient | None = None,
use_daemon: bool = True,
) -> None:
"""Run the MCP stdio server until the client sends shutdown or EOF.

Args:
brain_dir: Path to the brain directory. If None the server starts
without a brain and returns errors for tool calls.
stdin: Readable binary stream (defaults to sys.stdin.buffer).
stdout: Writable binary stream (defaults to sys.stdout.buffer).
daemon_client: Optional pre-built bridge client (mainly for tests).
use_daemon: When True (default), try to bridge to a local daemon
over HTTP before falling back to opening the Brain in
this process. Set False to force the legacy in-process
behaviour (e.g. tests that mock Brain directly).
"""
in_stream: io.RawIOBase = stdin or sys.stdin.buffer # type: ignore[assignment]
out_stream: io.RawIOBase = stdout or sys.stdout.buffer # type: ignore[assignment]
Expand All @@ -579,16 +716,22 @@ def run_server(brain_dir: str | Path | None, *, stdin=None, stdout=None) -> None
# Default: ~/.gradata/brain
brain_dir = str(Path.home() / ".gradata" / "brain")

brain_path = Path(brain_dir) if brain_dir is not None else None

# Prefer the daemon-bridge transport: if a daemon is already running for
# this brain, talk to it over HTTP and never grab the flock ourselves.
if daemon_client is None and use_daemon:
daemon_client = _DaemonClient.discover(brain_path)

# Instantiate Brain from the module-level import (patchable in tests).
# Auto-initialize if the directory doesn't exist (zero-friction first run).
brain: Any = None
lock_cm = None
if brain_dir is not None:
if daemon_client is None and brain_dir is not None:
try:
if Brain is None:
raise ImportError("gradata.brain.Brain could not be imported")
brain_path = Path(brain_dir)
if not brain_path.exists():
if brain_path is not None and not brain_path.exists():
_log.info("Auto-initializing brain at %s", brain_dir)
brain = Brain.init(brain_dir, domain="General")
else:
Expand Down Expand Up @@ -633,7 +776,7 @@ def run_server(brain_dir: str | Path | None, *, stdin=None, stdout=None) -> None
response = _handle_tools_list(req_id)

elif method == "tools/call":
response = _handle_tools_call(req_id, params, brain)
response = _handle_tools_call(req_id, params, brain, daemon_client)

elif method == "shutdown":
if not is_notification:
Expand Down Expand Up @@ -669,6 +812,15 @@ def main() -> None:
metavar="PATH",
help="Path to the brain directory (default: $BRAIN_DIR env var)",
)
parser.add_argument(
"--no-daemon",
action="store_true",
help=(
"Disable HTTP-bridge mode and always open the brain in-process. "
"By default the server delegates tool calls to a local gradata "
"daemon if one is running (recommended — avoids flock contention)."
),
)
args = parser.parse_args()

brain_dir: str | None = args.brain_dir
Expand All @@ -677,7 +829,7 @@ def main() -> None:

brain_dir = os.environ.get("BRAIN_DIR")

run_server(brain_dir)
run_server(brain_dir, use_daemon=not args.no_daemon)


if __name__ == "__main__":
Expand Down
Loading
Loading