Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion src/web/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,30 @@ async fn handle_chat_message(
let tc = schema::ToolCall { id, name, arguments };
run_l1_tool_chain(state, provider, sink.sender, &mut messages, &tools, content, session_id, tc, pending_chain, stop_flag).await;
}
ConsumeResult::ToolCalls(calls) => {
// Model emitted multiple tool calls in this turn. Dispatch all
// but the last inline (sending UI events + appending messages),
// then enter the L1 chain on the last call. The chain's first
// re-prompt will see the full set of leading tool results in
// message history. Mirrors the platform_ingest.rs pattern.
let count = calls.len();
crate::tools::introspect_tool::log_reasoning_event(
&state.config.general.data_dir, session_id,
&serde_json::json!({"type":"inference","result":"tool_calls","count": count}),
None);
tracing::info!(count, "L1 result: ToolCalls (multi-call dispatch)");
match dispatch_leading_tool_calls(state, sink.sender, &mut messages, calls).await {
Some(last_tc) => {
run_l1_tool_chain(state, provider, sink.sender, &mut messages, &tools, content, session_id, last_tc, pending_chain, stop_flag).await;
}
None => {
// ToolCalls with empty Vec — stream_consumer invariant violation.
// Surface explicitly rather than silently dropping the turn.
tracing::error!("L1 result: ToolCalls(empty) — stream_consumer invariant violated");
send_ws(sink.sender, "error", &serde_json::json!({"message": "Internal error: empty tool call set"})).await;
}
}
}
ConsumeResult::Error(e) => {
tracing::error!(error = %e, "L1 result: Error");
send_ws(sink.sender, "error", &serde_json::json!({"message": e})).await;
Expand Down Expand Up @@ -541,7 +565,7 @@ async fn handle_plan_decision(
}

// L1 tool chain handler extracted to ws_l1.rs for governance compliance.
use crate::web::ws_l1::{deliver_reply, run_l1_tool_chain};
use crate::web::ws_l1::{deliver_reply, dispatch_leading_tool_calls, run_l1_tool_chain};


// ReAct loop execution is in crate::web::ws_react.
Expand Down
71 changes: 71 additions & 0 deletions src/web/ws_l1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,31 @@ pub async fn run_l1_tool_chain(
tracing::info!(tool = %name, id = %id, iteration = tool_iter + 1, "L1 chain → another ToolCall");
current_tc = schema::ToolCall { id, name, arguments };
}
ConsumeResult::ToolCalls(calls) => {
// Model emitted multiple tool calls in this chain iteration.
// Dispatch all but the last inline (with UI events and
// message appends), then continue the chain using the last
// call as the next current_tc. The next loop iteration will
// execute that last call and re-prompt the model with all
// accumulated tool results.
let count = calls.len();
tracing::info!(count, iteration = tool_iter + 1, "L1 chain → ToolCalls (multi-call dispatch)");
// Track leading tools in chain_tools so stash_chain captures the full set.
chain_tools.extend(
calls.iter().take(count.saturating_sub(1))
.map(|(_, name, args)| (name.clone(), args.clone()))
);
match dispatch_leading_tool_calls(state, sink.sender, messages, calls).await {
Some(last_tc) => current_tc = last_tc,
None => {
// ToolCalls with empty Vec — stream_consumer invariant violation.
// Surface and break the chain rather than spin silently.
tracing::error!(iteration = tool_iter + 1, "L1 chain: ToolCalls(empty) — stream_consumer invariant violated");
send_ws(sink.sender, "error", &serde_json::json!({"message": "Internal error: empty tool call set in chain"})).await;
break;
}
}
}
ConsumeResult::Escalate { objective, plan, planned_turns } => {
tracing::info!(objective = %objective, planned_turns, "L1 chain → Escalate");
stop_flag.store(false, std::sync::atomic::Ordering::Relaxed);
Expand Down Expand Up @@ -208,6 +233,52 @@ pub async fn run_l1_tool_chain(
send_ws(sender, "done", &serde_json::json!({})).await;
}

/// Execute leading tool calls inline and return the last call for the caller
/// to drive the L1 chain. Used when the model emits multiple tool calls in
/// a single inference turn — the leading calls are dispatched concretely
/// (sending tool_executing/tool_completed UI events and appending assistant
/// tool_call + tool_result messages), then the caller starts the L1 chain
/// using the last call as its entry point. The chain's re-prompt then sees
/// the full set of leading results in the message history.
///
/// Returns `None` only if `calls` is empty, which violates the
/// `stream_consumer::ConsumeResult::ToolCalls` invariant (the variant is
/// constructed only for `len() > 1`). Callers must surface this case
/// explicitly rather than silently dropping the turn.
pub async fn dispatch_leading_tool_calls(
state: &AppState,
sender: &mut futures_util::stream::SplitSink<WebSocket, WsMessage>,
messages: &mut Vec<Message>,
calls: Vec<(String, String, String)>,
) -> Option<schema::ToolCall> {
let mut tcs: Vec<schema::ToolCall> = calls.into_iter()
.map(|(id, name, arguments)| schema::ToolCall { id, name, arguments })
.collect();
let last_tc = tcs.pop()?;
for tc in &tcs {
send_ws(sender, "tool_executing", &serde_json::json!({"name": &tc.name, "id": &tc.id})).await;
let result = crate::web::tool_dispatch::execute_tool_with_state(state, tc).await;
tracing::info!(tool = %tc.name, success = result.success, output_len = result.output.len(), "L1 leading tool: execution complete");
send_ws(sender, "tool_completed", &serde_json::json!({
"id": &tc.id, "name": &tc.name,
"result": &result.output, "success": result.success,
})).await;
messages.push(Message::assistant_tool_call(&tc.id, &tc.name, &tc.arguments));
if result.images.is_empty() {
messages.push(Message::tool_result(&tc.id, &result.output));
} else {
messages.push(Message::tool_result_multipart(&tc.id, &result.output, result.images));
}
}
// Budget enforcement is intentionally deferred to the chain: the
// caller passes `last_tc` to `run_l1_tool_chain`, which calls
// `enforce_context_budget` after each tool result iteration
// (see ws_l1.rs `run_l1_tool_chain` ~ L116). Mirrors the canonical
// pattern in `platform_ingest.rs:192-215` which also does not
// enforce budget during leading-call dispatch.
Some(last_tc)
}

/// Stash a completed tool chain for delayed reinforcement on the next turn.
pub fn stash_chain(
pending_chain: &mut Option<PendingToolChain>,
Expand Down