diff --git a/.specs/reasoning-channel-hosted.md b/.specs/reasoning-channel-hosted.md index 3fea0f33..c4c80df9 100644 --- a/.specs/reasoning-channel-hosted.md +++ b/.specs/reasoning-channel-hosted.md @@ -11,7 +11,7 @@ claims: behavioral: false required_evidence: plugin.json contains an `mcpServers.thoughtbox-channel` entry (command node, args ${CLAUDE_PLUGIN_ROOT}/dist/thoughtbox-channel.js) and a `channels` array whose `server` matches that key; the channel derives base URL and API key from local Claude settings (THOUGHTBOX_URL is an optional override), so the committed dist/thoughtbox-channel.js boots, declares the claude/channel experimental capability, and stays idle instead of exit(1) when unconfigured; /plugin install succeeds - id: c2 - statement: In hosted (multi-tenant) mode the server persists protocol lifecycle events (ulysses_*/theseus_*) to a tenant-scoped Supabase table, where today they are neither broadcast nor stored + statement: In hosted (multi-tenant) mode the server appends the full protocol lifecycle event stream (ulysses_*/theseus_*) to a dedicated tenant-scoped Supabase table — distinct from protocol_history, which only stores a lossy operation-level subset and is never broadcast type: implementation behavioral: false required_evidence: a migration adds a protocol_events table with tenant_workspace_id + workspace-membership RLS following the hub-table pattern; the multi-tenant session branch in src/index.ts passes onProtocolEvent to createMcpServer and that handler writes a row per protocol event; a test confirms rows are written under the emitting workspace @@ -21,10 +21,10 @@ claims: behavioral: true required_evidence: GET /protocol/events (changed_since cursor) resolves the workspace via resolveRequestAuth and filters by tenant_workspace_id; an agentic/integration test shows workspace A's events are returned to A's key and a workspace-B key receives none of A's events (cross-tenant negative control) - id: c4 - statement: The channel client 
selects its transport by configuration — in-process SSE against a local server, HTTP polling of the pull endpoint against a hosted server — and delivers identical channel notifications either way + statement: The channel client selects its transport by configuration — in-process SSE against a local server, HTTP polling of the pull endpoint against a hosted server with session-scoped query parameters when configured — and delivers identical channel notifications either way type: implementation behavioral: false - required_evidence: the channel client chooses SSE vs polling from config (URL host inference with an env override); unit tests cover both transports producing the same pushEvent calls; local SSE path is unchanged from current behavior + required_evidence: the channel client chooses SSE vs polling from config (URL host inference with an env override, warning on invalid overrides), forwarding session_id when configured and priming before emitting; unit tests cover both transports producing the same pushEvent calls; local SSE path is unchanged from current behavior - id: c5 statement: The local-mode reasoning channel keeps working unchanged — the hosted path is additive and does not alter local /events SSE delivery or protocol enforcement type: governance @@ -82,11 +82,17 @@ B6/B8 (the reactive substrate) so the realtime transport is defined once, there. ## Components 1. **Plugin wiring** (c1) — register `mcpServers.thoughtbox-channel` + a - `channels` entry in `plugin.json`, supplying `THOUGHTBOX_URL` via env with - `${CLAUDE_PLUGIN_ROOT}` expansion; ship the dist artifact (already committed). + `channels` entry in `plugin.json` whose `server` matches the mcpServers key; + the channel derives URL+key from local Claude settings (no hardcoded env); + ship the dist artifact (already committed). 2. 
**Server persistence** (c2) — `protocol_events` migration (tenant-scoped + - RLS, hub-table pattern); wire `onProtocolEvent` in the multi-tenant branch to - persist each event. + RLS, claims/hub-table pattern); wire `onProtocolEvent` in the multi-tenant + branch to append each event. A dedicated table, **not** `protocol_history`: + that table is the session-keyed audit log with an operation-level + `event_type` (plan/outcome/reflect/checkpoint/+2 validator) constrained by a + CHECK — a lossy subset missing init/visa/complete and the ulysses/theseus + prefix. `protocol_events` mirrors the full nine-type `ThoughtboxEvent` + taxonomy the channel emits, so hosted pull equals local SSE byte-for-byte. 3. **Pull endpoint** (c3) — `GET /protocol/events?changed_since=`, authorized by API key, filtered by `tenant_workspace_id`. 4. **Channel client transport selection** (c4) — SSE (local) vs polling (hosted), diff --git a/plugins/thoughtbox-claude-code/dist/polling-event-client.d.ts b/plugins/thoughtbox-claude-code/dist/polling-event-client.d.ts new file mode 100644 index 00000000..acb2b04c --- /dev/null +++ b/plugins/thoughtbox-claude-code/dist/polling-event-client.d.ts @@ -0,0 +1,39 @@ +/** + * Thoughtbox Event Polling Client (SPEC-REASONING-CHANNEL-HOSTED c4). + * + * Hosted (multi-tenant) Cloud Run cannot serve the in-process /events SSE + * stream across replicas, so against a hosted server the channel pulls the + * tenant-scoped protocol event log via `GET /protocol/events?changed_since=`. + * Same config surface as EventClient so the channel selects a transport + * without other code changes. + * + * On connect the client primes its cursor to the current tail without + * emitting, so a fresh channel reacts to NEW protocol events rather than + * replaying completed sessions. 
/**
 * Hosted-mode event transport for the Thoughtbox channel: instead of holding
 * an SSE stream open it pulls `GET /protocol/events` on a timer.
 *
 * Brought back in step with src/polling-event-client.ts:
 *  - the configured session id is forwarded as the `session_id` query param;
 *  - priming is retried with backoff until it succeeds, so polling never
 *    starts from cursor 0 after a failed prime (that would emit the whole
 *    existing history as if it were new).
 */
const DEFAULT_POLL_INTERVAL_MS = 3000;
const PAGE_LIMIT = 200;
const MIN_BACKOFF_MS = 1000;
const MAX_BACKOFF_MS = 30_000;
const BACKOFF_MULTIPLIER = 2;
export class PollingEventClient {
    config;
    /** Highest cursor consumed so far; 0 means "from the start". */
    cursor = 0;
    closed = false;
    /** Handle of the pending poll timer, or null when none is scheduled. */
    timer = null;
    backoffMs = MIN_BACKOFF_MS;
    constructor(config) {
        this.config = config;
    }
    /**
     * Prime the cursor to the current tail without emitting, then start the
     * poll loop. Resolves once priming has succeeded (or the client was
     * closed while priming).
     */
    async connect() {
        this.closed = false;
        this.backoffMs = MIN_BACKOFF_MS;
        await this.prime();
        if (this.closed)
            return;
        this.config.onConnect?.();
        this.backoffMs = MIN_BACKOFF_MS;
        this.scheduleNext(this.config.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS);
    }
    /** Stop polling and cancel the pending poll timer, if any. */
    close() {
        this.closed = true;
        if (this.timer) {
            clearTimeout(this.timer);
            this.timer = null;
        }
    }
    /** Change the session id sent on subsequent requests. */
    setSessionId(sessionId) {
        this.config.sessionId = sessionId;
    }
    scheduleNext(delayMs) {
        if (this.closed)
            return;
        this.timer = setTimeout(() => void this.poll(), delayMs);
    }
    /**
     * One poll cycle: drain every page past the cursor, emitting each event,
     * then reschedule. Failures are reported and retried with exponential
     * backoff capped at MAX_BACKOFF_MS.
     */
    async poll() {
        if (this.closed)
            return;
        try {
            let page = await this.fetchPage(this.cursor);
            while (page.events.length > 0) {
                for (const event of page.events)
                    this.emit(event);
                this.cursor = page.cursor;
                // A short page means the tail was reached.
                if (page.events.length < PAGE_LIMIT)
                    break;
                page = await this.fetchPage(this.cursor);
            }
            this.backoffMs = MIN_BACKOFF_MS;
            this.scheduleNext(this.config.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS);
        }
        catch (error) {
            this.reportError(error);
            const delay = this.backoffMs;
            this.backoffMs = Math.min(this.backoffMs * BACKOFF_MULTIPLIER, MAX_BACKOFF_MS);
            this.scheduleNext(delay);
        }
    }
    /** Hand one pulled row to onEvent, lifting data.session_id to sessionId. */
    emit(event) {
        const sessionId = typeof event.data.session_id === "string" ? event.data.session_id : "";
        this.config.onEvent({
            source: event.source,
            type: event.type,
            sessionId,
            timestamp: event.timestamp,
            data: event.data,
        });
    }
    /**
     * Retry primeOnce with exponential backoff until it succeeds or the
     * client is closed. A close() issued during a backoff wait is observed
     * when that wait ends.
     */
    async prime() {
        while (!this.closed) {
            try {
                await this.primeOnce();
                return;
            }
            catch (error) {
                this.reportError(error);
                const delay = this.backoffMs;
                this.backoffMs = Math.min(this.backoffMs * BACKOFF_MULTIPLIER, MAX_BACKOFF_MS);
                await new Promise((resolve) => setTimeout(resolve, delay));
            }
        }
    }
    /** Advance the cursor page by page to the tail without emitting. */
    async primeOnce() {
        while (true) {
            const page = await this.fetchPage(this.cursor);
            if (page.events.length === 0)
                break;
            this.cursor = page.cursor;
            if (page.events.length < PAGE_LIMIT)
                break;
        }
    }
    /**
     * Fetch one page of events past `cursor`.
     * Throws when the response status is not OK.
     */
    async fetchPage(cursor) {
        const params = new URLSearchParams();
        if (cursor > 0)
            params.set("changed_since", String(cursor));
        if (this.config.sessionId)
            params.set("session_id", this.config.sessionId);
        params.set("limit", String(PAGE_LIMIT));
        const url = `${this.config.baseUrl}/protocol/events?${params.toString()}`;
        const response = await fetch(url, {
            headers: { Authorization: `Bearer ${this.config.apiKey}` },
        });
        if (!response.ok) {
            throw new Error(`protocol/events poll failed: ${response.status} ${response.statusText}`);
        }
        const body = (await response.json());
        return { events: body.events ?? [], cursor: body.cursor ?? cursor };
    }
    /** Forward an error to onError unless the client has been closed. */
    reportError(error) {
        if (this.closed)
            return;
        const err = error instanceof Error ? error : new Error(String(error));
        this.config.onError?.(err);
    }
}
/**
 * Pick the event transport for `baseUrl`.
 *
 * THOUGHTBOX_CHANNEL_MODE ("sse" or "poll") forces a transport; any other
 * value is reported on stderr and ignored. Otherwise a loopback host selects
 * "sse" and everything else, including an unparseable URL, selects "poll".
 *
 * @param baseUrl Base URL of the Thoughtbox server.
 * @returns "sse" or "poll".
 */
function selectTransport(baseUrl) {
    const override = process.env.THOUGHTBOX_CHANNEL_MODE;
    if (override === "sse" || override === "poll")
        return override;
    if (override !== undefined) {
        console.error(`[Channel] Warning: THOUGHTBOX_CHANNEL_MODE="${override}" is not supported; falling back to URL-based transport detection`);
    }
    let host;
    try {
        host = new URL(baseUrl).hostname;
    }
    catch {
        return "poll";
    }
    // URL.hostname keeps the brackets on IPv6 literals, so IPv6 loopback is
    // "[::1]" — comparing against a bare "::1" never matches.
    return host === "localhost" || host === "127.0.0.1" || host === "[::1]"
        ? "sse"
        : "poll";
}
/**
 * PollingEventClient behaviour against a mocked fetch: priming must not emit,
 * new events must be emitted with sessionId lifted from data.session_id, and
 * every request must carry the API key plus the expected query parameters.
 */
describe('PollingEventClient', () => {
  /** Yield to the event loop until `cond` holds or `timeoutMs` elapses. */
  const waitUntil = async (cond: () => boolean, timeoutMs = 1000): Promise<void> => {
    const deadline = Date.now() + timeoutMs;
    while (!cond() && Date.now() < deadline) await flush();
  };

  it('primes to the tail on connect without emitting, then emits new events', async () => {
    // Call 1 (prime): one short page of existing history, cursor -> 7. A page
    //   shorter than the page limit ends priming, so there is no second
    //   prime request.
    // Call 2 (first poll): one new event at cursor 8.
    // Later polls: nothing new. A fresh Response is built per call because a
    //   Response body can only be read once.
    const fetchMock = vi
      .fn()
      .mockResolvedValueOnce(jsonResponse({ events: [row(7, 'ulysses_init', 's1')], cursor: 7 }))
      .mockResolvedValueOnce(jsonResponse({ events: [row(8, 'ulysses_outcome', 's1', { S: 2 })], cursor: 8 }))
      .mockImplementation(async () => jsonResponse({ events: [], cursor: 8 }));
    vi.stubGlobal('fetch', fetchMock);

    const received: ThoughtboxEvent[] = [];
    const client = new PollingEventClient({
      baseUrl: 'https://hosted.example',
      apiKey: 'tbx_test',
      pollIntervalMs: 5,
      onEvent: (e) => received.push(e),
    });

    await client.connect();
    expect(received).toHaveLength(0); // primed, no replay of history

    await waitUntil(() => received.length >= 1);
    client.close();

    expect(received).toHaveLength(1);
    expect(received[0]!.type).toBe('ulysses_outcome');
    expect(received[0]!.sessionId).toBe('s1'); // derived from data.session_id
    expect(received[0]!.data.S).toBe(2);
  });

  it('sends the API key and changed_since cursor', async () => {
    const fetchMock = vi
      .fn()
      .mockResolvedValueOnce(jsonResponse({ events: [row(7, 'ulysses_init', 's1')], cursor: 7 }))
      .mockImplementation(async () => jsonResponse({ events: [], cursor: 7 }));
    vi.stubGlobal('fetch', fetchMock);

    const client = new PollingEventClient({
      baseUrl: 'https://hosted.example',
      apiKey: 'tbx_secret',
      pollIntervalMs: 5,
      onEvent: () => {},
    });
    await client.connect();
    await waitUntil(() => fetchMock.mock.calls.length >= 2);
    client.close();

    // Prime request: authenticated, reads from the start (no cursor yet).
    const [primeUrl, primeInit] = fetchMock.mock.calls[0]!;
    const primeTarget = new URL(String(primeUrl));
    expect(primeTarget.pathname).toBe('/protocol/events');
    expect(primeTarget.searchParams.has('changed_since')).toBe(false);
    const primeHeaders = (primeInit?.headers ?? {}) as Record<string, string>;
    expect(primeHeaders.Authorization).toBe('Bearer tbx_secret');

    // First poll: authenticated, resumes from the primed cursor.
    const [pollUrl, pollInit] = fetchMock.mock.calls[1]!;
    expect(new URL(String(pollUrl)).searchParams.get('changed_since')).toBe('7');
    const pollHeaders = (pollInit?.headers ?? {}) as Record<string, string>;
    expect(pollHeaders.Authorization).toBe('Bearer tbx_secret');
  });

  it('adds the session_id query param when configured', async () => {
    const fetchMock = vi
      .fn()
      .mockImplementation(async () => jsonResponse({ events: [], cursor: 0 }));
    vi.stubGlobal('fetch', fetchMock);

    const client = new PollingEventClient({
      baseUrl: 'https://hosted.example',
      apiKey: 'tbx_secret',
      sessionId: 'session-123',
      onEvent: () => {},
    });
    await client.connect();
    client.close();

    const [url] = fetchMock.mock.calls[0]!;
    const params = new URL(String(url)).searchParams;
    expect(params.get('session_id')).toBe('session-123');
  });
});
+ */ + +import type { ThoughtboxEvent } from "./event-types.js"; + +export interface PollingEventClientConfig { + baseUrl: string; + apiKey: string; + sessionId?: string; + onEvent: (event: ThoughtboxEvent) => void; + onError?: (error: Error) => void; + onConnect?: () => void; + pollIntervalMs?: number; +} + +interface PulledEvent { + cursor: number; + type: ThoughtboxEvent["type"]; + source: ThoughtboxEvent["source"]; + timestamp: string; + data: Record; +} + +interface PullResponse { + events: PulledEvent[]; + cursor: number; +} + +const DEFAULT_POLL_INTERVAL_MS = 3000; +const PAGE_LIMIT = 200; +const MIN_BACKOFF_MS = 1000; +const MAX_BACKOFF_MS = 30_000; +const BACKOFF_MULTIPLIER = 2; + +export class PollingEventClient { + private config: PollingEventClientConfig; + private cursor = 0; + private closed = false; + private timer: ReturnType | null = null; + private backoffMs = MIN_BACKOFF_MS; + + constructor(config: PollingEventClientConfig) { + this.config = config; + } + + async connect(): Promise { + this.closed = false; + this.backoffMs = MIN_BACKOFF_MS; + await this.prime(); + if (this.closed) return; + this.config.onConnect?.(); + this.backoffMs = MIN_BACKOFF_MS; + this.scheduleNext(this.config.pollIntervalMs ?? 
DEFAULT_POLL_INTERVAL_MS); + } + + close(): void { + this.closed = true; + if (this.timer) { + clearTimeout(this.timer); + this.timer = null; + } + } + + setSessionId(sessionId: string): void { + this.config.sessionId = sessionId; + } + + private scheduleNext(delayMs: number): void { + if (this.closed) return; + this.timer = setTimeout(() => void this.poll(), delayMs); + } + + private async poll(): Promise { + if (this.closed) return; + try { + let page = await this.fetchPage(this.cursor); + while (page.events.length > 0) { + for (const event of page.events) this.emit(event); + this.cursor = page.cursor; + if (page.events.length < PAGE_LIMIT) break; + page = await this.fetchPage(this.cursor); + } + this.backoffMs = MIN_BACKOFF_MS; + this.scheduleNext(this.config.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS); + } catch (error) { + this.reportError(error); + const delay = this.backoffMs; + this.backoffMs = Math.min(this.backoffMs * BACKOFF_MULTIPLIER, MAX_BACKOFF_MS); + this.scheduleNext(delay); + } + } + + private emit(event: PulledEvent): void { + const sessionId = + typeof event.data.session_id === "string" ? 
event.data.session_id : ""; + this.config.onEvent({ + source: event.source, + type: event.type, + sessionId, + timestamp: event.timestamp, + data: event.data, + }); + } + + private async prime(): Promise { + while (!this.closed) { + try { + await this.primeOnce(); + return; + } catch (error) { + this.reportError(error); + const delay = this.backoffMs; + this.backoffMs = Math.min(this.backoffMs * BACKOFF_MULTIPLIER, MAX_BACKOFF_MS); + await new Promise((resolve) => setTimeout(resolve, delay)); + } + } + } + + private async primeOnce(): Promise { + while (true) { + const page = await this.fetchPage(this.cursor); + if (page.events.length === 0) break; + this.cursor = page.cursor; + if (page.events.length < PAGE_LIMIT) break; + } + } + + private async fetchPage(cursor: number): Promise { + const params = new URLSearchParams(); + if (cursor > 0) params.set("changed_since", String(cursor)); + if (this.config.sessionId) params.set("session_id", this.config.sessionId); + params.set("limit", String(PAGE_LIMIT)); + const url = `${this.config.baseUrl}/protocol/events?${params.toString()}`; + + const response = await fetch(url, { + headers: { Authorization: `Bearer ${this.config.apiKey}` }, + }); + if (!response.ok) { + throw new Error( + `protocol/events poll failed: ${response.status} ${response.statusText}`, + ); + } + const body = (await response.json()) as PullResponse; + return { events: body.events ?? [], cursor: body.cursor ?? cursor }; + } + + private reportError(error: unknown): void { + if (this.closed) return; + const err = error instanceof Error ? 
/**
 * Pick the event transport for `baseUrl`.
 *
 * THOUGHTBOX_CHANNEL_MODE ("sse" or "poll") forces a transport; any other
 * value is reported on stderr and ignored. Otherwise a loopback host selects
 * "sse" and everything else, including an unparseable URL, selects "poll".
 *
 * @param baseUrl Base URL of the Thoughtbox server.
 * @returns The transport the channel should use.
 */
function selectTransport(baseUrl: string): "sse" | "poll" {
  const override = process.env.THOUGHTBOX_CHANNEL_MODE;
  if (override === "sse" || override === "poll") return override;
  if (override !== undefined) {
    console.error(
      `[Channel] Warning: THOUGHTBOX_CHANNEL_MODE="${override}" is not supported; falling back to URL-based transport detection`,
    );
  }

  let host: string;
  try {
    host = new URL(baseUrl).hostname;
  } catch {
    return "poll";
  }

  // URL.hostname keeps the brackets on IPv6 literals, so IPv6 loopback is
  // "[::1]" — comparing against a bare "::1" never matches.
  const isLoopback =
    host === "localhost" || host === "127.0.0.1" || host === "[::1]";
  return isLoopback ? "sse" : "poll";
}
{ sessionId: THOUGHTBOX_SESSION } : {}), - onEvent: (event) => void pushEvent(event), - onError: (err) => console.error("[Channel] SSE error:", err.message), - onConnect: () => console.error("[Channel] Connected to event stream"), - }); + onEvent: (event: ThoughtboxEvent) => void pushEvent(event), + onError: (err: Error) => console.error(`[Channel] ${mode} error:`, err.message), + onConnect: () => console.error(`[Channel] Connected to event stream (${mode})`), + }; + + const client = + mode === "sse" + ? new EventClient(clientConfig) + : new PollingEventClient(clientConfig); - eventClient.connect().catch((err) => { + client.connect().catch((err) => { console.error("[Channel] Failed to connect event stream:", err); }); } diff --git a/plugins/thoughtbox-claude-code/tsconfig.json b/plugins/thoughtbox-claude-code/tsconfig.json index 65b7b211..92a1e6da 100644 --- a/plugins/thoughtbox-claude-code/tsconfig.json +++ b/plugins/thoughtbox-claude-code/tsconfig.json @@ -10,5 +10,6 @@ "skipLibCheck": true, "declaration": true }, - "include": ["src/**/*.ts"] + "include": ["src/**/*.ts"], + "exclude": ["src/**/__tests__/**"] } diff --git a/src/database.types.ts b/src/database.types.ts index a16d2195..b35febc8 100644 --- a/src/database.types.ts +++ b/src/database.types.ts @@ -1481,6 +1481,47 @@ export type Database = { }, ] } + protocol_events: { + Row: { + created_at: string + data: Json + event_timestamp: string + id: number + session_id: string | null + source: string + tenant_workspace_id: string + type: string + } + Insert: { + created_at?: string + data?: Json + event_timestamp: string + id?: never + session_id?: string | null + source?: string + tenant_workspace_id: string + type: string + } + Update: { + created_at?: string + data?: Json + event_timestamp?: string + id?: never + session_id?: string | null + source?: string + tenant_workspace_id?: string + type?: string + } + Relationships: [ + { + foreignKeyName: "protocol_events_tenant_workspace_id_fkey" + columns: 
["tenant_workspace_id"] + isOneToOne: false + referencedRelation: "workspaces" + referencedColumns: ["id"] + }, + ] + } protocol_history: { Row: { created_at: string @@ -1810,6 +1851,13 @@ export type Database = { ts?: string } Relationships: [ + { + foreignKeyName: "runbook_fitness_ledger_instance_pinning_fkey" + columns: ["instance_id", "template_id", "template_version"] + isOneToOne: false + referencedRelation: "runbook_instances" + referencedColumns: ["id", "template_id", "template_version"] + }, { foreignKeyName: "runbook_fitness_ledger_instance_tenant_fkey" columns: ["instance_id", "tenant_workspace_id"] diff --git a/src/index.ts b/src/index.ts index f21c263c..0b9fde39 100644 --- a/src/index.ts +++ b/src/index.ts @@ -27,6 +27,10 @@ import { InMemoryClaimStorage } from "./claims/in-memory-claim-storage.js"; import { createSupabaseClaimStorageProvider } from "./claims/supabase-claim-storage.js"; import { InMemoryRunbookStorage } from "./notebook/runbook/in-memory-runbook-storage.js"; import { createSupabaseRunbookStorageProvider } from "./notebook/runbook/supabase-runbook-storage.js"; +import { + createSupabaseProtocolEventStorageProvider, + type ProtocolEventStorage, +} from "./protocol/protocol-event-storage.js"; import { initEvaluation, initMonitoring } from "./evaluation/index.js"; import { createHubHandler, type HubEvent } from "./hub/hub-handler.js"; import { @@ -213,6 +217,18 @@ async function startHttpServer() { : null; const localRunbookStorage = isMultiTenant ? null : new InMemoryRunbookStorage(); + // Hosted protocol-event log (SPEC-REASONING-CHANNEL-HOSTED c2): in + // multi-tenant mode the protocol lifecycle stream is appended to a + // tenant-scoped Supabase table so the reasoning channel can pull it + // (changed_since) across Cloud Run replicas. Local mode keeps the + // in-process /events SSE broadcast and needs no durable log. + const tenantProtocolEventStorage = isMultiTenant + ? 
createSupabaseProtocolEventStorageProvider({ + supabaseUrl: process.env.SUPABASE_URL!, + serviceRoleKey: process.env.SUPABASE_SERVICE_ROLE_KEY!, + }) + : null; + // Local-mode hub thought store: ONE storage instance shared by /hub/api // and every local MCP session's tb.hub dispatcher. Per-session // FileSystemStorage instances each hold an in-memory session index, so a @@ -408,6 +424,7 @@ async function startHttpServer() { const storage = factory.getStorage(workspaceId); const knowledgeStorage = factory.getKnowledgeStorage(workspaceId); + const protocolEventStorage = tenantProtocolEventStorage!(workspaceId); const server = await createMcpServer({ sessionId, storage, @@ -418,6 +435,15 @@ async function startHttpServer() { dataDir, knowledgeStorage, workspaceId, + // Persist protocol lifecycle events to the tenant-scoped log so the + // reasoning channel can pull them across replicas (c2). Fire-and- + // forget: a log write must never block a protocol transition. + onProtocolEvent: (event) => { + void protocolEventStorage.append(event).catch((err: unknown) => { + const message = err instanceof Error ? err.message : String(err); + console.error(`[ProtocolEvents] append failed: ${message}`); + }); + }, config: { disableThoughtLogging: (process.env.DISABLE_THOUGHT_LOGGING || "").toLowerCase() === "true", @@ -572,6 +598,44 @@ async function startHttpServer() { supabaseUrl: process.env.SUPABASE_URL!, serviceRoleKey: process.env.SUPABASE_SERVICE_ROLE_KEY!, }); + + // Reasoning-channel pull endpoint (SPEC-REASONING-CHANNEL-HOSTED c3): + // returns protocol events with id > changed_since for the caller's + // workspace, oldest first. The API key resolves the workspace, so a key + // can never read another tenant's events. Hosted only — local mode uses + // the in-process /events SSE stream. 
+ app.get("/protocol/events", async (req: Request, res: Response) => { + let workspaceId: string; + try { + workspaceId = await resolveRequestAuth(req); + } catch (err) { + const message = err instanceof Error ? err.message : "Authentication failed"; + res.status(401).json({ error: message }); + return; + } + + const parsePositiveInt = (value: unknown): number | undefined => { + if (typeof value !== "string") return undefined; + const parsed = Number.parseInt(value, 10); + return Number.isFinite(parsed) && parsed > 0 ? parsed : undefined; + }; + + const cursor = parsePositiveInt(req.query.changed_since) ?? 0; + const limit = parsePositiveInt(req.query.limit); + + try { + const events = await tenantProtocolEventStorage!(workspaceId).changedSince( + cursor, + limit, + ); + const nextCursor = + events.length > 0 ? events[events.length - 1]!.cursor : cursor; + res.json({ events, cursor: nextCursor }); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + res.status(500).json({ error: message }); + } + }); } const httpServer = app.listen(port, () => { diff --git a/src/protocol/__tests__/protocol-event-storage.test.ts b/src/protocol/__tests__/protocol-event-storage.test.ts new file mode 100644 index 00000000..d74fc4b6 --- /dev/null +++ b/src/protocol/__tests__/protocol-event-storage.test.ts @@ -0,0 +1,128 @@ +/** + * SupabaseProtocolEventStorage integration tests + * (SPEC-REASONING-CHANNEL-HOSTED c2/c3). + * + * Runs against the local Supabase stack; skips gracefully when it is not + * reachable. Covers append + changed_since ordering and, critically, the + * cross-tenant negative control: one tenant's storage never returns another + * tenant's events. 
/**
 * SupabaseProtocolEventStorage against the local Supabase stack. Each test
 * returns early when the stack is not reachable. Fixture setup and cleanup
 * now fail loudly: an ignored error there would otherwise surface later as an
 * unrelated-looking insert or assertion failure.
 */
describe('SupabaseProtocolEventStorage', () => {
  let available = false;
  let service: SupabaseClient;

  beforeAll(async () => {
    available = await isSupabaseAvailable();
    if (!available) return;
    service = createServiceClient();
    await ensureTestWorkspace(service);

    const { data: users, error: listError } = await service.auth.admin.listUsers();
    if (listError) {
      throw new Error(`listUsers failed: ${listError.message}`);
    }
    const ownerId = users?.users?.find((u) => u.email === 'test@test.local')?.id;

    // Second tenant for the cross-tenant negative control.
    const { error: upsertError } = await service.from('workspaces').upsert(
      {
        id: TENANT_B,
        name: 'Tenant B',
        slug: 'tenant-b-protocol-events',
        owner_user_id: ownerId,
        status: 'active',
        plan_id: 'free',
      },
      { onConflict: 'id' },
    );
    if (upsertError) {
      throw new Error(`tenant B workspace upsert failed: ${upsertError.message}`);
    }
  });

  beforeEach(async () => {
    if (!available) return;
    // Start every test from an empty log for both tenants.
    const { error } = await service
      .from('protocol_events')
      .delete()
      .in('tenant_workspace_id', [TENANT_A, TENANT_B]);
    if (error) {
      throw new Error(`protocol_events cleanup failed: ${error.message}`);
    }
  });

  it('appends events and returns them oldest-first with an advancing cursor', async () => {
    if (!available) return;
    const storage = storageFor(TENANT_A);

    await storage.append(evt('ulysses_init', 's1'));
    await storage.append(evt('ulysses_outcome', 's1', { S: 2 }));

    const events = await storage.changedSince(0);
    expect(events.map((e) => e.type)).toEqual(['ulysses_init', 'ulysses_outcome']);
    expect(events[1]!.cursor).toBeGreaterThan(events[0]!.cursor);
    expect(events[1]!.data.S).toBe(2);

    // The tail cursor returns nothing new.
    const after = await storage.changedSince(events[1]!.cursor);
    expect(after).toEqual([]);
  });

  it("never returns another tenant's events (cross-tenant negative control)", async () => {
    if (!available) return;
    const a = storageFor(TENANT_A);
    const b = storageFor(TENANT_B);

    await a.append(evt('theseus_init', 'session-a'));
    await b.append(evt('theseus_init', 'session-b'));

    const aEvents = await a.changedSince(0);
    const bEvents = await b.changedSince(0);

    expect(aEvents).toHaveLength(1);
    expect(bEvents).toHaveLength(1);
    expect(aEvents[0]!.data.session_id).toBe('session-a');
    expect(bEvents[0]!.data.session_id).toBe('session-b');
  });

  it('honors the limit and pages forward by cursor', async () => {
    if (!available) return;
    const storage = storageFor(TENANT_A);
    for (let i = 0; i < 5; i++) {
      await storage.append(evt('ulysses_reflect', 's1', { n: i }));
    }

    const firstPage = await storage.changedSince(0, 2);
    expect(firstPage).toHaveLength(2);
    expect(firstPage.map((e) => e.data.n)).toEqual([0, 1]);

    const secondPage = await storage.changedSince(firstPage[1]!.cursor, 2);
    expect(secondPage.map((e) => e.data.n)).toEqual([2, 3]);
  });
});
+ *
+ * In local mode the handler's onProtocolEvent stream is broadcast in-process
+ * over /events SSE. In hosted (multi-tenant) Cloud Run that won't span
+ * replicas, so the same stream is appended here and the reasoning channel
+ * pulls it via changed_since (claim c3), scoped to its workspace.
+ *
+ * Distinct from protocol_history (the session-keyed audit log with
+ * operation-level event_type); this table carries the full ThoughtboxEvent
+ * taxonomy the channel consumes, identical to local SSE delivery.
+ */
+
+import { createClient, type SupabaseClient } from '@supabase/supabase-js';
+import type { Database } from '../database.types.js';
+import type { ThoughtboxEvent } from '../events/types.js';
+
+const DEFAULT_LIMIT = 100;
+const MAX_LIMIT = 500;
+
+/** A persisted protocol event plus its monotonic pull cursor (row id). */
+export interface ProtocolEventRecord extends ThoughtboxEvent {
+  cursor: number;
+}
+
+export interface ProtocolEventStorage {
+  append(event: ThoughtboxEvent): Promise<void>;
+  /**
+   * Events with id greater than `cursor`, oldest first. `cursor` 0 reads from
+   * the start. Caller scopes nothing — the storage is bound to one tenant.
+   */
+  changedSince(cursor: number, limit?: number): Promise<ProtocolEventRecord[]>;
+}
+
+export interface SupabaseProtocolEventStorageConfig {
+  supabaseUrl: string;
+  /** Service role key — bypasses RLS; tenant isolation is enforced in queries. */
+  serviceRoleKey: string;
+  /** SaaS workspace (public.workspaces.id) all event rows are scoped to. */
+  tenantWorkspaceId: string;
+}
+
+/**
+ * ProtocolEventStorage backed by the `protocol_events` table. Each instance is
+ * bound to one tenant workspace: inserts stamp that workspace id and reads
+ * filter on it.
+ */
+export class SupabaseProtocolEventStorage implements ProtocolEventStorage {
+  private readonly client: SupabaseClient<Database>;
+  private readonly tenantWorkspaceId: string;
+
+  constructor(config: SupabaseProtocolEventStorageConfig) {
+    this.tenantWorkspaceId = config.tenantWorkspaceId;
+    this.client = createClient<Database>(config.supabaseUrl, config.serviceRoleKey, {
+      auth: { persistSession: false, autoRefreshToken: false },
+    });
+  }
+
+  /** Throws an Error naming the failed operation and this storage's tenant. */
+  private fail(operation: string, message: string): never {
+    throw new Error(
+      `SupabaseProtocolEventStorage.${operation} failed (tenant ${this.tenantWorkspaceId}): ${message}`,
+    );
+  }
+
+  /**
+   * Inserts one row for `event` under this storage's tenant workspace.
+   * `session_id` is copied out of `event.data` when it is a string, else null.
+   * @throws Error when the insert reports an error.
+   */
+  async append(event: ThoughtboxEvent): Promise<void> {
+    const sessionId =
+      typeof event.data.session_id === 'string' ? event.data.session_id : null;
+    const { error } = await this.client.from('protocol_events').insert({
+      tenant_workspace_id: this.tenantWorkspaceId,
+      source: event.source,
+      type: event.type,
+      session_id: sessionId,
+      event_timestamp: event.timestamp,
+      data: event.data as Database['public']['Tables']['protocol_events']['Insert']['data'],
+    });
+    if (error) this.fail('append', error.message);
+  }
+
+  /**
+   * Returns this tenant's events with id greater than `cursor`, oldest first.
+   * @param cursor Last id already seen; 0 reads from the start.
+   * @param limit Maximum rows, clamped to a whole number in [1, MAX_LIMIT].
+   * @throws Error when the query reports an error.
+   */
+  async changedSince(cursor: number, limit = DEFAULT_LIMIT): Promise<ProtocolEventRecord[]> {
+    // NaN falls back to the default and fractions are truncated, so the query
+    // never receives an invalid row limit.
+    const requested = Number.isNaN(limit) ? DEFAULT_LIMIT : limit;
+    const capped = Math.trunc(Math.min(Math.max(1, requested), MAX_LIMIT));
+    const { data, error } = await this.client
+      .from('protocol_events')
+      .select('id, source, type, event_timestamp, data')
+      .eq('tenant_workspace_id', this.tenantWorkspaceId)
+      .gt('id', cursor)
+      .order('id', { ascending: true })
+      .limit(capped);
+    if (error) this.fail('changedSince', error.message);
+
+    // workspaceId comes from the bound tenant, not from the stored row.
+    return (data ?? []).map((row) => ({
+      cursor: row.id,
+      source: row.source as ThoughtboxEvent['source'],
+      type: row.type as ThoughtboxEvent['type'],
+      workspaceId: this.tenantWorkspaceId,
+      timestamp: row.event_timestamp,
+      data: (row.data ?? {}) as Record<string, unknown>,
+    }));
+  }
+}
+
+/**
+ * Per-tenant SupabaseProtocolEventStorage provider for multi-tenant mode.
+ * Instances are cached per tenant workspace; all rows are scoped by
+ * tenant_workspace_id so the channel can never pull another tenant's events.
+ */
+export function createSupabaseProtocolEventStorageProvider(
+  config: Omit<SupabaseProtocolEventStorageConfig, 'tenantWorkspaceId'>,
+): (tenantWorkspaceId: string) => ProtocolEventStorage {
+  // One storage (and one Supabase client) per tenant, created on first use and
+  // kept for the lifetime of the provider.
+  const cache = new Map<string, SupabaseProtocolEventStorage>();
+  return (tenantWorkspaceId: string): ProtocolEventStorage => {
+    let storage = cache.get(tenantWorkspaceId);
+    if (!storage) {
+      storage = new SupabaseProtocolEventStorage({ ...config, tenantWorkspaceId });
+      cache.set(tenantWorkspaceId, storage);
+    }
+    return storage;
+  };
+}
diff --git a/supabase/migrations/20260615000000_add_protocol_events_table.sql b/supabase/migrations/20260615000000_add_protocol_events_table.sql
new file mode 100644
index 00000000..8fdc375f
--- /dev/null
+++ b/supabase/migrations/20260615000000_add_protocol_events_table.sql
@@ -0,0 +1,57 @@
+-- Protocol event log (SPEC-REASONING-CHANNEL-HOSTED, claim c2).
+-- Append-only persistence of the protocol lifecycle event stream
+-- (ulysses_*/theseus_*) the handler emits via onProtocolEvent. In local mode
+-- that stream is broadcast in-process over /events SSE; in hosted
+-- (multi-tenant) Cloud Run it must be durable and cross-replica so the
+-- reasoning channel can pull it (changed_since) scoped to its workspace.
+--
+-- Distinct from protocol_history: that table is the session-keyed audit log
+-- with operation-level event_type (plan/outcome/reflect/checkpoint/...). This
+-- table mirrors the richer ThoughtboxEvent taxonomy (nine lifecycle types,
+-- including init/visa/complete) the channel consumes, identical to local SSE.
+--
+-- Copies the proven tenant-scoping pattern (claims, 20260612000000):
+-- tenant_workspace_id FK to public.workspaces; RLS with service_role full
+-- access (the server scopes every query) plus a workspace-membership SELECT
+-- policy for future authenticated/anon-key reads. The bigint identity id is
+-- the monotonic pull cursor used by changed_since.
+--
+-- Idempotent: IF NOT EXISTS guards on table/index; policies are dropped
+-- before creation so a double-apply is safe.
+--
+-- NOTE(review): identity values are assigned at insert time, not commit time,
+-- so with concurrent writers a row with a lower id can become visible after a
+-- reader has already advanced its cursor past it. Confirm whether
+-- changed_since readers need a guard before relying on strict id ordering.
+
+CREATE TABLE IF NOT EXISTS public.protocol_events (
+  id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
+  tenant_workspace_id uuid NOT NULL REFERENCES public.workspaces(id) ON DELETE CASCADE,
+  source text NOT NULL DEFAULT 'protocol' CHECK (source IN ('hub', 'protocol')),
+  type text NOT NULL,
+  session_id text,
+  event_timestamp timestamptz NOT NULL,
+  data jsonb NOT NULL DEFAULT '{}',
+  created_at timestamptz NOT NULL DEFAULT now()
+);
+
+-- changed_since query: WHERE tenant_workspace_id = $1 AND id > $cursor ORDER BY id
+CREATE INDEX IF NOT EXISTS idx_protocol_events_tenant_id
+  ON public.protocol_events(tenant_workspace_id, id);
+
+-- ---------------------------------------------------------------------------
+-- RLS — the server uses service_role (bypasses RLS) and scopes every query by
+-- tenant_workspace_id; the workspace-membership SELECT policy exists for
+-- future authenticated/anon-key access, matching the claims/hub pattern.
+-- ---------------------------------------------------------------------------
+
+ALTER TABLE public.protocol_events ENABLE ROW LEVEL SECURITY;
+
+DROP POLICY IF EXISTS "service_role_full_access_protocol_events" ON public.protocol_events;
+CREATE POLICY "service_role_full_access_protocol_events" ON public.protocol_events
+  FOR ALL TO service_role USING (true) WITH CHECK (true);
+
+-- auth.uid() is wrapped in a scalar subselect so the planner can evaluate it
+-- once per statement instead of once per row.
+DROP POLICY IF EXISTS "workspace_member_read_protocol_events" ON public.protocol_events;
+CREATE POLICY "workspace_member_read_protocol_events" ON public.protocol_events
+  FOR SELECT TO authenticated
+  USING (
+    tenant_workspace_id IN (
+      SELECT workspace_id FROM public.workspace_memberships
+      WHERE user_id = (SELECT auth.uid())
+    )
+  );
diff --git a/vitest.config.ts b/vitest.config.ts
index 758dd599..e50a82fb 100644
--- a/vitest.config.ts
+++ b/vitest.config.ts
@@ -5,6 +5,7 @@ export default defineConfig({
     include: [
       'src/**/__tests__/**/*.test.ts',
       'scripts/**/__tests__/**/*.test.ts',
+      'plugins/**/__tests__/**/*.test.ts',
       'demo/**/*.ts',
     ],
     // Supabase integration tests share a single DB instance and use