[BUG] Stream mode loses toolCall information and records cumulative textContent in tool calling loops
Suggested Labels
type: bug, severity: critical, area: observation, area: streaming, model: openai
🐛 Bug Description
In the internalStream() method, when a tool calling loop occurs, the first Observation has two critical issues:
Issue 1: 🔴 toolCall information is completely lost
The first Observation records toolCalls as null instead of the actual tool call information.
Root Cause: When the flatMap operator detects a toolCall, it directly returns the Flux from the second internalStream call without emitting the response containing the toolCall, causing MessageAggregator to miss the toolCall information.
Issue 2: 🟠 textContent records a cumulative value
The first Observation records a cumulative textContent (containing text from both the first and second model calls) instead of only the output from the first call.
Root Cause: flatMap flattens the Flux returned by the second internalStream directly into the first stream, causing MessageAggregator to aggregate chunks from both calls.
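Both root causes can be reproduced outside Reactor with a minimal sketch: `java.util.stream`'s `flatMap` has the same flattening semantics as `Flux.flatMap` for this purpose. Everything below (`Chunk`, `secondCall()`, `flattened()`) is an illustrative stand-in, not Spring AI or Reactor code:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Simplified stand-in for the Reactor pipeline. Chunk, secondCall(), and
// flattened() are illustrative names, not Spring AI types.
public class FlatMapLossDemo {

    public record Chunk(String text, boolean hasToolCall) {}

    // Stands in for the recursive internalStream() call (second model call).
    static Stream<Chunk> secondCall() {
        return Stream.of(new Chunk("Based on the weather, wear a coat.", false));
    }

    public static List<Chunk> flattened() {
        List<Chunk> firstCall = List.of(
            new Chunk("I need to call", false),
            new Chunk(" the get_weather tool.", false),
            new Chunk("", true));              // chunkN: carries the toolCall
        return firstCall.stream()
            // The toolCall branch returns the second call's stream and
            // never re-emits the chunk that triggered it.
            .flatMap(c -> c.hasToolCall() ? secondCall() : Stream.of(c))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Chunk> chunks = flattened();
        // Issue 2: a downstream aggregator concatenates text from BOTH calls.
        String textContent = chunks.stream().map(Chunk::text)
            .collect(Collectors.joining());
        // Issue 1: the toolCall chunk is gone from the flattened stream.
        boolean toolCallVisible = chunks.stream().anyMatch(Chunk::hasToolCall);
        System.out.println(textContent);
        System.out.println("toolCall visible downstream: " + toolCallVisible); // false
    }
}
```

Running this prints the concatenated A + B text and `false` for toolCall visibility, mirroring exactly what the first Observation records.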
📋 Steps to Reproduce
Prerequisites
- Spring AI 1.1.0
- Using streaming mode (.stream())
- Tool calling configured
Reproduction Steps
1. Create a prompt that requires tool calling:

   chatClient.prompt()
       .user("Check today's weather in Beijing, then tell me what clothes to wear")
       .stream()

2. Model returns a tool call request (first call):

   {
     "textContent": "I need to call the get_weather tool.",
     "toolCalls": [
       {
         "id": "call_001",
         "name": "get_weather",
         "arguments": "{\"city\":\"Beijing\"}"
       }
     ]
   }

3. Execute the tool call:

   get_weather(city="Beijing") → returns "Beijing: Sunny, 5°C"

4. Model returns the final advice (second call):

   {
     "textContent": "Based on the current weather (Sunny, 5°C), I recommend wearing a down jacket or thick coat...",
     "toolCalls": null
   }

5. Observe the recorded data in a custom ObservationHandler
✅ Expected Behavior
Two model calls should produce two independent Observations, each recording its own data:
// First Observation (logId=1)
{
"textContent": "I need to call the get_weather tool.",
"toolCalls": [
{
"id": "call_001",
"name": "get_weather",
"arguments": "{\"city\":\"Beijing\"}"
}
]
}
// Second Observation (logId=2)
{
"textContent": "Based on the current weather (Sunny, 5°C), I recommend wearing a down jacket or thick coat...",
"toolCalls": null
}
❌ Actual Behavior
// First Observation (logId=1) - ❌ INCORRECT
{
"textContent": "I need to call the get_weather tool.Based on the current weather (Sunny, 5°C), I recommend wearing a down jacket or thick coat...",
// ❌ Cumulative content from both calls (A + B)
"toolCalls": null // ❌ Lost the toolCall information from the first call!
}
// Second Observation (logId=2) - ✅ CORRECT
{
"textContent": "Based on the current weather (Sunny, 5°C), I recommend wearing a down jacket or thick coat...",
"toolCalls": null
}
Critical Issues:
- toolCall information completely lost
- textContent incorrectly cumulative
🔍 Root Cause Analysis
Source Location
org.springframework.ai.openai.OpenAiChatModel.java:366-401
Problem Code
public Flux<ChatResponse> internalStream(Prompt prompt, ChatResponse previousChatResponse) {
    return Flux.deferContextual(contextView -> {
        // Get streaming response from first model call
        Flux<OpenAiApi.ChatCompletionChunk> completionChunks =
            this.openAiApi.chatCompletionStream(request, ...);
        Flux<ChatResponse> chatResponse = completionChunks
            .map(this::chunkToChatCompletion)
            .switchMap(...);
        // ❌ PROBLEM: flatMap handles tool calling
        Flux<ChatResponse> flux = chatResponse.flatMap(response -> {
            if (this.toolExecutionEligibilityPredicate.isToolExecutionRequired(...)) {
                // Execute tool
                ToolExecutionResult toolExecutionResult =
                    this.toolCallingManager.executeToolCalls(prompt, response);
                if (!toolExecutionResult.returnDirect()) {
                    // ❌ Issue 1: Directly returns Flux from second internalStream
                    // Does NOT emit the current response (containing toolCall)
                    return this.internalStream(
                        new Prompt(toolExecutionResult.conversationHistory(), ...),
                        response
                    );
                }
            } else {
                // ✅ Normal case: emit current response
                return Flux.just(response);
            }
        })
        .doOnFinally(s -> observation.stop());
        // ❌ Issue 2: MessageAggregator aggregates the flattened flux
        // Contains partial chunks from first call + all chunks from second call
        return new MessageAggregator().aggregate(flux, observationContext::setResponse);
    });
}
Detailed Analysis
Issue 1: flatMap does not emit the response containing toolCall
// Actual behavior of flatMap:
chunk1 (no toolCall) → Flux.just(chunk1) → emit chunk1 ✅
chunk2 (no toolCall) → Flux.just(chunk2) → emit chunk2 ✅
chunk3 (no toolCall) → Flux.just(chunk3) → emit chunk3 ✅
...
chunkN (has toolCall) → this.internalStream(...)
    → returns Flux[chunkB1, chunkB2, ..., chunkBM]
    → ❌ chunkN itself is NOT emitted!
// Result after flatMap flattening:
Flux[chunk1, chunk2, chunk3, ..., chunk(N-1), chunkB1, chunkB2, ..., chunkBM]
//   ↑ Chunks from first call (excluding chunkN)       ↑ Chunks from second call
Result: MessageAggregator cannot access chunkN containing the toolCall, causing the toolCall information loss.
Issue 2: MessageAggregator aggregates chunks from both calls
MessageAggregator's aggregation logic:
// doOnNext: Accumulate text from each chunk
textContent.append(chunk1.text); // "I"
textContent.append(chunk2.text); // " need"
textContent.append(chunk3.text); // " to"
...
textContent.append(chunk(N-1).text); // " get_weather tool."
// chunkN not passed, so toolCalls cannot be accumulated
textContent.append(chunkB1.text); // "Based"
textContent.append(chunkB2.text); // " on"
...
textContent.append(chunkBM.text); // "thick coat..."
// doOnComplete: Final result
finalTextContent = "I need to call the get_weather tool.Based on the current weather...thick coat..." // A + B
finalToolCalls = null // Because chunkN was not passed
💡 Proposed Solutions
Solution 1: Minimal Change - Emit current response before recursion
Approach: Use Flux.concat() in flatMap to emit the response containing toolCall first, then recursively call the second internalStream.
Flux<ChatResponse> flux = chatResponse.flatMap(response -> {
    if (this.toolExecutionEligibilityPredicate.isToolExecutionRequired(...)) {
        ToolExecutionResult toolExecutionResult =
            this.toolCallingManager.executeToolCalls(prompt, response);
        if (!toolExecutionResult.returnDirect()) {
            // ✅ Emit current response (containing toolCall) first
            return Flux.concat(
                Flux.just(response),     // Ensure toolCall is passed to MessageAggregator
                this.internalStream(...) // Then recursively call
            );
        }
    } else {
        return Flux.just(response);
    }
})
.doOnFinally(s -> observation.stop());
return new MessageAggregator().aggregate(flux, observationContext::setResponse);
Pros:
- ✅ Minimal code change (only one line added)
- ✅ Ensures the toolCall is not lost (the response is passed to MessageAggregator)
Cons:
- ❌ The cumulative textContent issue remains (the outer observation still records A + B)
- ❌ Behavior still inconsistent with synchronous calls
Use Case: As a quick fix for the critical toolCall loss issue.
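Solution 1's effect can be sketched with a plain `java.util.stream` stand-in for `Flux` (all names below are illustrative, not Spring AI types): concatenating the triggering element in front of the recursive stream keeps it visible to the downstream aggregator.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Illustrative stand-in, not Spring AI code: Stream.concat plays the role
// of Flux.concat from Solution 1.
public class ConcatFixDemo {

    public record Chunk(String text, boolean hasToolCall) {}

    // Stands in for the recursive internalStream() call (second model call).
    static Stream<Chunk> secondCall() {
        return Stream.of(new Chunk("Based on the weather, wear a coat.", false));
    }

    public static List<Chunk> flattened() {
        List<Chunk> firstCall = List.of(
            new Chunk("I need to call the get_weather tool.", false),
            new Chunk("", true));                           // chunkN with the toolCall
        return firstCall.stream()
            .flatMap(c -> c.hasToolCall()
                ? Stream.concat(Stream.of(c), secondCall()) // emit chunkN first
                : Stream.of(c))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The toolCall chunk now survives flattening...
        System.out.println(flattened().stream().anyMatch(Chunk::hasToolCall)); // true
        // ...but all text still lands in one flattened stream, so the
        // cumulative textContent (A + B) problem remains.
    }
}
```

This mirrors why Solution 1 fixes Issue 1 but not Issue 2: the trigger element is preserved, yet the aggregator still sees one merged stream.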
Solution 2: Complete Fix - Complete current Observation before recursion (Recommended ⭐)
Approach: Utilize MessageAggregator's callback mechanism to stop the current observation immediately after aggregation completes, ensuring each model call has an independent observation.
public Flux<ChatResponse> internalStream(Prompt prompt, ChatResponse previousChatResponse) {
    return Flux.deferContextual(contextView -> {
        Flux<ChatResponse> chatResponse = completionChunks
            .map(this::chunkToChatCompletion)
            .switchMap(...);
        // Create current observation
        final ChatModelObservationContext observationContext =
            ChatModelObservationContext.builder()
                .prompt(prompt)
                .provider(OpenAiApiConstants.PROVIDER_NAME)
                .build();
        Observation observation = ChatModelObservationDocumentation.CHAT_MODEL_OPERATION
            .observation(...)
            .start();
        // ✅ Key change: Complete aggregation and observation before handling toolCall
        return new MessageAggregator().aggregate(chatResponse, aggregatedResponse -> {
                // Callback after aggregation completes
                observationContext.setResponse(aggregatedResponse);
                observation.stop(); // ✅ Immediately complete current observation
            })
            .flatMap(aggregatedResponse -> {
                // Check if tool calling is needed
                if (this.toolExecutionEligibilityPredicate.isToolExecutionRequired(...)) {
                    ToolExecutionResult toolExecutionResult =
                        this.toolCallingManager.executeToolCalls(prompt, aggregatedResponse);
                    if (!toolExecutionResult.returnDirect()) {
                        // ✅ Current observation completed, start new recursive call
                        // Recursive call creates a new observation
                        return this.internalStream(
                            new Prompt(toolExecutionResult.conversationHistory(), ...),
                            aggregatedResponse
                        );
                    } else {
                        return Flux.just(aggregatedResponse);
                    }
                } else {
                    return Flux.just(aggregatedResponse);
                }
            })
            .doOnError(observation::error);
    });
}
Pros:
- ✅ Completely solves the toolCall loss issue (each call is recorded independently)
- ✅ Completely solves the cumulative textContent issue (each MessageAggregator is independent)
- ✅ Behavior fully consistent with synchronous internalCall()
- ✅ Clear concept: one model call = one complete, independent observation
- ✅ Leaves room for future enhancements (can add parentObservationId, loopDepth, etc.)
Cons:
- Requires code structure adjustment (move MessageAggregator before flatMap)
Behavior Comparison:
Synchronous call (internalCall):
Model call 1 → Observation 1 completes → Tool execution → Model call 2 → Observation 2 completes ✅
Streaming call (current implementation):
Model call 1 starts → Tool execution → Model call 2 → Observation 2 completes → Observation 1 completes (contains 1+2) ❌
Streaming call (Solution 2 implementation):
Model call 1 → Observation 1 completes → Tool execution → Model call 2 → Observation 2 completes ✅
Use Case: As a long-term solution to ensure behavioral consistency between streaming and synchronous calls.
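The ordering Solution 2 restores can be sketched as a minimal recursion in plain Java (all names below are illustrative; the real implementation would stop a Micrometer Observation inside the aggregation callback): each simulated model call finishes recording its own observation before the loop recurses.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of Solution 2's ordering, not Spring AI code:
// recording an Obs plays the role of observation.stop() after aggregation.
public class PerCallObservationDemo {

    public record Obs(int id, String textContent, boolean hasToolCall) {}

    // Each simulated model call records ("stops") its own observation
    // BEFORE recursing into the next turn of the tool-calling loop.
    static void modelCall(List<Obs> recorded, int id, String text, boolean needsTool) {
        recorded.add(new Obs(id, text, needsTool)); // observation.stop() analogue
        if (needsTool) {
            modelCall(recorded, id + 1, "Based on the weather, wear a coat.", false);
        }
    }

    public static List<Obs> run() {
        List<Obs> recorded = new ArrayList<>();
        modelCall(recorded, 1, "I need to call the get_weather tool.", true);
        return recorded;
    }

    public static void main(String[] args) {
        // Two independent observations, each holding only its own call's data.
        run().forEach(System.out::println);
    }
}
```

Because each observation is sealed before recursion, the first record keeps its toolCall flag and never absorbs the second call's text, matching the synchronous internalCall() behavior.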
🌍 Environment Information
Version Info
- Spring AI: 1.1.0
- Spring Boot: 3.2.x
- Java: 17+
- Model Provider: OpenAI GPT-4 (confirmed)
Dependencies
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-openai-spring-boot-starter</artifactId>
<version>1.1.0</version>
</dependency>
Configuration
spring:
  ai:
    openai:
      api-key: ${OPENAI_API_KEY}
      chat:
        options:
          model: gpt-4
          temperature: 0.7
🔗 Related Code and Documentation
Problem Code Locations
- OpenAI Implementation: org.springframework.ai.openai.OpenAiChatModel.java:271-404
- MessageAggregator: org.springframework.ai.chat.model.MessageAggregator.java:56-189
- ChatModelObservationContext: org.springframework.ai.chat.observation.ChatModelObservationContext.java
Related Issues
- This issue may be related to inconsistent behavior between synchronous and streaming calls
- Recommend checking streaming implementations for other model providers (Anthropic, Azure OpenAI, etc.)
🙏 Expectations
We hope the Spring AI team can:
- Confirm the issue: Verify whether this behavior is as expected
- Provide guidance: If this is a design decision, please provide official best practice recommendations
- Fix the issue: If this is a bug, we hope it can be fixed in future releases
- Update documentation: Document the behavioral differences between streaming and synchronous calls in tool calling loops
📝 Additional Notes
Why This Issue is Critical
- Inconsistency: Completely different behavior between synchronous (.call()) and streaming (.stream()) calls
  - Synchronous: Each model call is an independent Observation ✅
  - Streaming: The first Observation accumulates data from subsequent calls ❌
- Data Integrity: Loss of toolCall information means:
- Cannot perform complete audit trails
- Cannot debug tool calling failures
- Cannot reproduce problem scenarios
- Production Impact: In enterprise applications:
- Compliance requirements cannot be met
- SLA monitoring data is unreliable
- Troubleshooting costs significantly increase
Testing Recommendations
We recommend the Spring AI team add the following test cases:
- Streaming + Single Tool Call - Verify toolCall information is correctly recorded
- Streaming + Multiple Tool Calls - Verify behavior with multiple loops
- Streaming + Nested Agents - Verify behavior in complex scenarios
- Observation Data Consistency Tests - Compare observation data between synchronous and streaming modes
Thank you, Spring AI team, for your hard work! Looking forward to seeing this issue resolved. 🙏
Submitted by: Song Yang
Contact: [email protected]
Date: 2026-01-01