Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 155 additions & 6 deletions src/app/api/runtimes/[id]/talk/realtime/relay/route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,76 @@ type RuntimeRow = {
id: string;
ownerUserId: string | null;
};
type ChatSessionRow = {
id: string;
agentId: string;
companyId: string | null;
channelId: string | null;
gatewaySessionKey: string | null;
updatedAt: Date;
};

type Field = { key: keyof RuntimeRow };
type Predicate = (row: RuntimeRow) => boolean;
type Field = { key: string };
type Predicate<T = RuntimeRow> = (row: T) => boolean;

const { mockRuntimeRows, mockGetGatewayClientForRuntime, mockHoldClient, mockReleaseClient } = vi.hoisted(() => ({
const {
mockRuntimeRows,
mockChatSessionRows,
mockGetGatewayClientForRuntime,
mockHoldClient,
mockReleaseClient,
mockCanAccessChatSession,
mockPersistChatProgressEvent,
mockPublishChatProgressEvent,
} = vi.hoisted(() => ({
mockRuntimeRows: [] as RuntimeRow[],
mockChatSessionRows: [] as ChatSessionRow[],
mockGetGatewayClientForRuntime: vi.fn(),
mockHoldClient: vi.fn(),
mockReleaseClient: vi.fn(),
mockCanAccessChatSession: vi.fn(),
mockPersistChatProgressEvent: vi.fn(),
mockPublishChatProgressEvent: vi.fn(),
}));

vi.mock("@/db/schema", () => ({
companyRuntimes: {
id: { key: "id" },
ownerUserId: { key: "ownerUserId" },
},
chatSessions: {
id: { key: "id" },
gatewaySessionKey: { key: "gatewaySessionKey" },
updatedAt: { key: "updatedAt" },
},
}));

vi.mock("drizzle-orm", () => ({
eq: (field: Field, value: unknown): Predicate => (row) => row[field.key] === value,
eq: (field: Field, value: unknown): Predicate<Record<string, unknown>> => (row) => row[field.key] === value,
and: (...predicates: Array<Predicate | undefined>): Predicate => (row) =>
predicates.every((predicate) => predicate?.(row) ?? true),
desc: (field: Field) => field,
}));

vi.mock("@/db", () => ({
db: {
select: () => ({
from: () => ({
from: (table: unknown) => ({
where: (predicate: Predicate) => ({
limit: (count: number) => Promise.resolve(mockRuntimeRows.filter(predicate).slice(0, count)),
orderBy: () => ({
limit: (count: number) => Promise.resolve(
mockChatSessionRows
.filter(predicate as unknown as Predicate<ChatSessionRow>)
.sort((a, b) => b.updatedAt.getTime() - a.updatedAt.getTime())
.slice(0, count),
),
}),
limit: (count: number) => {
const rows = isChatSessionsTable(table)
? mockChatSessionRows.filter(predicate as unknown as Predicate<ChatSessionRow>)
: mockRuntimeRows.filter(predicate as unknown as Predicate<RuntimeRow>);
return Promise.resolve(rows.slice(0, count));
},
}),
}),
}),
Expand All @@ -52,12 +92,31 @@ vi.mock("@/lib/gateway-chat-pool", () => ({
releaseClient: (...args: unknown[]) => mockReleaseClient(...args),
}));

vi.mock("@/lib/chat-session-access", () => ({
canAccessChatSession: (...args: unknown[]) => mockCanAccessChatSession(...args),
}));

vi.mock("@/lib/chat-session-events", () => ({
persistChatProgressEvent: (...args: unknown[]) => mockPersistChatProgressEvent(...args),
}));

vi.mock("@/lib/chat-pubsub", () => ({
publishChatProgressEvent: (...args: unknown[]) => mockPublishChatProgressEvent(...args),
}));

import { POST } from "./route";

function isChatSessionsTable(table: unknown) {
return Boolean(table && typeof table === "object" && "gatewaySessionKey" in table);
}

describe("POST /api/runtimes/[id]/talk/realtime/relay", () => {
beforeEach(() => {
vi.clearAllMocks();
mockRuntimeRows.length = 0;
mockChatSessionRows.length = 0;
mockCanAccessChatSession.mockResolvedValue(true);
mockPersistChatProgressEvent.mockResolvedValue(undefined);
});

it("proxies realtime relay audio chunks through an accessible runtime", async () => {
Expand Down Expand Up @@ -185,6 +244,96 @@ describe("POST /api/runtimes/[id]/talk/realtime/relay", () => {
expect(mockReleaseClient).toHaveBeenCalledWith(client);
});

it("publishes OpenClaw tool progress from realtime consults into the chat audit trail", async () => {
let gatewayHandler: ((payload: unknown) => void) | null = null;
const client = {
realtimeClientToolCall: vi.fn().mockResolvedValue({ runId: "run_1" }),
realtimeRelayToolResult: vi.fn().mockResolvedValue({ ok: true }),
on: vi.fn((event: string, handler: (payload: unknown) => void) => {
if (event === "*") gatewayHandler = handler;
}),
off: vi.fn(),
};
mockRuntimeRows.push({ id: "rt_1", ownerUserId: "user_1" });
mockChatSessionRows.push({
id: "session_1",
agentId: "neo",
companyId: "company_1",
channelId: "channel_1",
gatewaySessionKey: "main",
updatedAt: new Date(),
});
mockGetGatewayClientForRuntime.mockResolvedValue(client);

const responsePromise = POST(
new Request("http://localhost/api/runtimes/rt_1/talk/realtime/relay", {
method: "POST",
body: JSON.stringify({
action: "toolCall",
relaySessionId: "relay_1",
sessionKey: "main",
callId: "call_1",
name: "openclaw_agent_consult",
args: { prompt: "Inspect this repo" },
}),
}),
{ params: Promise.resolve({ id: "rt_1" }) },
);

await vi.waitFor(() => expect(gatewayHandler).toBeTypeOf("function"));

(gatewayHandler as ((payload: unknown) => void) | null)?.({
event: "tool_call",
runId: "run_1",
status: "started",
data: {
toolCallId: "tool_1",
name: "exec",
args: { command: "find /Users/roger/Developer -maxdepth 4 -name README.md" },
},
});
(gatewayHandler as ((payload: unknown) => void) | null)?.({
event: "tool",
runId: "run_1",
status: "completed",
data: {
toolCallId: "tool_1",
name: "exec",
output: "README.md",
},
});
(gatewayHandler as ((payload: unknown) => void) | null)?.({
event: "chat",
runId: "run_1",
state: "final",
message: { content: [{ type: "text", text: "The repo is a CrewCMD app." }] },
});

const response = await responsePromise;
expect(response.status).toBe(200);
const persistedEvents = mockPersistChatProgressEvent.mock.calls.map((call) => call[0].payload.event);
expect(persistedEvents).toContain("run_started");
expect(persistedEvents).toContain("tool_started");
expect(persistedEvents).toContain("tool_completed");
expect(persistedEvents).toContain("run_completed");
expect(mockPublishChatProgressEvent).toHaveBeenCalledWith(expect.objectContaining({
sessionId: "session_1",
agentId: "neo",
companyId: "company_1",
sessionKey: "main",
channelId: "channel_1",
event: "tool_started",
payload: expect.objectContaining({
runId: "run_1",
activeTool: expect.objectContaining({
id: "tool_1",
name: "exec",
detailKind: "input",
}),
}),
}));
});

it("extracts final realtime consult text from OpenClaw trace artifacts", async () => {
let gatewayHandler: ((payload: unknown) => void) | null = null;
const client = {
Expand Down
Loading
Loading