Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions .specs/reasoning-channel-hosted.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ claims:
behavioral: false
required_evidence: plugin.json contains an `mcpServers.thoughtbox-channel` entry (command node, args ${CLAUDE_PLUGIN_ROOT}/dist/thoughtbox-channel.js) and a `channels` array whose `server` matches that key; the channel derives base URL and API key from local Claude settings (THOUGHTBOX_URL is an optional override), so the committed dist/thoughtbox-channel.js boots, declares the claude/channel experimental capability, and stays idle instead of exit(1) when unconfigured; /plugin install succeeds
- id: c2
statement: In hosted (multi-tenant) mode the server persists protocol lifecycle events (ulysses_*/theseus_*) to a tenant-scoped Supabase table, where today they are neither broadcast nor stored
statement: In hosted (multi-tenant) mode the server appends the full protocol lifecycle event stream (ulysses_*/theseus_*) to a dedicated tenant-scoped Supabase table — distinct from protocol_history, which only stores a lossy operation-level subset and is never broadcast
type: implementation
behavioral: false
required_evidence: a migration adds a protocol_events table with tenant_workspace_id + workspace-membership RLS following the hub-table pattern; the multi-tenant session branch in src/index.ts passes onProtocolEvent to createMcpServer and that handler writes a row per protocol event; a test confirms rows are written under the emitting workspace
Expand All @@ -21,10 +21,10 @@ claims:
behavioral: true
required_evidence: GET /protocol/events (changed_since cursor) resolves the workspace via resolveRequestAuth and filters by tenant_workspace_id; an agentic/integration test shows workspace A's events are returned to A's key and a workspace-B key receives none of A's events (cross-tenant negative control)
- id: c4
statement: The channel client selects its transport by configuration — in-process SSE against a local server, HTTP polling of the pull endpoint against a hosted server — and delivers identical channel notifications either way
statement: The channel client selects its transport by configuration — in-process SSE against a local server, HTTP polling of the pull endpoint against a hosted server with session-scoped query parameters when configured — and delivers identical channel notifications either way
type: implementation
behavioral: false
required_evidence: the channel client chooses SSE vs polling from config (URL host inference with an env override); unit tests cover both transports producing the same pushEvent calls; local SSE path is unchanged from current behavior
required_evidence: the channel client chooses SSE vs polling from config (URL host inference with an env override, warning on invalid overrides), forwarding session_id when configured and priming before emitting; unit tests cover both transports producing the same pushEvent calls; local SSE path is unchanged from current behavior
- id: c5
statement: The local-mode reasoning channel keeps working unchanged — the hosted path is additive and does not alter local /events SSE delivery or protocol enforcement
type: governance
Expand Down Expand Up @@ -82,11 +82,17 @@ B6/B8 (the reactive substrate) so the realtime transport is defined once, there.
## Components

1. **Plugin wiring** (c1) — register `mcpServers.thoughtbox-channel` + a
`channels` entry in `plugin.json`, supplying `THOUGHTBOX_URL` via env with
`${CLAUDE_PLUGIN_ROOT}` expansion; ship the dist artifact (already committed).
`channels` entry in `plugin.json` whose `server` matches the mcpServers key;
the channel derives URL+key from local Claude settings (no hardcoded env);
ship the dist artifact (already committed).
2. **Server persistence** (c2) — `protocol_events` migration (tenant-scoped +
RLS, hub-table pattern); wire `onProtocolEvent` in the multi-tenant branch to
persist each event.
RLS, claims/hub-table pattern); wire `onProtocolEvent` in the multi-tenant
branch to append each event. A dedicated table, **not** `protocol_history`:
that table is the session-keyed audit log with an operation-level
`event_type` (plan/outcome/reflect/checkpoint/+2 validator) constrained by a
CHECK — a lossy subset missing init/visa/complete and the ulysses/theseus
prefix. `protocol_events` mirrors the full nine-type `ThoughtboxEvent`
taxonomy the channel emits, so hosted pull equals local SSE byte-for-byte.
3. **Pull endpoint** (c3) — `GET /protocol/events?changed_since=<cursor>`,
authorized by API key, filtered by `tenant_workspace_id`.
4. **Channel client transport selection** (c4) — SSE (local) vs polling (hosted),
Expand Down
39 changes: 39 additions & 0 deletions plugins/thoughtbox-claude-code/dist/polling-event-client.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Thoughtbox Event Polling Client (SPEC-REASONING-CHANNEL-HOSTED c4).
*
* Hosted (multi-tenant) Cloud Run cannot serve the in-process /events SSE
* stream across replicas, so against a hosted server the channel pulls the
* tenant-scoped protocol event log via `GET /protocol/events?changed_since=`.
* Same config surface as EventClient so the channel selects a transport
* without other code changes.
*
* On connect the client primes its cursor to the current tail without
* emitting, so a fresh channel reacts to NEW protocol events rather than
* replaying completed sessions.
*/
import type { ThoughtboxEvent } from "./event-types.js";
export interface PollingEventClientConfig {
baseUrl: string;
apiKey: string;
sessionId?: string;
onEvent: (event: ThoughtboxEvent) => void;
onError?: (error: Error) => void;
onConnect?: () => void;
pollIntervalMs?: number;
}
export declare class PollingEventClient {
private config;
private cursor;
private closed;
private timer;
private backoffMs;
constructor(config: PollingEventClientConfig);
connect(): Promise<void>;
close(): void;
setSessionId(sessionId: string): void;
private scheduleNext;
private poll;
private emit;
private fetchPage;
private reportError;
}
116 changes: 116 additions & 0 deletions plugins/thoughtbox-claude-code/dist/polling-event-client.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/**
* Thoughtbox Event Polling Client (SPEC-REASONING-CHANNEL-HOSTED c4).
*
* Hosted (multi-tenant) Cloud Run cannot serve the in-process /events SSE
* stream across replicas, so against a hosted server the channel pulls the
* tenant-scoped protocol event log via `GET /protocol/events?changed_since=`.
* Same config surface as EventClient so the channel selects a transport
* without other code changes.
*
* On connect the client primes its cursor to the current tail without
* emitting, so a fresh channel reacts to NEW protocol events rather than
* replaying completed sessions.
*/
const DEFAULT_POLL_INTERVAL_MS = 3000;
const PAGE_LIMIT = 200;
const MIN_BACKOFF_MS = 1000;
const MAX_BACKOFF_MS = 30_000;
const BACKOFF_MULTIPLIER = 2;
export class PollingEventClient {
config;
cursor = 0;
closed = false;
timer = null;
backoffMs = MIN_BACKOFF_MS;
constructor(config) {
this.config = config;
}
async connect() {
this.closed = false;
// Prime the cursor to the current tail without emitting.
try {
while (true) {
const page = await this.fetchPage(this.cursor);
if (page.events.length === 0)
break;
this.cursor = page.cursor;
if (page.events.length < PAGE_LIMIT)
break;
}
this.config.onConnect?.();
}
catch (error) {
this.reportError(error);
}
this.scheduleNext(this.config.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS);
}
close() {
this.closed = true;
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
}
setSessionId(sessionId) {
this.config.sessionId = sessionId;
}
scheduleNext(delayMs) {
if (this.closed)
return;
this.timer = setTimeout(() => void this.poll(), delayMs);
}
async poll() {
if (this.closed)
return;
try {
let page = await this.fetchPage(this.cursor);
while (page.events.length > 0) {
for (const event of page.events)
this.emit(event);
this.cursor = page.cursor;
if (page.events.length < PAGE_LIMIT)
break;
page = await this.fetchPage(this.cursor);
}
this.backoffMs = MIN_BACKOFF_MS;
this.scheduleNext(this.config.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS);
}
catch (error) {
this.reportError(error);
const delay = this.backoffMs;
this.backoffMs = Math.min(this.backoffMs * BACKOFF_MULTIPLIER, MAX_BACKOFF_MS);
this.scheduleNext(delay);
}
}
emit(event) {
const sessionId = typeof event.data.session_id === "string" ? event.data.session_id : "";
this.config.onEvent({
source: event.source,
type: event.type,
sessionId,
timestamp: event.timestamp,
data: event.data,
});
}
async fetchPage(cursor) {
const params = new URLSearchParams();
if (cursor > 0)
params.set("changed_since", String(cursor));
params.set("limit", String(PAGE_LIMIT));
const url = `${this.config.baseUrl}/protocol/events?${params.toString()}`;
const response = await fetch(url, {
headers: { Authorization: `Bearer ${this.config.apiKey}` },
});
if (!response.ok) {
throw new Error(`protocol/events poll failed: ${response.status} ${response.statusText}`);
}
const body = (await response.json());
return { events: body.events ?? [], cursor: body.cursor ?? cursor };
}
reportError(error) {
if (this.closed)
return;
const err = error instanceof Error ? error : new Error(String(error));
this.config.onError?.(err);
}
}
36 changes: 30 additions & 6 deletions plugins/thoughtbox-claude-code/dist/thoughtbox-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,27 @@ import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"
import { extractApiKeyFromLocalConfig, findThoughtboxBaseUrl, loadLocalThoughtboxConfig, } from "./cli/config.js";
import { EventFilter } from "./event-filter.js";
import { EventClient } from "./event-client.js";
import { PollingEventClient } from "./polling-event-client.js";
const THOUGHTBOX_SESSION = process.env.THOUGHTBOX_SESSION;
/**
* Pick the event transport. A local server serves the in-process /events SSE
* stream; a hosted (remote) server is multi-replica and only exposes the
* pull endpoint, so poll there. THOUGHTBOX_CHANNEL_MODE forces a transport.
*/
function selectTransport(baseUrl) {
const override = process.env.THOUGHTBOX_CHANNEL_MODE;
if (override === "sse" || override === "poll")
return override;
try {
const host = new URL(baseUrl).hostname;
return host === "localhost" || host === "127.0.0.1" || host === "::1"
? "sse"
: "poll";
}
catch {
return "poll";
}
}
const instructions = [
"Thoughtbox protocol events arrive as <channel source=\"thoughtbox-channel\" ...>.",
"",
Expand Down Expand Up @@ -126,16 +146,20 @@ async function start() {
return;
}
const { baseUrl, apiKey } = connection;
console.error(`[Channel] Connecting to ${baseUrl}`);
const eventClient = new EventClient({
const mode = selectTransport(baseUrl);
console.error(`[Channel] Connecting to ${baseUrl} (${mode})`);
const clientConfig = {
baseUrl,
apiKey,
...(THOUGHTBOX_SESSION ? { sessionId: THOUGHTBOX_SESSION } : {}),
onEvent: (event) => void pushEvent(event),
onError: (err) => console.error("[Channel] SSE error:", err.message),
onConnect: () => console.error("[Channel] Connected to event stream"),
});
eventClient.connect().catch((err) => {
onError: (err) => console.error(`[Channel] ${mode} error:`, err.message),
onConnect: () => console.error(`[Channel] Connected to event stream (${mode})`),
};
const client = mode === "sse"
? new EventClient(clientConfig)
: new PollingEventClient(clientConfig);
client.connect().catch((err) => {
console.error("[Channel] Failed to connect event stream:", err);
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/**
* PollingEventClient unit tests (SPEC-REASONING-CHANNEL-HOSTED c4).
*
* Mocks fetch — no server. Verifies the client primes its cursor to the tail
* without emitting, then emits only NEW events, advances the cursor across
* polls, and derives the top-level sessionId from data.session_id (the shape
* EventFilter and the channel formatters expect).
*/

import { describe, it, expect, vi, afterEach } from 'vitest';
import { PollingEventClient } from '../polling-event-client.js';
import type { ThoughtboxEvent } from '../event-types.js';

interface Page {
events: Array<{
cursor: number;
type: string;
source: string;
timestamp: string;
data: Record<string, unknown>;
}>;
cursor: number;
}

function jsonResponse(body: Page): Response {
return new Response(JSON.stringify(body), {
status: 200,
headers: { 'Content-Type': 'application/json' },
});
}

function row(cursor: number, type: string, sessionId: string, extra: Record<string, unknown> = {}) {
return {
cursor,
type,
source: 'protocol',
timestamp: '2026-06-15T00:00:00.000Z',
data: { session_id: sessionId, ...extra },
};
}

const flush = () => new Promise((r) => setTimeout(r, 0));

afterEach(() => {
vi.restoreAllMocks();
vi.useRealTimers();
});

describe('PollingEventClient', () => {
it('primes to the tail on connect without emitting, then emits new events', async () => {
// connect prime: one page of existing history (cursor -> 7), then poll
// returns a new event at cursor 8.
const fetchMock = vi
.fn<typeof fetch>()
.mockResolvedValueOnce(jsonResponse({ events: [row(7, 'ulysses_init', 's1')], cursor: 7 }))
.mockResolvedValueOnce(jsonResponse({ events: [], cursor: 7 })) // prime second page (empty -> tail)
.mockResolvedValueOnce(jsonResponse({ events: [row(8, 'ulysses_outcome', 's1', { S: 2 })], cursor: 8 }))
.mockResolvedValue(jsonResponse({ events: [], cursor: 8 }));
vi.stubGlobal('fetch', fetchMock);

const received: ThoughtboxEvent[] = [];
const client = new PollingEventClient({
baseUrl: 'https://hosted.example/',
apiKey: 'tbx_test',
pollIntervalMs: 5,
onEvent: (e) => received.push(e),
});

await client.connect();
expect(received).toHaveLength(0); // primed, no replay of history

await flush();
await new Promise((r) => setTimeout(r, 20)); // let two polls fire
client.close();

expect(received).toHaveLength(1);
expect(received[0]!.type).toBe('ulysses_outcome');
expect(received[0]!.sessionId).toBe('s1'); // derived from data.session_id
expect(received[0]!.data.S).toBe(2);
});

it('sends the API key and changed_since cursor', async () => {
const fetchMock = vi
.fn<typeof fetch>()
.mockResolvedValue(jsonResponse({ events: [], cursor: 0 }));
vi.stubGlobal('fetch', fetchMock);

const client = new PollingEventClient({
baseUrl: 'https://hosted.example',
apiKey: 'tbx_secret',
onEvent: () => {},
});
await client.connect();
client.close();

const [url, init] = fetchMock.mock.calls[0]!;
expect(String(url)).toContain('/protocol/events');
const headers = (init?.headers ?? {}) as Record<string, string>;
expect(headers.Authorization).toBe('Bearer tbx_secret');
});

it('adds the session_id query param when configured', async () => {
const fetchMock = vi
.fn<typeof fetch>()
.mockResolvedValue(jsonResponse({ events: [], cursor: 0 }));
vi.stubGlobal('fetch', fetchMock);

const client = new PollingEventClient({
baseUrl: 'https://hosted.example',
apiKey: 'tbx_secret',
sessionId: 'session-123',
onEvent: () => {},
});
await client.connect();
client.close();

const [url] = fetchMock.mock.calls[0]!;
const params = new URL(String(url)).searchParams;
expect(params.get('session_id')).toBe('session-123');
});
});
Loading
Loading