diff --git a/.changeset/chat-completion-tool-calls-608.md b/.changeset/chat-completion-tool-calls-608.md new file mode 100644 index 00000000..6d362e9f --- /dev/null +++ b/.changeset/chat-completion-tool-calls-608.md @@ -0,0 +1,27 @@ +--- +"ornn-api": patch +--- + +NyxLlmClient now normalizes Chat Completions tool-call deltas into the same Responses-API `response.output_item.done` / `function_call` events the playground tool loop already consumes (#608). + +Background: #574 routed chat-completion providers to `/chat/completions` and translated text deltas, but the stream parser ignored `choices[].delta.tool_calls`. The playground tool-use loop in `chatService.ts` only matches on Responses-API `response.output_item.done` with `item.type === "function_call"`, so when a chat-completion provider (DeepSeek, Together, any OpenAI-compat gateway) responded with a tool call, no `function_call` event ever reached the loop. The model's `execute_in_sandbox(...)` invocation arrived as plain assistant text and got rendered as JSON in the chat instead of being executed — runtime-based and mixed skills appeared to "respond" without ever running the sandbox. + +Fix: `parseChatCompletionStream` keeps a per-index `Map` carrying `{id, name, arguments}`. Each `choices[].delta.tool_calls[]` chunk merges into its index buffer (id + name arrive on the first chunk for that call; the `arguments` JSON string accumulates across many chunks). A turn flushes when any of: explicit `finish_reason` (`tool_calls`, `stop`, anything non-null), upstream `[DONE]` sentinel, or stream EOF — whichever fires first. A `flushed` guard makes flush idempotent so we never double-emit if multiple end signals fire. 
+ +The synthesized event matches the Responses-API shape `chatService.ts` already validates with Zod (`outputItemDoneEventSchema`): + +```js +{ + type: "response.output_item.done", + item: { + type: "function_call", + id, call_id, name, arguments, // arguments is the accumulated JSON string + }, +} +``` + +This way zero changes are needed in `chatService.ts` — its existing `pendingToolCall` capture + `executeToolCall` dispatch path works for both upstream formats now. + +Parallel tool calls within one assistant turn are supported (one done event per index, emitted in index order). Missing `index` falls back to 0 (treated as a single tool call). Streams that close without `[DONE]` or `finish_reason` still flush at EOF so a buffered call is never lost. + +Coverage: 6 new tests appended to `src/clients/nyxid/llm.test.ts` — chunked accumulation + finish_reason flush, EOF flush without [DONE], parallel tool calls, idempotent flush across finish_reason+[DONE], intermixed text+tool deltas (correct event order), missing `index` fallback. All 17 tests in the file green; full ornn-api suite has no new regressions. 
// ---------------------------------------------------------------------------
// #608 — chat-completion tool-call delta normalization
// (suite from ornn-api/src/clients/nyxid/llm.test.ts)
// ---------------------------------------------------------------------------

describe("chat-completion stream tool-call normalization (#608)", () => {
  type StreamRequest = Parameters<NyxLlmClient["stream"]>[0];

  const OUTPUT_ITEM_DONE = "response.output_item.done";

  /** Request shared by every case that does not care about model or prompt. */
  const GO: StreamRequest = {
    model: "m",
    input: [{ role: "user", content: "go" }],
  };

  /** Builds a client whose resolver reports the chat-completion wire format. */
  const buildClient = (): NyxLlmClient =>
    new NyxLlmClient({
      resolver: makeResolver({
        gatewayUrl: "https://api.example.com",
        apiKey: "sk-x",
        apiFormat: "chat-completion",
      }),
      saTokenProvider: STUB_SA_TOKEN,
    });

  /** Runs one streamed request on a fresh client and collects every event. */
  const drain = async (request: StreamRequest): Promise<ResponsesApiStreamEvent[]> => {
    const seen: ResponsesApiStreamEvent[] = [];
    for await (const evt of buildClient().stream(request)) {
      seen.push(evt);
    }
    return seen;
  };

  /** Serializes a single choice as one chat-completion chunk payload. */
  const frame = (choice: Record<string, unknown>): string =>
    JSON.stringify({ choices: [choice] });

  /** Chunk that closes a turn with `finish_reason: "tool_calls"` and an empty delta. */
  const FINISH_TOOL_CALLS = frame({ delta: {}, finish_reason: "tool_calls" });

  /** Keeps only the synthesized `response.output_item.done` events. */
  const doneEvents = (events: ResponsesApiStreamEvent[]): ResponsesApiStreamEvent[] =>
    events.filter((evt) => evt.type === OUTPUT_ITEM_DONE);

  /** Expected shape of a synthesized function-call done event. */
  const functionCallDone = (id: string, name: string, args: string) => ({
    type: OUTPUT_ITEM_DONE,
    item: {
      type: "function_call",
      id,
      call_id: id,
      name,
      arguments: args,
    },
  });

  it("accumulates tool_calls across chunks and emits one output_item.done on finish_reason=tool_calls", async () => {
    const frames = [
      frame({
        delta: {
          tool_calls: [{
            index: 0,
            id: "call_abc",
            type: "function",
            function: { name: "execute_in_sandbox", arguments: "" },
          }],
        },
      }),
      frame({ delta: { tool_calls: [{ index: 0, function: { arguments: "{\"scr" } }] } }),
      frame({ delta: { tool_calls: [{ index: 0, function: { arguments: "ipt\":\"x\"}" } }] } }),
      FINISH_TOOL_CALLS,
    ];
    fetchHandler = () => sseResponse(frames);

    const events = await drain({
      model: "deepseek-v4",
      input: [{ role: "user", content: "run x" }],
      tools: [{
        type: "function",
        name: "execute_in_sandbox",
        description: "run",
        parameters: { type: "object" },
      }],
    });

    const done = doneEvents(events);
    expect(done).toHaveLength(1);
    expect(done[0]).toEqual(
      functionCallDone("call_abc", "execute_in_sandbox", "{\"script\":\"x\"}"),
    );
  });

  it("flushes a buffered tool call when stream ends without [DONE] or finish_reason", async () => {
    // The body is a single SSE frame followed by EOF: no [DONE] sentinel
    // and no finish_reason anywhere.
    const payload = frame({
      delta: {
        tool_calls: [{
          index: 0,
          id: "call_z",
          function: { name: "t", arguments: "{\"a\":1}" },
        }],
      },
    });
    fetchHandler = () =>
      new Response(`data: ${payload}\n\n`, {
        status: 200,
        headers: { "Content-Type": "text/event-stream" },
      });

    const events = await drain(GO);

    const [firstDone] = doneEvents(events);
    expect(firstDone).toEqual(functionCallDone("call_z", "t", "{\"a\":1}"));
  });

  it("supports parallel tool calls — one done event per index", async () => {
    const frames = [
      frame({
        delta: {
          tool_calls: [
            { index: 0, id: "call_a", function: { name: "fn_a", arguments: "{}" } },
            { index: 1, id: "call_b", function: { name: "fn_b", arguments: "{}" } },
          ],
        },
      }),
      FINISH_TOOL_CALLS,
    ];
    fetchHandler = () => sseResponse(frames);

    const done = doneEvents(await drain(GO));

    expect(done).toHaveLength(2);
    const ids = done.map((evt) => (evt.item as { id: string }).id);
    expect(ids).toEqual(["call_a", "call_b"]);
  });

  it("only flushes once when finish_reason and [DONE] both arrive", async () => {
    const frames = [
      frame({
        delta: {
          tool_calls: [{ index: 0, id: "call_x", function: { name: "fn", arguments: "{}" } }],
        },
        finish_reason: "tool_calls",
      }),
    ];
    fetchHandler = () => sseResponse(frames);

    const done = doneEvents(await drain(GO));

    expect(done).toHaveLength(1);
  });

  it("intermixed text + tool_call deltas produce text-delta then done event in order", async () => {
    const frames = [
      frame({ delta: { content: "thinking…" } }),
      frame({
        delta: {
          tool_calls: [{ index: 0, id: "call_q", function: { name: "fn", arguments: "{}" } }],
        },
      }),
      FINISH_TOOL_CALLS,
    ];
    fetchHandler = () => sseResponse(frames);

    const events = await drain(GO);

    const observedTypes = events.map((evt) => evt.type);
    expect(observedTypes).toEqual([
      "response.output_text.delta",
      "response.output_item.done",
    ]);
  });

  it("tool_calls.index missing → falls back to index 0", async () => {
    const frames = [
      frame({
        delta: {
          tool_calls: [{ id: "call_noix", function: { name: "fn", arguments: "{}" } }],
        },
        finish_reason: "tool_calls",
      }),
    ];
    fetchHandler = () => sseResponse(frames);

    const [firstDone] = doneEvents(await drain(GO));

    expect((firstDone?.item as { id: string }).id).toBe("call_noix");
  });
});
// ornn-api/src/clients/nyxid/llm.ts — chat-completion stream normalization (#574, #608)

/**
 * Internal accumulator for a single OpenAI Chat Completions tool call
 * as it streams in. `id` + `name` arrive on the first delta for that
 * tool-call index; `arguments` is a JSON string that accumulates
 * across many chunks before the model finishes emitting it.
 */
interface ToolCallAccumulator {
  /** Upstream tool-call id; stays `""` until a delta carries a non-empty `id`. */
  id: string;
  /** Function name; stays `""` until a delta carries a non-empty `function.name`. */
  name: string;
  /** Concatenation, in arrival order, of every `function.arguments` fragment. */
  arguments: string;
}

/**
 * Parse Chat Completions SSE and translate it into Responses-API
 * event shape so downstream consumers (skill generation, playground
 * tool-use loop) stay format-agnostic.
 *
 * Only `choices[0]` of each chunk is read.
 *
 * - `choices[0].delta.content` chunks → `response.output_text.delta`
 *   events (#574). Empty strings are skipped.
 * - `choices[0].delta.tool_calls` chunks are buffered per tool-call
 *   index (id + name + accumulated JSON arguments). When the turn
 *   completes — ANY non-empty `finish_reason` while calls are
 *   buffered, the upstream `[DONE]` sentinel, or EOF, whichever comes
 *   first — each buffered tool call is emitted in ascending index
 *   order as a synthesized `response.output_item.done` event with
 *   `item.type === "function_call"` (#608).
 *
 * The flush happens at most once per stream: tool-call deltas that
 * arrive after the first flush are still buffered but never emitted.
 *
 * A buffered call that never received an `id` is emitted with the
 * synthetic id `call_<index>` so `id` / `call_id` are never empty and
 * parallel calls stay distinguishable.
 *
 * Chunks that fail `JSON.parse` are logged at debug level and skipped.
 *
 * @param response - Upstream HTTP response whose body is the SSE stream.
 * @returns Async stream of Responses-API-shaped events.
 */
async function* parseChatCompletionStream(
  response: Response,
): AsyncIterable<ResponsesApiStreamEvent> {
  const toolCalls = new Map<number, ToolCallAccumulator>();
  let flushed = false;

  // One-shot: the `flushed` latch makes repeated end-of-turn signals
  // (finish_reason followed by [DONE] followed by EOF) emit only once.
  function* flushToolCalls(): Generator<ResponsesApiStreamEvent> {
    if (flushed) return;
    flushed = true;
    const ordered = [...toolCalls.entries()].sort(([a], [b]) => a - b);
    for (const [idx, tc] of ordered) {
      // Fall back to a per-index id when the provider never sent one.
      const callId = tc.id || `call_${idx}`;
      yield {
        type: "response.output_item.done",
        item: {
          type: "function_call",
          id: callId,
          call_id: callId,
          name: tc.name,
          arguments: tc.arguments,
        },
      };
    }
  }

  for await (const { data } of parseSSELines(response)) {
    if (data === "[DONE]") {
      yield* flushToolCalls();
      return;
    }
    let chunk: {
      choices?: Array<{
        delta?: {
          content?: string | null;
          tool_calls?: Array<{
            index?: number;
            id?: string;
            type?: string;
            function?: { name?: string; arguments?: string };
          } | null>;
        } | null;
        finish_reason?: string | null;
      }>;
    } | null;
    try {
      chunk = JSON.parse(data);
    } catch {
      logger.debug({ data: data.slice(0, 100) }, "Failed to parse chat-completion chunk");
      continue;
    }

    // `JSON.parse` can legally return null ("null" payload); treat it
    // like a chunk with no choices instead of throwing.
    const choice = chunk?.choices?.[0];
    if (!choice) continue;

    const text = choice.delta?.content;
    if (typeof text === "string" && text.length > 0) {
      yield { type: "response.output_text.delta", delta: text };
    }

    const deltaToolCalls = choice.delta?.tool_calls;
    if (Array.isArray(deltaToolCalls)) {
      for (const piece of deltaToolCalls) {
        // Untrusted upstream JSON: skip null / non-object entries
        // rather than letting a property access abort the stream.
        if (piece === null || typeof piece !== "object") continue;
        // OpenAI uses `index` to disambiguate parallel tool calls
        // within one assistant turn. Missing index → assume a single
        // tool call at index 0.
        const idx = typeof piece.index === "number" ? piece.index : 0;
        const acc = toolCalls.get(idx) ?? { id: "", name: "", arguments: "" };
        if (piece.id) acc.id = piece.id;
        if (piece.function?.name) acc.name = piece.function.name;
        if (typeof piece.function?.arguments === "string") {
          acc.arguments += piece.function.arguments;
        }
        toolCalls.set(idx, acc);
      }
    }

    // Some providers (DeepSeek, Together, …) terminate a tool-call
    // turn with finish_reason=tool_calls; others emit
    // finish_reason=stop alongside the final tool-call delta. Treat
    // any non-empty finish_reason as "no more deltas this turn" and
    // flush so the playground loop sees the tool call before
    // [DONE] / EOF. The deltas of this same chunk were merged above,
    // so a finish_reason that rides on the last delta loses nothing.
    if (choice.finish_reason && toolCalls.size > 0) {
      yield* flushToolCalls();
    }
  }

  // Stream ended without a [DONE] sentinel (some upstreams just
  // close the body). Flush any pending tool calls so we don't drop
  // them on the floor.
  yield* flushToolCalls();
}

// ---------------------------------------------------------------------------