diff --git a/src/web/ws.rs b/src/web/ws.rs index 64375d11..3c711f19 100644 --- a/src/web/ws.rs +++ b/src/web/ws.rs @@ -362,6 +362,30 @@ async fn handle_chat_message( let tc = schema::ToolCall { id, name, arguments }; run_l1_tool_chain(state, provider, sink.sender, &mut messages, &tools, content, session_id, tc, pending_chain, stop_flag).await; } + ConsumeResult::ToolCalls(calls) => { + // Model emitted multiple tool calls in this turn. Dispatch all + // but the last inline (sending UI events + appending messages), + // then enter the L1 chain on the last call. The chain's first + // re-prompt will see the full set of leading tool results in + // message history. Mirrors the platform_ingest.rs pattern. + let count = calls.len(); + crate::tools::introspect_tool::log_reasoning_event( + &state.config.general.data_dir, session_id, + &serde_json::json!({"type":"inference","result":"tool_calls","count": count}), + None); + tracing::info!(count, "L1 result: ToolCalls (multi-call dispatch)"); + match dispatch_leading_tool_calls(state, sink.sender, &mut messages, calls).await { + Some(last_tc) => { + run_l1_tool_chain(state, provider, sink.sender, &mut messages, &tools, content, session_id, last_tc, pending_chain, stop_flag).await; + } + None => { + // ToolCalls with empty Vec — stream_consumer invariant violation. + // Surface explicitly rather than silently dropping the turn. + tracing::error!("L1 result: ToolCalls(empty) — stream_consumer invariant violated"); + send_ws(sink.sender, "error", &serde_json::json!({"message": "Internal error: empty tool call set"})).await; + } + } + } ConsumeResult::Error(e) => { tracing::error!(error = %e, "L1 result: Error"); send_ws(sink.sender, "error", &serde_json::json!({"message": e})).await; @@ -541,7 +565,7 @@ async fn handle_plan_decision( } // L1 tool chain handler extracted to ws_l1.rs for governance compliance. -use crate::web::ws_l1::{deliver_reply, run_l1_tool_chain}; +use crate::web::ws_l1::{deliver_reply, dispatch_leading_tool_calls, run_l1_tool_chain}; // ReAct loop execution is in crate::web::ws_react. diff --git a/src/web/ws_l1.rs b/src/web/ws_l1.rs index 70ea2bdc..be8e566c 100644 --- a/src/web/ws_l1.rs +++ b/src/web/ws_l1.rs @@ -175,6 +175,31 @@ pub async fn run_l1_tool_chain( tracing::info!(tool = %name, id = %id, iteration = tool_iter + 1, "L1 chain → another ToolCall"); current_tc = schema::ToolCall { id, name, arguments }; } + ConsumeResult::ToolCalls(calls) => { + // Model emitted multiple tool calls in this chain iteration. + // Dispatch all but the last inline (with UI events and + // message appends), then continue the chain using the last + // call as the next current_tc. The next loop iteration will + // execute that last call and re-prompt the model with all + // accumulated tool results. + let count = calls.len(); + tracing::info!(count, iteration = tool_iter + 1, "L1 chain → ToolCalls (multi-call dispatch)"); + // Track leading tools in chain_tools so stash_chain captures the full set. + chain_tools.extend( + calls.iter().take(count.saturating_sub(1)) + .map(|(_, name, args)| (name.clone(), args.clone())) + ); + match dispatch_leading_tool_calls(state, sink.sender, messages, calls).await { + Some(last_tc) => current_tc = last_tc, + None => { + // ToolCalls with empty Vec — stream_consumer invariant violation. + // Surface and break the chain rather than spin silently. + tracing::error!(iteration = tool_iter + 1, "L1 chain: ToolCalls(empty) — stream_consumer invariant violated"); + send_ws(sink.sender, "error", &serde_json::json!({"message": "Internal error: empty tool call set in chain"})).await; + break; + } + } + } ConsumeResult::Escalate { objective, plan, planned_turns } => { tracing::info!(objective = %objective, planned_turns, "L1 chain → Escalate"); stop_flag.store(false, std::sync::atomic::Ordering::Relaxed); @@ -208,6 +233,52 @@ pub async fn run_l1_tool_chain( send_ws(sender, "done", &serde_json::json!({})).await; } +/// Execute leading tool calls inline and return the last call for the caller +/// to drive the L1 chain. Used when the model emits multiple tool calls in +/// a single inference turn — the leading calls are dispatched concretely +/// (sending tool_executing/tool_completed UI events and appending assistant +/// tool_call + tool_result messages), then the caller starts the L1 chain +/// using the last call as its entry point. The chain's re-prompt then sees +/// the full set of leading results in the message history. +/// +/// Returns `None` only if `calls` is empty, which violates the +/// `stream_consumer::ConsumeResult::ToolCalls` invariant (the variant is +/// constructed only for `len() > 1`). Callers must surface this case +/// explicitly rather than silently dropping the turn. +pub async fn dispatch_leading_tool_calls( + state: &AppState, + sender: &mut futures_util::stream::SplitSink, + messages: &mut Vec, + calls: Vec<(String, String, String)>, +) -> Option { + let mut tcs: Vec = calls.into_iter() + .map(|(id, name, arguments)| schema::ToolCall { id, name, arguments }) + .collect(); + let last_tc = tcs.pop()?; + for tc in &tcs { + send_ws(sender, "tool_executing", &serde_json::json!({"name": &tc.name, "id": &tc.id})).await; + let result = crate::web::tool_dispatch::execute_tool_with_state(state, tc).await; + tracing::info!(tool = %tc.name, success = result.success, output_len = result.output.len(), "L1 leading tool: execution complete"); + send_ws(sender, "tool_completed", &serde_json::json!({ + "id": &tc.id, "name": &tc.name, + "result": &result.output, "success": result.success, + })).await; + messages.push(Message::assistant_tool_call(&tc.id, &tc.name, &tc.arguments)); + if result.images.is_empty() { + messages.push(Message::tool_result(&tc.id, &result.output)); + } else { + messages.push(Message::tool_result_multipart(&tc.id, &result.output, result.images)); + } + } + // Budget enforcement is intentionally deferred to the chain: the + // caller passes `last_tc` to `run_l1_tool_chain`, which calls + // `enforce_context_budget` after each tool result iteration + // (see ws_l1.rs `run_l1_tool_chain` ~ L116). Mirrors the canonical + // pattern in `platform_ingest.rs:192-215` which also does not + // enforce budget during leading-call dispatch. + Some(last_tc) +} + /// Stash a completed tool chain for delayed reinforcement on the next turn. pub fn stash_chain( pending_chain: &mut Option,