Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/agent-runtime/src/AgentRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ export class AgentRuntime {
}

return {
content,
content: MessageConverter.consolidateContentBlocks(content),
usage: hasUsage ? { promptTokens, completionTokens, totalTokens: promptTokens + completionTokens } : undefined,
aborted: false as const,
};
Expand Down
50 changes: 48 additions & 2 deletions core/agent-runtime/src/MessageConverter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,40 @@ export class MessageConverter {
};
}

/**
* Merge adjacent text content blocks into a single block.
* Non-text blocks act as natural boundaries and are never merged.
*/
static consolidateContentBlocks(blocks: MessageContentBlock[]): MessageContentBlock[] {
const result: MessageContentBlock[] = [];
for (const block of blocks) {
const prev = result[result.length - 1];
if (
prev && prev.type === ContentBlockType.Text && block.type === ContentBlockType.Text
) {
const prevText = prev as TextContentBlock;
const curText = block as TextContentBlock;
prevText.text = {
value: prevText.text.value + curText.text.value,
annotations: [...prevText.text.annotations, ...curText.text.annotations],
};
} else if (block.type === ContentBlockType.Text) {
const curText = block as TextContentBlock;
result.push({
...curText,
text: { ...curText.text, annotations: [...curText.text.annotations] },
});
Comment on lines +89 to +92
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The else block assumes that every MessageContentBlock has a text property. While MessageContentBlock is currently only defined as TextContentBlock, the comments and the logic in this function suggest it is intended to handle other types (like images) as natural boundaries. If a non-text block is encountered, accessing block.text.annotations will cause a runtime crash. You should check the block type before attempting to deep-copy the text and annotations properties.

        result.push({
          ...block,
          ...(block.type === ContentBlockType.Text ? {
            text: { ...block.text, annotations: [...block.text.annotations] },
          } : {}),
        });

} else {
result.push({ ...block });
}
}
return result;
}
Comment on lines +70 to +98
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Annotation position indices are not adjusted during text concatenation.

The consolidation logic concatenates text.value strings and merges annotations arrays, but annotations contain position-based indices ({ start, end }) that reference character offsets within their original text block. After merging 'a' + 'b' into 'ab', the second block's annotation { start: 0, end: 1 } still points to offset 0, but it should now point to offset 1 (the position of 'b' in the merged string).

This will cause any downstream consumers that rely on annotation positions (e.g., for highlighting, links, or citations) to reference incorrect character ranges in the consolidated text.

Please verify whether annotation indices are consumed downstream and need adjustment:

#!/bin/bash
# Search for usages of annotations in the codebase
rg -n -C3 'annotations' --type=ts -g '!**/test/**' -g '!**/node_modules/**'
How does the OpenAI Assistants API handle annotation indices when streaming text deltas are consolidated into a final message?
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/agent-runtime/src/MessageConverter.ts` around lines 46 - 67, The
consolidateContentBlocks function merges adjacent text blocks but fails to shift
the position-based annotation indices from the concatenated block; update
MessageConverter.consolidateContentBlocks so when merging two
ContentBlockType.Text blocks you compute an offset = prev.text.value.length and
map each annotation from the current block to a new annotation with start+offset
and end+offset (while copying other annotation fields), then append those
shifted annotations to prev.text.annotations; ensure you never mutate original
blocks by creating new objects for prev and for pushed blocks and deep-copy
annotation arrays when creating the non-merged branch.


/**
* Extract MessageObjects and accumulated usage from AgentStreamMessage objects.
* Adjacent text content blocks from streaming chunks are consolidated into
* a single message with merged text, matching the OpenAI Assistants API behavior.
*/
static extractFromStreamMessages(
messages: AgentStreamMessage[],
Expand All @@ -77,14 +109,14 @@ export class MessageConverter {
output: MessageObject[];
usage?: RunUsage;
} {
const output: MessageObject[] = [];
const allBlocks: MessageContentBlock[] = [];
let promptTokens = 0;
let completionTokens = 0;
let hasUsage = false;

for (const msg of messages) {
if (msg.message) {
output.push(MessageConverter.toMessageObject(msg.message, runId));
allBlocks.push(...MessageConverter.toContentBlocks(msg.message));
}
if (msg.usage) {
hasUsage = true;
Expand All @@ -93,6 +125,20 @@ export class MessageConverter {
}
}

const consolidated = MessageConverter.consolidateContentBlocks(allBlocks);

const output: MessageObject[] = consolidated.length > 0
? [{
id: newMsgId(),
object: AgentObjectType.ThreadMessage,
createdAt: nowUnix(),
runId,
role: MessageRole.Assistant,
status: MessageStatus.Completed,
content: consolidated,
}]
: [];

let usage: RunUsage | undefined;
if (hasUsage) {
usage = {
Expand Down
92 changes: 91 additions & 1 deletion core/agent-runtime/test/AgentRuntime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
AgentNotFoundError, AgentConflictError } from '@eggjs/tegg-types/agent-runtime';

import { isTextBlock } from '../src/MessageConverter';
import type { RunRecord, RunObject, CreateRunInput, AgentStreamMessage } from '@eggjs/tegg-types/agent-runtime';
import type { RunRecord, RunObject, CreateRunInput, AgentStreamMessage, MessageObject } from '@eggjs/tegg-types/agent-runtime';

import { AgentRuntime } from '../src/AgentRuntime';
import type { AgentExecutor, AgentRuntimeOptions } from '../src/AgentRuntime';
Expand Down Expand Up @@ -311,6 +311,33 @@ describe('test/AgentRuntime.test.ts', () => {
},
);
});

it('should consolidate multiple streaming chunks into a single message', async () => {
executor.execRun = async function* (): AsyncGenerator<AgentStreamMessage> {
yield { message: { content: 'Hello ' } };
yield { message: { content: 'world' } };
yield { message: { content: '!' } };
yield { usage: { promptTokens: 10, completionTokens: 5 } };
};

const result = await runtime.syncRun({
input: { messages: [{ role: 'user', content: 'Hi' }] },
});

assert.equal(result.status, RunStatus.Completed);
assert.equal(result.output!.length, 1);
assert.equal(result.output![0].content.length, 1);
assert(isTextBlock(result.output![0].content[0]));
assert.equal(result.output![0].content[0].text.value, 'Hello world!');

// Thread history should also have consolidated content
const thread = await runtime.getThread(result.threadId);
const assistantMsg = thread.messages.find(m => m.role === MessageRole.Assistant);
assert.ok(assistantMsg);
assert.equal(assistantMsg!.content.length, 1);
assert(isTextBlock(assistantMsg!.content[0]));
assert.equal(assistantMsg!.content[0].text.value, 'Hello world!');
});
});

describe('asyncRun', () => {
Expand Down Expand Up @@ -366,6 +393,35 @@ describe('test/AgentRuntime.test.ts', () => {
const run = await store.getRun(result.id);
assert.deepStrictEqual(run.metadata, meta);
});

it('should consolidate multiple streaming chunks into a single message', async () => {
executor.execRun = async function* (): AsyncGenerator<AgentStreamMessage> {
yield { message: { content: 'Hello ' } };
yield { message: { content: 'world' } };
yield { message: { content: '!' } };
yield { usage: { promptTokens: 10, completionTokens: 5 } };
};

const result = await runtime.asyncRun({
input: { messages: [{ role: 'user', content: 'Hi' }] },
});

await runtime.waitForPendingTasks();

const run = await store.getRun(result.id);
assert.equal(run.status, RunStatus.Completed);
assert.equal(run.output!.length, 1);
assert.equal(run.output![0].content.length, 1);
assert(isTextBlock(run.output![0].content[0]));
assert.equal(run.output![0].content[0].text.value, 'Hello world!');

const thread = await store.getThread(result.threadId);
const assistantMsg = thread.messages.find(m => m.role === MessageRole.Assistant);
assert.ok(assistantMsg);
assert.equal(assistantMsg!.content.length, 1);
assert(isTextBlock(assistantMsg!.content[0]));
assert.equal(assistantMsg!.content[0].text.value, 'Hello world!');
});
});

describe('streamRun', () => {
Expand Down Expand Up @@ -461,6 +517,40 @@ describe('test/AgentRuntime.test.ts', () => {
assert(eventNames.includes(AgentSSEEvent.Done));
assert(writer.closed);
});

it('should consolidate streaming chunks into single content block in stored message', async () => {
executor.execRun = async function* (): AsyncGenerator<AgentStreamMessage> {
yield { message: { content: 'Hello ' } };
yield { message: { content: 'world' } };
yield { message: { content: '!' } };
yield { usage: { promptTokens: 10, completionTokens: 5 } };
};

const writer = new MockSSEWriter();
await runtime.streamRun({ input: { messages: [{ role: 'user', content: 'Hi' }] } }, writer);

// SSE deltas should still be granular (3 separate delta events)
const deltaEvents = writer.events.filter(e => e.event === AgentSSEEvent.ThreadMessageDelta);
assert.equal(deltaEvents.length, 3);

// But the completed message should have consolidated content
const completedEvent = writer.events.find(e => e.event === AgentSSEEvent.ThreadMessageCompleted);
assert.ok(completedEvent);
const completedMsg = completedEvent.data as MessageObject;
assert.equal(completedMsg.content.length, 1);
assert(isTextBlock(completedMsg.content[0]));
assert.equal(completedMsg.content[0].text.value, 'Hello world!');

// Thread history should also have consolidated content
const runCreatedEvent = writer.events.find(e => e.event === AgentSSEEvent.ThreadRunCreated);
const threadId = (runCreatedEvent!.data as RunObject).threadId;
const thread = await runtime.getThread(threadId);
const assistantMsg = thread.messages.find(m => m.role === MessageRole.Assistant);
assert.ok(assistantMsg);
assert.equal(assistantMsg!.content.length, 1);
assert(isTextBlock(assistantMsg!.content[0]));
assert.equal(assistantMsg!.content[0].text.value, 'Hello world!');
});
});

describe('getRun', () => {
Expand Down
112 changes: 107 additions & 5 deletions core/agent-runtime/test/MessageConverter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,19 +185,106 @@ describe('test/MessageConverter.test.ts', () => {
});
});

describe('consolidateContentBlocks', () => {
it('should merge adjacent text blocks into one', () => {
const blocks = [
{ type: ContentBlockType.Text, text: { value: 'Hello ', annotations: [] } },
{ type: ContentBlockType.Text, text: { value: 'world', annotations: [] } },
];
const result = MessageConverter.consolidateContentBlocks(blocks);
assert.equal(result.length, 1);
assert(isTextBlock(result[0]));
assert.equal(result[0].text.value, 'Hello world');
});

it('should merge annotations from adjacent text blocks', () => {
const blocks = [
{ type: ContentBlockType.Text, text: { value: 'a', annotations: [{ start: 0, end: 1 }] as any[] } },
{ type: ContentBlockType.Text, text: { value: 'b', annotations: [{ start: 0, end: 1 }] as any[] } },
];
const result = MessageConverter.consolidateContentBlocks(blocks);
assert.equal(result.length, 1);
assert(isTextBlock(result[0]));
assert.equal(result[0].text.value, 'ab');
assert.equal(result[0].text.annotations.length, 2);
});

it('should return empty array for empty input', () => {
assert.deepStrictEqual(MessageConverter.consolidateContentBlocks([]), []);
});

it('should return single block as-is', () => {
const blocks = [
{ type: ContentBlockType.Text, text: { value: 'only', annotations: [] } },
];
const result = MessageConverter.consolidateContentBlocks(blocks);
assert.equal(result.length, 1);
assert(isTextBlock(result[0]));
assert.equal(result[0].text.value, 'only');
});

it('should merge many consecutive text blocks', () => {
const blocks = Array.from({ length: 5 }, (_, i) => ({
type: ContentBlockType.Text,
text: { value: String(i), annotations: [] },
}));
const result = MessageConverter.consolidateContentBlocks(blocks);
assert.equal(result.length, 1);
assert(isTextBlock(result[0]));
assert.equal(result[0].text.value, '01234');
});

it('should not mutate the original blocks', () => {
const blocks = [
{ type: ContentBlockType.Text, text: { value: 'a', annotations: [] } },
{ type: ContentBlockType.Text, text: { value: 'b', annotations: [] } },
];
MessageConverter.consolidateContentBlocks(blocks);
assert.equal(blocks[0].text.value, 'a');
assert.equal(blocks[1].text.value, 'b');
});

it('should preserve non-text blocks as natural boundaries', () => {
const blocks = [
{ type: ContentBlockType.Text, text: { value: 'before ', annotations: [] } },
{ type: ContentBlockType.Text, text: { value: 'tool call', annotations: [] } },
{ type: ContentBlockType.ToolUse, id: 'toolu_1', name: 'search', input: { q: 'test' } },
{ type: ContentBlockType.Text, text: { value: 'after tool', annotations: [] } },
] as any;
const result = MessageConverter.consolidateContentBlocks(blocks);
assert.equal(result.length, 3);
assert(isTextBlock(result[0]));
assert.equal(result[0].text.value, 'before tool call');
assert(isToolUseBlock(result[1]));
assert.equal((result[1] as ToolUseContentBlock).name, 'search');
assert(isTextBlock(result[2]));
assert.equal(result[2].text.value, 'after tool');
});

it('should not crash on tool_use blocks (no .text property)', () => {
const blocks = [
{ type: ContentBlockType.ToolUse, id: 'toolu_1', name: 'search', input: {} },
] as any;
const result = MessageConverter.consolidateContentBlocks(blocks);
assert.equal(result.length, 1);
assert(isToolUseBlock(result[0]));
assert.equal((result[0] as ToolUseContentBlock).name, 'search');
});
});

describe('extractFromStreamMessages', () => {
it('should extract messages and accumulate usage', () => {
it('should consolidate streaming chunks into a single message', () => {
const messages: AgentStreamMessage[] = [
{ message: { content: 'chunk1' }, usage: { promptTokens: 10, completionTokens: 5 } },
{ message: { content: 'chunk2' }, usage: { promptTokens: 0, completionTokens: 8 } },
];
const { output, usage } = MessageConverter.extractFromStreamMessages(messages, 'run_1');

assert.equal(output.length, 2);
assert.equal(output.length, 1);
assert.equal(output[0].content.length, 1);
assert(isTextBlock(output[0].content[0]));
assert.equal(output[0].content[0].text.value, 'chunk1');
assert(isTextBlock(output[1].content[0]));
assert.equal(output[1].content[0].text.value, 'chunk2');
assert.equal(output[0].content[0].text.value, 'chunk1chunk2');
assert.equal(output[0].runId, 'run_1');
assert.ok(usage);
assert.equal(usage.promptTokens, 10);
assert.equal(usage.completionTokens, 13);
Expand Down Expand Up @@ -226,6 +313,8 @@ describe('test/MessageConverter.test.ts', () => {
const messages: AgentStreamMessage[] = [{ message: { content: 'data' } }];
const { output, usage } = MessageConverter.extractFromStreamMessages(messages);
assert.equal(output.length, 1);
assert(isTextBlock(output[0].content[0]));
assert.equal(output[0].content[0].text.value, 'data');
assert.equal(usage, undefined);
});

Expand All @@ -242,6 +331,19 @@ describe('test/MessageConverter.test.ts', () => {
assert.equal(output.length, 0);
assert.equal(usage, undefined);
});

it('should consolidate many streaming chunks into single text block', () => {
const messages: AgentStreamMessage[] = [
{ message: { content: 'Hello ' } },
{ message: { content: 'world' } },
{ message: { content: '!' } },
];
const { output } = MessageConverter.extractFromStreamMessages(messages, 'run_1');
assert.equal(output.length, 1);
assert.equal(output[0].content.length, 1);
assert(isTextBlock(output[0].content[0]));
assert.equal(output[0].content[0].text.value, 'Hello world!');
});
});

describe('toInputMessageObjects', () => {
Expand Down
Loading