Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .changeset/chat-completion-tool-calls-608.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
---
"ornn-api": patch
---

NyxLlmClient now normalizes Chat Completions tool-call deltas into the same Responses-API `response.output_item.done` / `function_call` events the playground tool loop already consumes (#608).

Background: #574 routed chat-completion providers to `/chat/completions` and translated text deltas, but the stream parser ignored `choices[].delta.tool_calls`. The playground tool-use loop in `chatService.ts` only matches on Responses-API `response.output_item.done` with `item.type === "function_call"`, so when a chat-completion provider (DeepSeek, Together, any OpenAI-compat gateway) responded with a tool call, no `function_call` event ever reached the loop. The model's `execute_in_sandbox(...)` invocation arrived as plain assistant text and got rendered as JSON in the chat instead of being executed — runtime-based and mixed skills appeared to "respond" without ever running the sandbox.

Fix: `parseChatCompletionStream` keeps a per-index `Map<number, ToolCallAccumulator>` carrying `{id, name, arguments}`. Each `choices[].delta.tool_calls[]` chunk merges into its index buffer (id + name arrive on the first chunk for that call; the `arguments` JSON string accumulates across many chunks). A turn flushes when any of: explicit `finish_reason` (`tool_calls`, `stop`, anything non-null), upstream `[DONE]` sentinel, or stream EOF — whichever fires first. A `flushed` guard makes flush idempotent so we never double-emit if multiple end signals fire.

The synthesized event matches the Responses-API shape `chatService.ts` already validates with Zod (`outputItemDoneEventSchema`):

```js
{
type: "response.output_item.done",
item: {
type: "function_call",
id, call_id, name, arguments, // arguments is the accumulated JSON string
},
}
```

This way zero changes are needed in `chatService.ts` — its existing `pendingToolCall` capture + `executeToolCall` dispatch path works for both upstream formats now.

Parallel tool calls within one assistant turn are supported (one done event per index, emitted in index order). Missing `index` falls back to 0 (treated as a single tool call). Streams that close without `[DONE]` or `finish_reason` still flush at EOF so a buffered call is never lost.

Coverage: 6 new tests appended to `src/clients/nyxid/llm.test.ts` — chunked accumulation + finish_reason flush, EOF flush without [DONE], parallel tool calls, idempotent flush across finish_reason+[DONE], intermixed text+tool deltas (correct event order), missing `index` fallback. All 17 tests in the file green; full ornn-api suite has no new regressions.
208 changes: 208 additions & 0 deletions ornn-api/src/clients/nyxid/llm.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -400,3 +400,211 @@ describe("NyxLlmClient.complete() routing on apiFormat", () => {
expect(out).toEqual([]);
});
});

// ---------------------------------------------------------------------------
// #608 — chat-completion tool-call delta normalization
// ---------------------------------------------------------------------------

describe("chat-completion stream tool-call normalization (#608)", () => {
function makeClient(): NyxLlmClient {
return new NyxLlmClient({
resolver: makeResolver({
gatewayUrl: "https://api.example.com",
apiKey: "sk-x",
apiFormat: "chat-completion",
}),
saTokenProvider: STUB_SA_TOKEN,
});
}

it("accumulates tool_calls across chunks and emits one output_item.done on finish_reason=tool_calls", async () => {
fetchHandler = () =>
sseResponse([
JSON.stringify({
choices: [{
delta: {
tool_calls: [{
index: 0,
id: "call_abc",
type: "function",
function: { name: "execute_in_sandbox", arguments: "" },
}],
},
}],
}),
JSON.stringify({
choices: [{
delta: { tool_calls: [{ index: 0, function: { arguments: "{\"scr" } }] },
}],
}),
JSON.stringify({
choices: [{
delta: { tool_calls: [{ index: 0, function: { arguments: "ipt\":\"x\"}" } }] },
}],
}),
JSON.stringify({ choices: [{ delta: {}, finish_reason: "tool_calls" }] }),
]);

const events: ResponsesApiStreamEvent[] = [];
for await (const e of makeClient().stream({
model: "deepseek-v4",
input: [{ role: "user", content: "run x" }],
tools: [{
type: "function",
name: "execute_in_sandbox",
description: "run",
parameters: { type: "object" },
}],
})) events.push(e);

const done = events.filter((e) => e.type === "response.output_item.done");
expect(done).toHaveLength(1);
expect(done[0]).toEqual({
type: "response.output_item.done",
item: {
type: "function_call",
id: "call_abc",
call_id: "call_abc",
name: "execute_in_sandbox",
arguments: "{\"script\":\"x\"}",
},
});
});

it("flushes a buffered tool call when stream ends without [DONE] or finish_reason", async () => {
// Body has no [DONE] sentinel and no finish_reason — just an EOF.
fetchHandler = () =>
new Response(
`data: ${JSON.stringify({
choices: [{
delta: {
tool_calls: [{
index: 0,
id: "call_z",
function: { name: "t", arguments: "{\"a\":1}" },
}],
},
}],
})}\n\n`,
{ status: 200, headers: { "Content-Type": "text/event-stream" } },
);

const events: ResponsesApiStreamEvent[] = [];
for await (const e of makeClient().stream({
model: "m",
input: [{ role: "user", content: "go" }],
})) events.push(e);

const done = events.find((e) => e.type === "response.output_item.done");
expect(done).toEqual({
type: "response.output_item.done",
item: {
type: "function_call",
id: "call_z",
call_id: "call_z",
name: "t",
arguments: "{\"a\":1}",
},
});
});

it("supports parallel tool calls — one done event per index", async () => {
fetchHandler = () =>
sseResponse([
JSON.stringify({
choices: [{
delta: {
tool_calls: [
{ index: 0, id: "call_a", function: { name: "fn_a", arguments: "{}" } },
{ index: 1, id: "call_b", function: { name: "fn_b", arguments: "{}" } },
],
},
}],
}),
JSON.stringify({ choices: [{ delta: {}, finish_reason: "tool_calls" }] }),
]);

const events: ResponsesApiStreamEvent[] = [];
for await (const e of makeClient().stream({
model: "m",
input: [{ role: "user", content: "go" }],
})) events.push(e);

const done = events.filter((e) => e.type === "response.output_item.done");
expect(done).toHaveLength(2);
expect((done[0]!.item as { id: string }).id).toBe("call_a");
expect((done[1]!.item as { id: string }).id).toBe("call_b");
});

it("only flushes once when finish_reason and [DONE] both arrive", async () => {
fetchHandler = () =>
sseResponse([
JSON.stringify({
choices: [{
delta: {
tool_calls: [{ index: 0, id: "call_x", function: { name: "fn", arguments: "{}" } }],
},
finish_reason: "tool_calls",
}],
}),
]);

const events: ResponsesApiStreamEvent[] = [];
for await (const e of makeClient().stream({
model: "m",
input: [{ role: "user", content: "go" }],
})) events.push(e);

const done = events.filter((e) => e.type === "response.output_item.done");
expect(done).toHaveLength(1);
});

it("intermixed text + tool_call deltas produce text-delta then done event in order", async () => {
fetchHandler = () =>
sseResponse([
JSON.stringify({ choices: [{ delta: { content: "thinking…" } }] }),
JSON.stringify({
choices: [{
delta: {
tool_calls: [{ index: 0, id: "call_q", function: { name: "fn", arguments: "{}" } }],
},
}],
}),
JSON.stringify({ choices: [{ delta: {}, finish_reason: "tool_calls" }] }),
]);

const events: ResponsesApiStreamEvent[] = [];
for await (const e of makeClient().stream({
model: "m",
input: [{ role: "user", content: "go" }],
})) events.push(e);

expect(events.map((e) => e.type)).toEqual([
"response.output_text.delta",
"response.output_item.done",
]);
});

it("tool_calls.index missing → falls back to index 0", async () => {
fetchHandler = () =>
sseResponse([
JSON.stringify({
choices: [{
delta: {
tool_calls: [{ id: "call_noix", function: { name: "fn", arguments: "{}" } }],
},
finish_reason: "tool_calls",
}],
}),
]);

const events: ResponsesApiStreamEvent[] = [];
for await (const e of makeClient().stream({
model: "m",
input: [{ role: "user", content: "go" }],
})) events.push(e);

const done = events.find((e) => e.type === "response.output_item.done");
expect((done?.item as { id: string }).id).toBe("call_noix");
});
});
120 changes: 105 additions & 15 deletions ornn-api/src/clients/nyxid/llm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@
* `ResponsesApiStreamEvent`). For chat-completion providers the client
* translates the request body on the way out and normalizes the SSE
* stream / completion payload back into Responses-API event shape on
* the way in, so consumers (skill generation + playground) do not need
* to branch on apiFormat (#574).
*
* Tool-call delta normalization for chat-completion is intentionally
* out of scope here — tracked in #608.
* the way in — both text deltas (#574) and accumulated tool-call
* deltas synthesized into `response.output_item.done` /
* `function_call` events (#608) — so consumers (skill generation +
* playground tool loop) never branch on apiFormat.
*
* Authenticates using a Service Account (SA) token obtained via
* client_credentials grant when the resolved provider has no direct
Expand Down Expand Up @@ -185,21 +184,76 @@ async function* parseResponsesStream(
}

/**
* Parse Chat Completions SSE and translate text deltas into
* Responses-API event shape so consumers stay format-agnostic.
* Internal accumulator for a single OpenAI Chat Completions tool call
* as it streams in. `id` + `name` arrive on the first delta for that
* tool-call index; `arguments` is a JSON string that accumulates
* across many chunks before the model finishes emitting it.
*/
interface ToolCallAccumulator {
id: string;
name: string;
arguments: string;
}

/**
* Parse Chat Completions SSE and translate it into Responses-API
* event shape so downstream consumers (skill generation, playground
* tool-use loop) stay format-agnostic.
*
* For #574, only text deltas (`choices[].delta.content`) are
* translated — emitted as `response.output_text.delta`. Tool-call
* delta normalization is tracked in #608.
* - `choices[].delta.content` chunks → `response.output_text.delta`
* events (#574).
* - `choices[].delta.tool_calls` chunks are buffered per tool-call
* index (id + name + accumulated JSON arguments). When the turn
* completes (explicit `finish_reason` of `"tool_calls"` /
* `"stop"`, upstream `[DONE]`, or EOF), each buffered tool call is
* emitted as a synthesized `response.output_item.done` event with
* `item.type === "function_call"` — the exact shape the playground
* loop already consumes (#608). Without this the playground never
* sees a tool call from chat-completion providers and renders
* `execute_in_sandbox(...)` as plain text instead of running it.
*/
async function* parseChatCompletionStream(
response: Response,
): AsyncIterable<ResponsesApiStreamEvent> {
const toolCalls = new Map<number, ToolCallAccumulator>();
let flushed = false;

function* flushToolCalls(): Generator<ResponsesApiStreamEvent> {
if (flushed) return;
flushed = true;
const indices = [...toolCalls.keys()].sort((a, b) => a - b);
for (const idx of indices) {
const tc = toolCalls.get(idx)!;
yield {
type: "response.output_item.done",
item: {
type: "function_call",
id: tc.id,
call_id: tc.id,
name: tc.name,
arguments: tc.arguments,
},
};
}
}

for await (const { data } of parseSSELines(response)) {
if (data === "[DONE]") return;
if (data === "[DONE]") {
yield* flushToolCalls();
return;
}
let chunk: {
choices?: Array<{
delta?: { content?: string | null };
delta?: {
content?: string | null;
tool_calls?: Array<{
index?: number;
id?: string;
type?: string;
function?: { name?: string; arguments?: string };
}>;
};
finish_reason?: string | null;
}>;
};
try {
Expand All @@ -208,11 +262,47 @@ async function* parseChatCompletionStream(
logger.debug({ data: data.slice(0, 100) }, "Failed to parse chat-completion chunk");
continue;
}
const delta = chunk.choices?.[0]?.delta?.content;
if (typeof delta === "string" && delta.length > 0) {
yield { type: "response.output_text.delta", delta };

const choice = chunk.choices?.[0];
if (!choice) continue;

const text = choice.delta?.content;
if (typeof text === "string" && text.length > 0) {
yield { type: "response.output_text.delta", delta: text };
}

const deltaToolCalls = choice.delta?.tool_calls;
if (Array.isArray(deltaToolCalls)) {
for (const piece of deltaToolCalls) {
// OpenAI uses `index` to disambiguate parallel tool calls
// within one assistant turn. Missing index → assume a single
// tool call at index 0.
const idx = typeof piece.index === "number" ? piece.index : 0;
const acc = toolCalls.get(idx) ?? { id: "", name: "", arguments: "" };
if (piece.id) acc.id = piece.id;
if (piece.function?.name) acc.name = piece.function.name;
if (typeof piece.function?.arguments === "string") {
acc.arguments += piece.function.arguments;
}
toolCalls.set(idx, acc);
}
}

// Some providers (DeepSeek, Together, …) terminate a tool-call
// turn with finish_reason=tool_calls; others emit
// finish_reason=stop alongside the final tool-call delta. Treat
// any non-null finish_reason as "no more deltas this turn" and
// flush so the playground loop sees the tool call before
// [DONE] / EOF.
if (choice.finish_reason && toolCalls.size > 0) {
yield* flushToolCalls();
}
}

// Stream ended without a [DONE] sentinel (some upstreams just
// close the body). Flush any pending tool calls so we don't drop
// them on the floor.
yield* flushToolCalls();
}

// ---------------------------------------------------------------------------
Expand Down
Loading