Skip to content
This repository was archived by the owner on Apr 6, 2026. It is now read-only.

Commit 4306ed6

Browse files
committed
perf(agents): enhance LLM streaming fallback and prompt structures
- Add automatic fallback to ainvoke when astream fails in base agent
- Update analysis agent to support multiple tool calls and improved JSON parsing
- Refactor all agent prompts for better structure and language compliance
- Remove model/provider/api_key parameters from message creation API

BREAKING CHANGE: Removed model, provider, and api_key fields from CreateMessageRequest protobuf message, affecting gRPC API consumers
1 parent c78f949 commit 4306ed6

13 files changed

Lines changed: 851 additions & 946 deletions

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,3 +217,5 @@ logs/
217217
*.log
nuclei-templates/
218218

219219
*/nuclei-templates/
220+
221+
MESSAGE_FLOW_DOCUMENTATION.md

agents/core/base_agent.py

Lines changed: 57 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -199,19 +199,66 @@ async def stream_llm_response(
199199

200200
async def _buffer_llm_chunks(
201201
self,
202-
llm_stream: AsyncGenerator,
203-
min_chunk_size: int = 50
202+
llm: Any,
203+
prompt: Any,
204+
min_chunk_size: Optional[int] = None,
205+
**kwargs
204206
) -> AsyncGenerator[str, None]:
205-
"""Buffer small chunks into larger ones for smoother UI delivery"""
207+
"""
208+
Stream LLM response with automatic fallback to ainvoke if astream fails or is empty,
209+
and buffer small chunks into larger ones for smoother UI delivery.
210+
211+
Args:
212+
llm: LLM instance
213+
prompt: Prompt to send
214+
min_chunk_size: Min characters to buffer (defaults to LlmConfigs.min_chunk_size)
215+
**kwargs: Additional parameters for LLM calls
216+
217+
Yields:
218+
Text chunks
219+
"""
220+
from common.config import configs
221+
if min_chunk_size is None:
222+
min_chunk_size = configs.llm.min_chunk_size
223+
206224
buffer = ""
207-
async for chunk in llm_stream:
208-
text = chunk.content if isinstance(chunk, BaseMessage) else str(chunk)
209-
buffer += text
210-
if len(buffer) >= min_chunk_size:
225+
yielded_any = False
226+
227+
try:
228+
# 1. Try streaming first
229+
async for chunk in llm.astream(prompt, **kwargs):
230+
text = chunk.content if isinstance(chunk, BaseMessage) else str(chunk)
231+
buffer += text
232+
233+
if len(buffer) >= min_chunk_size:
234+
yield buffer
235+
buffer = ""
236+
yielded_any = True
237+
238+
# Flush final buffer
239+
if buffer:
211240
yield buffer
212-
buffer = ""
213-
if buffer:
214-
yield buffer
241+
yielded_any = True
242+
243+
except (Exception, ValueError) as streaming_error:
244+
if yielded_any:
245+
logger.error(f"Streaming failed partial-way for {self.name}: {streaming_error}")
246+
if buffer:
247+
yield buffer
248+
raise streaming_error
249+
250+
logger.warning(f"Streaming failed or was empty for {self.name}: {streaming_error}. Falling back to ainvoke.")
251+
252+
# 2. Fallback to standard invocation if stream provided no content
253+
if not yielded_any:
254+
try:
255+
response = await llm.ainvoke(prompt, **kwargs)
256+
content = response.content if hasattr(response, 'content') else str(response)
257+
if content:
258+
yield content.strip()
259+
except Exception as e:
260+
logger.error(f"Fallback ainvoke also failed for {self.name}: {e}")
261+
raise e
215262

216263
def _run_async(self, coro):
217264
"""

0 commit comments

Comments
 (0)