From 55a5a1f221cdfb96ff5249d4b7efafc5df85b2ec Mon Sep 17 00:00:00 2001 From: Roger Chappel Date: Fri, 22 May 2026 09:21:37 +1000 Subject: [PATCH] feat: audit realtime tool progress --- .../[id]/talk/realtime/relay/route.test.ts | 161 ++++++++++++++- .../[id]/talk/realtime/relay/route.ts | 194 +++++++++++++++++- 2 files changed, 344 insertions(+), 11 deletions(-) diff --git a/src/app/api/runtimes/[id]/talk/realtime/relay/route.test.ts b/src/app/api/runtimes/[id]/talk/realtime/relay/route.test.ts index 0890229..00604a9 100644 --- a/src/app/api/runtimes/[id]/talk/realtime/relay/route.test.ts +++ b/src/app/api/runtimes/[id]/talk/realtime/relay/route.test.ts @@ -4,15 +4,36 @@ type RuntimeRow = { id: string; ownerUserId: string | null; }; +type ChatSessionRow = { + id: string; + agentId: string; + companyId: string | null; + channelId: string | null; + gatewaySessionKey: string | null; + updatedAt: Date; +}; -type Field = { key: keyof RuntimeRow }; -type Predicate = (row: RuntimeRow) => boolean; +type Field = { key: string }; +type Predicate = (row: T) => boolean; -const { mockRuntimeRows, mockGetGatewayClientForRuntime, mockHoldClient, mockReleaseClient } = vi.hoisted(() => ({ +const { + mockRuntimeRows, + mockChatSessionRows, + mockGetGatewayClientForRuntime, + mockHoldClient, + mockReleaseClient, + mockCanAccessChatSession, + mockPersistChatProgressEvent, + mockPublishChatProgressEvent, +} = vi.hoisted(() => ({ mockRuntimeRows: [] as RuntimeRow[], + mockChatSessionRows: [] as ChatSessionRow[], mockGetGatewayClientForRuntime: vi.fn(), mockHoldClient: vi.fn(), mockReleaseClient: vi.fn(), + mockCanAccessChatSession: vi.fn(), + mockPersistChatProgressEvent: vi.fn(), + mockPublishChatProgressEvent: vi.fn(), })); vi.mock("@/db/schema", () => ({ @@ -20,20 +41,39 @@ vi.mock("@/db/schema", () => ({ id: { key: "id" }, ownerUserId: { key: "ownerUserId" }, }, + chatSessions: { + id: { key: "id" }, + gatewaySessionKey: { key: "gatewaySessionKey" }, + updatedAt: { key: "updatedAt" }, + }, })); vi.mock("drizzle-orm", () => ({ - eq: (field: Field, value: unknown): Predicate => (row) => row[field.key] === value, + eq: (field: Field, value: unknown): Predicate> => (row) => row[field.key] === value, and: (...predicates: Array): Predicate => (row) => predicates.every((predicate) => predicate?.(row) ?? true), + desc: (field: Field) => field, })); vi.mock("@/db", () => ({ db: { select: () => ({ - from: () => ({ + from: (table: unknown) => ({ where: (predicate: Predicate) => ({ - limit: (count: number) => Promise.resolve(mockRuntimeRows.filter(predicate).slice(0, count)), + orderBy: () => ({ + limit: (count: number) => Promise.resolve( + mockChatSessionRows + .filter(predicate as unknown as Predicate) + .sort((a, b) => b.updatedAt.getTime() - a.updatedAt.getTime()) + .slice(0, count), + ), + }), + limit: (count: number) => { + const rows = isChatSessionsTable(table) + ? mockChatSessionRows.filter(predicate as unknown as Predicate) + : mockRuntimeRows.filter(predicate as unknown as Predicate); + return Promise.resolve(rows.slice(0, count)); + }, }), }), }), @@ -52,12 +92,31 @@ vi.mock("@/lib/gateway-chat-pool", () => ({ releaseClient: (...args: unknown[]) => mockReleaseClient(...args), })); +vi.mock("@/lib/chat-session-access", () => ({ + canAccessChatSession: (...args: unknown[]) => mockCanAccessChatSession(...args), +})); + +vi.mock("@/lib/chat-session-events", () => ({ + persistChatProgressEvent: (...args: unknown[]) => mockPersistChatProgressEvent(...args), +})); + +vi.mock("@/lib/chat-pubsub", () => ({ + publishChatProgressEvent: (...args: unknown[]) => mockPublishChatProgressEvent(...args), +})); + import { POST } from "./route"; +function isChatSessionsTable(table: unknown) { + return Boolean(table && typeof table === "object" && "gatewaySessionKey" in table); +} + describe("POST /api/runtimes/[id]/talk/realtime/relay", () => { beforeEach(() => { vi.clearAllMocks(); mockRuntimeRows.length = 0; + mockChatSessionRows.length = 0; + mockCanAccessChatSession.mockResolvedValue(true); + mockPersistChatProgressEvent.mockResolvedValue(undefined); }); it("proxies realtime relay audio chunks through an accessible runtime", async () => { @@ -185,6 +244,96 @@ describe("POST /api/runtimes/[id]/talk/realtime/relay", () => { expect(mockReleaseClient).toHaveBeenCalledWith(client); }); + it("publishes OpenClaw tool progress from realtime consults into the chat audit trail", async () => { + let gatewayHandler: ((payload: unknown) => void) | null = null; + const client = { + realtimeClientToolCall: vi.fn().mockResolvedValue({ runId: "run_1" }), + realtimeRelayToolResult: vi.fn().mockResolvedValue({ ok: true }), + on: vi.fn((event: string, handler: (payload: unknown) => void) => { + if (event === "*") gatewayHandler = handler; + }), + off: vi.fn(), + }; + mockRuntimeRows.push({ id: "rt_1", ownerUserId: "user_1" }); + mockChatSessionRows.push({ + id: "session_1", + agentId: "neo", + companyId: "company_1", + channelId: "channel_1", + gatewaySessionKey: "main", + updatedAt: new Date(), + }); + mockGetGatewayClientForRuntime.mockResolvedValue(client); + + const responsePromise = POST( + new Request("http://localhost/api/runtimes/rt_1/talk/realtime/relay", { + method: "POST", + body: JSON.stringify({ + action: "toolCall", + relaySessionId: "relay_1", + sessionKey: "main", + callId: "call_1", + name: "openclaw_agent_consult", + args: { prompt: "Inspect this repo" }, + }), + }), + { params: Promise.resolve({ id: "rt_1" }) }, + ); + + await vi.waitFor(() => expect(gatewayHandler).toBeTypeOf("function")); + + (gatewayHandler as ((payload: unknown) => void) | null)?.({ + event: "tool_call", + runId: "run_1", + status: "started", + data: { + toolCallId: "tool_1", + name: "exec", + args: { command: "find /Users/roger/Developer -maxdepth 4 -name README.md" }, + }, + }); + (gatewayHandler as ((payload: unknown) => void) | null)?.({ + event: "tool", + runId: "run_1", + status: "completed", + data: { + toolCallId: "tool_1", + name: "exec", + output: "README.md", + }, + }); + (gatewayHandler as ((payload: unknown) => void) | null)?.({ + event: "chat", + runId: "run_1", + state: "final", + message: { content: [{ type: "text", text: "The repo is a CrewCMD app." }] }, + }); + + const response = await responsePromise; + expect(response.status).toBe(200); + const persistedEvents = mockPersistChatProgressEvent.mock.calls.map((call) => call[0].payload.event); + expect(persistedEvents).toContain("run_started"); + expect(persistedEvents).toContain("tool_started"); + expect(persistedEvents).toContain("tool_completed"); + expect(persistedEvents).toContain("run_completed"); + expect(mockPublishChatProgressEvent).toHaveBeenCalledWith(expect.objectContaining({ + sessionId: "session_1", + agentId: "neo", + companyId: "company_1", + sessionKey: "main", + channelId: "channel_1", + event: "tool_started", + payload: expect.objectContaining({ + runId: "run_1", + activeTool: expect.objectContaining({ + id: "tool_1", + name: "exec", + detailKind: "input", + }), + }), + })); + }); + it("extracts final realtime consult text from OpenClaw trace artifacts", async () => { let gatewayHandler: ((payload: unknown) => void) | null = null; const client = { diff --git a/src/app/api/runtimes/[id]/talk/realtime/relay/route.ts b/src/app/api/runtimes/[id]/talk/realtime/relay/route.ts index 8960668..0212002 100644 --- a/src/app/api/runtimes/[id]/talk/realtime/relay/route.ts +++ b/src/app/api/runtimes/[id]/talk/realtime/relay/route.ts @@ -1,8 +1,11 @@ import { NextResponse } from "next/server"; -import { and, eq } from "drizzle-orm"; +import { and, desc, eq } from "drizzle-orm"; import { db, withRetry } from "@/db"; -import { companyRuntimes } from "@/db/schema"; +import { chatSessions, companyRuntimes } from "@/db/schema"; import { buildRuntimeReadWhere, getAgentAccessContext } from "@/lib/agent-access"; +import { canAccessChatSession } from "@/lib/chat-session-access"; +import { publishChatProgressEvent } from "@/lib/chat-pubsub"; +import { persistChatProgressEvent } from "@/lib/chat-session-events"; import { getGatewayClientForRuntime, holdClient, releaseClient } from "@/lib/gateway-chat-pool"; import type { GatewayClient } from "@/lib/gateway-client"; @@ -11,6 +14,34 @@ export const maxDuration = 120; type RelayAction = "audio" | "cancelOutput" | "mark" | "toolCall" | "toolResult" | "stop"; +type ChatProgressEventName = + | "run_started" + | "tool_started" + | "tool_updated" + | "tool_completed" + | "run_completed" + | "run_error"; + +type ChatProgressEvent = { + type: "chat_progress"; + event: ChatProgressEventName; + at: string; + elapsedMs: number; + agentId: string; + gatewayAgentId: string; + sessionKey: string; + runId?: string; + error?: string; + activeTool?: { + id?: string; + name: string; + status?: string; + detail?: string; + detailKind?: "input" | "output" | "status"; + detailTruncated?: boolean; + }; +}; + export async function POST( request: Request, { params }: { params: Promise<{ id: string }> }, @@ -75,6 +106,7 @@ export async function POST( if (action === "toolCall") { const result = await runRealtimeToolCall(client, { + request, relaySessionId, sessionKey: readRequiredString(body.sessionKey, "sessionKey"), callId: readRequiredString(body.callId, "callId"), @@ -98,6 +130,7 @@ class ValidationError extends Error {} async function runRealtimeToolCall( client: GatewayClient, params: { + request: Request; relaySessionId: string; sessionKey: string; callId: string; @@ -120,9 +153,17 @@ async function runRealtimeToolCall( holdClient(client); try { try { - const toolCall = await client.realtimeClientToolCall(params); + const toolCall = await client.realtimeClientToolCall({ + relaySessionId: params.relaySessionId, + sessionKey: params.sessionKey, + callId: params.callId, + name: params.name, + args: params.args, + }); const runId = firstString(toolCall.runId, toolCall.idempotencyKey); if (!runId) throw new Error("OpenClaw realtime tool call did not return a run id"); + const audit = await createRealtimeAuditPublisher(params.request, params.sessionKey, runId); + void audit.publish("run_started"); await client.realtimeRelayToolResult({ relaySessionId: params.relaySessionId, @@ -131,12 +172,13 @@ async function runRealtimeToolCall( options: { willContinue: true }, }); - const text = await waitForChatFinal(client, runId); + const text = await waitForChatFinal(client, runId, audit); const result = await client.realtimeRelayToolResult({ relaySessionId: params.relaySessionId, callId: params.callId, result: { text }, }); + void audit.publish("run_completed"); return { delegated: true, runId, result, finalText: text }; } catch (error) { const message = error instanceof Error ? error.message : "OpenClaw realtime tool call failed"; @@ -161,10 +203,16 @@ function buildRealtimeToolWorkingResult() { }; } -function waitForChatFinal(client: GatewayClient, runId: string, timeoutMs = 110_000) { +function waitForChatFinal( + client: GatewayClient, + runId: string, + audit: RealtimeAuditPublisher, + timeoutMs = 110_000, +) { return new Promise((resolve, reject) => { const timer = setTimeout(() => { cleanup(); + void audit.publish("run_error", { error: "Timed out waiting for OpenClaw realtime tool result" }); reject(new Error("Timed out waiting for OpenClaw realtime tool result")); }, timeoutMs); @@ -179,6 +227,9 @@ function waitForChatFinal(client: GatewayClient, runId: string, timeoutMs = 110_ const runIds = extractEventRunIds(event); if (!runIds.includes(runId)) return; + const toolProgress = extractToolProgress(event); + if (toolProgress) void audit.publish(toolProgress.event, { activeTool: toolProgress.activeTool }); + const state = firstString(event.state, event.status)?.toLowerCase(); if (state === "final" || state === "complete" || state === "completed") { const text = extractText(event.message) || extractText(event); @@ -190,6 +241,7 @@ function waitForChatFinal(client: GatewayClient, runId: string, timeoutMs = 110_ if (state === "aborted" || state === "error" || state === "failed") { const message = firstString(event.errorMessage, event.error, event.message) ?? "OpenClaw realtime tool call failed"; + void audit.publish("run_error", { error: message }); cleanup(); reject(new Error(message)); } @@ -199,6 +251,73 @@ function waitForChatFinal(client: GatewayClient, runId: string, timeoutMs = 110_ }); } +type RealtimeAuditPublisher = Awaited>; + +async function createRealtimeAuditPublisher(request: Request, sessionKey: string, runId: string) { + const startedAt = Date.now(); + const session = await resolveAuditSession(request, sessionKey); + + const publish = async ( + event: ChatProgressEventName, + details: Partial> = {}, + ) => { + if (!session?.companyId) return; + const payload: ChatProgressEvent = { + type: "chat_progress", + event, + at: new Date().toISOString(), + elapsedMs: Date.now() - startedAt, + agentId: session.agentId, + gatewayAgentId: session.agentId, + sessionKey, + runId, + ...(details.error ? { error: details.error } : {}), + ...(details.activeTool ? { activeTool: details.activeTool } : {}), + }; + + await persistChatProgressEvent({ + sessionId: session.id, + companyId: session.companyId, + agentId: session.agentId, + gatewaySessionKey: sessionKey, + payload, + }).catch((error) => { + console.error("[api/realtime/relay] Failed to persist realtime tool progress:", error); + }); + + publishChatProgressEvent({ + type: "chat_progress", + sessionId: session.id, + agentId: session.agentId, + companyId: session.companyId, + sessionKey, + channelId: session.channelId, + event, + at: payload.at, + payload: payload as unknown as Record, + }); + }; + + return { publish }; +} + +async function resolveAuditSession(request: Request, sessionKey: string) { + if (!db) return null; + const sessions = await withRetry(() => + db! + .select() + .from(chatSessions) + .where(eq(chatSessions.gatewaySessionKey, sessionKey)) + .orderBy(desc(chatSessions.updatedAt)) + .limit(10) + ); + + for (const session of sessions) { + if (await canAccessChatSession(request as Parameters[0], session)) return session; + } + return null; +} + function readRequiredString(value: unknown, name: string) { if (typeof value === "string" && value.trim().length > 0) return value.trim(); throw new ValidationError(`${name} is required`); @@ -251,6 +370,71 @@ function extractEventRunIds(payload: Record) { .filter((value): value is string => Boolean(value)); } +function stringifyToolDetail(value: unknown, maxLength = 8_000) { + if (value === undefined || value === null) return null; + + let text: string; + if (typeof value === "string") { + text = value; + } else if (typeof value === "number" || typeof value === "boolean") { + text = String(value); + } else { + try { + text = JSON.stringify(value, null, 2); + } catch { + text = String(value); + } + } + + text = text.trim(); + if (!text) return null; + + return { + detail: text.length > maxLength ? text.slice(0, maxLength) : text, + detailTruncated: text.length > maxLength, + }; +} + +function extractToolProgress(payload: Record) { + const stream = firstString(payload.stream, payload.event); + const streamKey = stream?.toLowerCase() ?? ""; + const data = asRecord(payload.data) ?? asRecord(payload.payload) ?? payload; + const kind = firstString(data.kind, data.type, payload.kind, payload.type)?.toLowerCase() ?? ""; + const isToolEvent = streamKey === "tool" || + streamKey === "tool_call" || + streamKey === "agent.tool" || + streamKey.includes("tool") || + kind.includes("tool") || + Boolean(firstString(data.toolCallId, data.tool_call_id, data.toolName, data.tool_name)); + if (!isToolEvent) return null; + + const toolCallId = firstString(data.toolCallId, data.tool_call_id, payload.toolCallId, payload.tool_call_id); + const name = firstString(data.name, data.toolName, data.tool_name, payload.toolName, payload.tool_name) ?? "tool"; + const phase = firstString(data.phase, data.status, data.state, payload.phase, payload.status, payload.state) ?? "update"; + const normalizedPhase = phase.toLowerCase(); + const event: ChatProgressEventName = ["start", "started", "call", "calling", "running"].includes(normalizedPhase) + ? "tool_started" + : ["result", "end", "ended", "complete", "completed", "success", "succeeded"].includes(normalizedPhase) + ? "tool_completed" + : "tool_updated"; + const detailKind: "input" | "output" = event === "tool_started" ? "input" : "output"; + const detailValue = event === "tool_started" + ? data.args ?? data.arguments ?? data.input + : data.partialResult ?? data.partial_result ?? data.result ?? data.output; + const detail = stringifyToolDetail(detailValue); + + return { + event, + activeTool: { + ...(toolCallId ? { id: toolCallId } : {}), + name, + status: phase, + detailKind, + ...(detail ? detail : {}), + }, + }; +} + function extractText(value: unknown, seen = new WeakSet()): string { if (typeof value === "string") return value; if (value === null || value === undefined) return "";