fix(agent-runtime): consolidate streaming text content blocks for storage#425
fix(agent-runtime): consolidate streaming text content blocks for storage#425jerryliang64 wants to merge 1 commit intomasterfrom
Conversation
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (4)
✅ Files skipped from review due to trivial changes (1)
🚧 Files skipped from review as they are similar to previous changes (1)
📝 WalkthroughWalkthroughAgentRuntime and MessageConverter now consolidate streamed message content: streamed MessageContentBlock arrays are accumulated, adjacent text blocks are merged (texts concatenated, annotations combined), and consumers receive a single consolidated content array instead of fragmented blocks. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant AgentRuntime
participant MessageConverter
participant Storage
Client->>AgentRuntime: send stream of AgentStreamMessage chunks
AgentRuntime->>AgentRuntime: accumulate incoming messages & usage
AgentRuntime->>MessageConverter: extractFromStreamMessages(accumulated)
MessageConverter->>MessageConverter: gather all content blocks
MessageConverter->>MessageConverter: consolidateContentBlocks(blocks)
MessageConverter-->>AgentRuntime: return single consolidated ThreadMessage (or none)
AgentRuntime->>Storage: persist ThreadMessageCompleted / update thread history
AgentRuntime-->>Client: emit SSE deltas (per-chunk) and final consolidated completion
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces logic to consolidate adjacent text content blocks into a single block, matching the OpenAI Assistants API behavior for streaming chunks. The implementation adds a consolidateContentBlocks utility to MessageConverter and integrates it into AgentRuntime. Feedback indicates that the merging logic currently fails to adjust annotation offsets for the combined text and lacks proper type checking for non-text blocks, which could lead to incorrect data or runtime crashes.
| if (prev && prev.type === ContentBlockType.Text && block.type === ContentBlockType.Text) { | ||
| prev.text = { | ||
| value: prev.text.value + block.text.value, | ||
| annotations: [...prev.text.annotations, ...block.text.annotations], | ||
| }; |
There was a problem hiding this comment.
When merging adjacent text blocks, the annotations from the subsequent block are appended without adjusting their character offsets. Since these indices (e.g., start, end, or start_index) are relative to the beginning of the text in their original block, they will point to incorrect positions in the newly consolidated text string. You should iterate through the annotations of the subsequent block and increment their indices by the length of the existing text in prev to ensure they remain valid.
| result.push({ | ||
| ...block, | ||
| text: { ...block.text, annotations: [...block.text.annotations] }, | ||
| }); |
There was a problem hiding this comment.
The else block assumes that every MessageContentBlock has a text property. While MessageContentBlock is currently only defined as TextContentBlock, the comments and the logic in this function suggest it is intended to handle other types (like images) as natural boundaries. If a non-text block is encountered, accessing block.text.annotations will cause a runtime crash. You should check the block type before attempting to deep-copy the text and annotations properties.
result.push({
...block,
...(block.type === ContentBlockType.Text ? {
text: { ...block.text, annotations: [...block.text.annotations] },
} : {}),
});There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@core/agent-runtime/src/MessageConverter.ts`:
- Around line 46-67: The consolidateContentBlocks function merges adjacent text
blocks but fails to shift the position-based annotation indices from the
concatenated block; update MessageConverter.consolidateContentBlocks so when
merging two ContentBlockType.Text blocks you compute an offset =
prev.text.value.length and map each annotation from the current block to a new
annotation with start+offset and end+offset (while copying other annotation
fields), then append those shifted annotations to prev.text.annotations; ensure
you never mutate original blocks by creating new objects for prev and for pushed
blocks and deep-copy annotation arrays when creating the non-merged branch.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: b5cd8952-699f-4d29-bb27-fef37bc942c1
📒 Files selected for processing (4)
core/agent-runtime/src/AgentRuntime.tscore/agent-runtime/src/MessageConverter.tscore/agent-runtime/test/AgentRuntime.test.tscore/agent-runtime/test/MessageConverter.test.ts
| /** | ||
| * Merge adjacent text content blocks into a single block. | ||
| * Non-text blocks act as natural boundaries and are never merged. | ||
| */ | ||
| static consolidateContentBlocks(blocks: MessageContentBlock[]): MessageContentBlock[] { | ||
| const result: MessageContentBlock[] = []; | ||
| for (const block of blocks) { | ||
| const prev = result[result.length - 1]; | ||
| if (prev && prev.type === ContentBlockType.Text && block.type === ContentBlockType.Text) { | ||
| prev.text = { | ||
| value: prev.text.value + block.text.value, | ||
| annotations: [...prev.text.annotations, ...block.text.annotations], | ||
| }; | ||
| } else { | ||
| result.push({ | ||
| ...block, | ||
| text: { ...block.text, annotations: [...block.text.annotations] }, | ||
| }); | ||
| } | ||
| } | ||
| return result; | ||
| } |
There was a problem hiding this comment.
Annotation position indices are not adjusted during text concatenation.
The consolidation logic concatenates text.value strings and merges annotations arrays, but annotations contain position-based indices ({ start, end }) that reference character offsets within their original text block. After merging 'a' + 'b' into 'ab', the second block's annotation { start: 0, end: 1 } still points to offset 0, but it should now point to offset 1 (the position of 'b' in the merged string).
This will cause any downstream consumers that rely on annotation positions (e.g., for highlighting, links, or citations) to reference incorrect character ranges in the consolidated text.
Please verify whether annotation indices are consumed downstream and need adjustment:
#!/bin/bash
# Search for usages of annotations in the codebase
rg -n -C3 'annotations' --type=ts -g '!**/test/**' -g '!**/node_modules/**'How does the OpenAI Assistants API handle annotation indices when streaming text deltas are consolidated into a final message?
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@core/agent-runtime/src/MessageConverter.ts` around lines 46 - 67, The
consolidateContentBlocks function merges adjacent text blocks but fails to shift
the position-based annotation indices from the concatenated block; update
MessageConverter.consolidateContentBlocks so when merging two
ContentBlockType.Text blocks you compute an offset = prev.text.value.length and
map each annotation from the current block to a new annotation with start+offset
and end+offset (while copying other annotation fields), then append those
shifted annotations to prev.text.annotations; ensure you never mutate original
blocks by creating new objects for prev and for pushed blocks and deep-copy
annotation arrays when creating the non-merged branch.
There was a problem hiding this comment.
🧹 Nitpick comments (1)
core/agent-runtime/src/MessageConverter.ts (1)
59-64: Defensive guard for non-text blocks would improve robustness.The doc comment states non-text blocks act as boundaries, but the else branch unconditionally accesses
block.text. Currently safe sinceMessageContentBlock = TextContentBlock, but if other content types (Image, File, etc.) are added later, this will throw.🛡️ Proposed defensive handling
} else { - result.push({ - ...block, - text: { ...block.text, annotations: [ ...block.text.annotations ] }, - }); + if (block.type === ContentBlockType.Text) { + result.push({ + ...block, + text: { ...block.text, annotations: [ ...block.text.annotations ] }, + }); + } else { + // Non-text blocks are preserved as-is + result.push({ ...block }); + } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/agent-runtime/src/MessageConverter.ts` around lines 59 - 64, The else branch in MessageConverter where you push { ...block, text: { ...block.text, annotations: [...] } } assumes block.text exists; add a defensive guard to handle non-text MessageContentBlock variants by checking block.type or the presence of block.text before accessing it. If block.text is present (TextContentBlock), keep the existing cloning logic (preserve annotations), otherwise push a shallow clone of block without touching text (or include a safe default like text: undefined) so non-text blocks (e.g., Image/File) act as boundaries without throwing; update the logic around result.push(...) in the MessageConverter handling to reflect this guard.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@core/agent-runtime/src/MessageConverter.ts`:
- Around line 59-64: The else branch in MessageConverter where you push {
...block, text: { ...block.text, annotations: [...] } } assumes block.text
exists; add a defensive guard to handle non-text MessageContentBlock variants by
checking block.type or the presence of block.text before accessing it. If
block.text is present (TextContentBlock), keep the existing cloning logic
(preserve annotations), otherwise push a shallow clone of block without touching
text (or include a safe default like text: undefined) so non-text blocks (e.g.,
Image/File) act as boundaries without throwing; update the logic around
result.push(...) in the MessageConverter handling to reflect this guard.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: ed898a2b-a45b-421d-9c09-a4b8ceaae590
📒 Files selected for processing (1)
core/agent-runtime/src/MessageConverter.ts
…rage When an app yields many small streaming chunks via execRun, each chunk was stored as a separate text content block in the thread message. This caused GET /threads/:id to return fragmented content (dozens of tiny text blocks instead of one consolidated block). This fix adds MessageConverter.consolidateContentBlocks() which merges adjacent text blocks into a single block. It is applied in: - consumeStreamMessages (streamRun path): consolidates before storing the completed message, while SSE deltas remain granular - extractFromStreamMessages (syncRun/asyncRun path): consolidates all chunks into a single MessageObject instead of creating one per yield This matches the OpenAI Assistants API behavior where streaming deltas are fine-grained but stored messages have consolidated content. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
fdd644a to
4493993
Compare
Summary
execRun, each chunk was stored as a separate text content block in the thread message/threads/:idreturned fragmented content (dozens of tiny text blocks instead of one consolidated block)MessageConverter.consolidateContentBlocks()which merges adjacent text blocks into a single blockstreamRun(viaconsumeStreamMessages) andsyncRun/asyncRun(viaextractFromStreamMessages) pathsTest plan
consolidateContentBlocks: 6 unit tests (merge, annotations, empty, single, many, no-mutation)extractFromStreamMessages: updated existing tests + added streaming consolidation testsyncRun: integration test verifying chunks consolidated into single message in run output and thread historyasyncRun: integration test verifying chunks consolidated into single message in run output and thread historystreamRun: integration test verifying SSE deltas remain granular (3 events) while stored message has consolidated content (1 block)🤖 Generated with Claude Code
Summary by CodeRabbit