diff --git a/Cargo.lock b/Cargo.lock index 3e51ea7da9..1a31bd2efa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3959,7 +3959,7 @@ dependencies = [ [[package]] name = "openfang-api" -version = "0.5.9" +version = "0.5.10" dependencies = [ "argon2", "async-trait", @@ -4001,7 +4001,7 @@ dependencies = [ [[package]] name = "openfang-channels" -version = "0.5.9" +version = "0.5.10" dependencies = [ "aes", "async-trait", @@ -4040,7 +4040,7 @@ dependencies = [ [[package]] name = "openfang-cli" -version = "0.5.9" +version = "0.5.10" dependencies = [ "clap", "clap_complete", @@ -4068,7 +4068,7 @@ dependencies = [ [[package]] name = "openfang-desktop" -version = "0.5.9" +version = "0.5.10" dependencies = [ "axum", "open", @@ -4094,7 +4094,7 @@ dependencies = [ [[package]] name = "openfang-extensions" -version = "0.5.9" +version = "0.5.10" dependencies = [ "aes-gcm", "argon2", @@ -4122,7 +4122,7 @@ dependencies = [ [[package]] name = "openfang-hands" -version = "0.5.9" +version = "0.5.10" dependencies = [ "chrono", "dashmap", @@ -4140,7 +4140,7 @@ dependencies = [ [[package]] name = "openfang-kernel" -version = "0.5.9" +version = "0.5.10" dependencies = [ "async-trait", "chrono", @@ -4179,7 +4179,7 @@ dependencies = [ [[package]] name = "openfang-memory" -version = "0.5.9" +version = "0.5.10" dependencies = [ "async-trait", "chrono", @@ -4199,7 +4199,7 @@ dependencies = [ [[package]] name = "openfang-migrate" -version = "0.5.9" +version = "0.5.10" dependencies = [ "chrono", "dirs 6.0.0", @@ -4218,7 +4218,7 @@ dependencies = [ [[package]] name = "openfang-runtime" -version = "0.5.9" +version = "0.5.10" dependencies = [ "anyhow", "async-trait", @@ -4254,7 +4254,7 @@ dependencies = [ [[package]] name = "openfang-skills" -version = "0.5.9" +version = "0.5.10" dependencies = [ "chrono", "hex", @@ -4277,7 +4277,7 @@ dependencies = [ [[package]] name = "openfang-types" -version = "0.5.9" +version = "0.5.10" dependencies = [ "async-trait", "chrono", @@ -4296,7 +4296,7 @@ dependencies = [ [[package]] name = "openfang-wire" -version = "0.5.9" +version = "0.5.10" dependencies = [ "async-trait", "chrono", @@ -5773,9 +5773,9 @@ checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" [[package]] name = "rustls-webpki" -version = "0.103.10" +version = "0.103.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df33b2b81ac578cabaf06b89b0631153a3f416b0a886e8a7a1707fb51abbd1ef" +checksum = "8279bb85272c9f10811ae6a6c547ff594d6a7f3c6c6b02ee9726d1d0dcfcdd06" dependencies = [ "aws-lc-rs", "ring", @@ -9230,7 +9230,7 @@ checksum = "b9cc00251562a284751c9973bace760d86c0276c471b4be569fe6b068ee97a56" [[package]] name = "xtask" -version = "0.5.9" +version = "0.5.10" [[package]] name = "yoke" diff --git a/crates/openfang-api/src/channel_bridge.rs b/crates/openfang-api/src/channel_bridge.rs index 325c5e6364..7fc9e624a7 100644 --- a/crates/openfang-api/src/channel_bridge.rs +++ b/crates/openfang-api/src/channel_bridge.rs @@ -996,6 +996,16 @@ impl ChannelBridgeHandle for KernelBridgeAdapter { } msg } + + fn set_channel_context( + &self, + agent_id: openfang_types::agent::AgentId, + context: openfang_types::ChannelCallbackContext, + ) { + use openfang_runtime::kernel_handle::KernelHandle as _; + self.kernel + .set_channel_context(&agent_id.to_string(), context); + } } /// Parse a trigger pattern string from chat into a `TriggerPattern`. diff --git a/crates/openfang-api/src/routes.rs b/crates/openfang-api/src/routes.rs index 952fc93263..a2abe2ca99 100644 --- a/crates/openfang-api/src/routes.rs +++ b/crates/openfang-api/src/routes.rs @@ -6245,7 +6245,7 @@ pub async fn list_providers(State(state): State>) -> impl IntoResp // Index probe results by provider list position for O(1) lookup let mut probe_map: HashMap = HashMap::with_capacity(local_providers.len()); - for ((idx, _, _), result) in local_providers.iter().zip(probe_results.into_iter()) { + for ((idx, _, _), result) in local_providers.iter().zip(probe_results) { probe_map.insert(*idx, result); } @@ -11213,7 +11213,7 @@ pub async fn comms_events( } // Sort by timestamp descending (newest first) - comms_events.sort_by(|a, b| b.timestamp.cmp(&a.timestamp)); + comms_events.sort_by_key(|e| std::cmp::Reverse(e.timestamp.clone())); comms_events.truncate(limit); Json(comms_events) diff --git a/crates/openfang-channels/src/bridge.rs b/crates/openfang-channels/src/bridge.rs index 24b668c3d7..a4be6f7bb4 100644 --- a/crates/openfang-channels/src/bridge.rs +++ b/crates/openfang-channels/src/bridge.rs @@ -317,6 +317,17 @@ pub trait ChannelBridgeHandle: Send + Sync { async fn a2a_agents_text(&self) -> String { "A2A agents not available.".to_string() } + + /// Store the originating channel context for an agent so async callbacks + /// can deliver results back to the correct channel/user later. + /// Default: no-op (channel context tracking not available). + fn set_channel_context( + &self, + agent_id: openfang_types::agent::AgentId, + context: openfang_types::ChannelCallbackContext, + ) { + let _ = (agent_id, context); + } } /// Per-channel rate limiter tracking message timestamps per user. @@ -1018,6 +1029,19 @@ async fn dispatch_message( return; } + // Capture the channel callback context so async tools (e.g. a2a_send_async) can + // deliver their results back to this channel/user later via inject_async_callback. + handle.set_channel_context( + agent_id, + openfang_types::ChannelCallbackContext { + channel_type: ct_str.to_string(), + reply_to_platform_id: message.sender.platform_id.clone(), + reply_to_display_name: message.sender.display_name.clone(), + thread_id: thread_id.map(|t| t.to_string()), + agent_id: agent_id.to_string(), + }, + ); + // Send typing indicator (best-effort) let _ = adapter.send_typing(&message.sender).await; diff --git a/crates/openfang-channels/src/irc.rs b/crates/openfang-channels/src/irc.rs index b05a59f91d..a9b87e46b3 100644 --- a/crates/openfang-channels/src/irc.rs +++ b/crates/openfang-channels/src/irc.rs @@ -326,19 +326,17 @@ impl ChannelAdapter for IrcAdapter { } // RPL_WELCOME (001) — registration complete, join channels - "001" => { - if !joined { - info!("IRC registered as {nick_clone}"); - for ch in &channels_clone { - let join_cmd = format!("JOIN {ch}\r\n"); - if let Err(e) = writer.write_all(join_cmd.as_bytes()).await { - warn!("IRC JOIN send failed: {e}"); - break 'inner true; - } - info!("IRC joining {ch}"); + "001" if !joined => { + info!("IRC registered as {nick_clone}"); + for ch in &channels_clone { + let join_cmd = format!("JOIN {ch}\r\n"); + if let Err(e) = writer.write_all(join_cmd.as_bytes()).await { + warn!("IRC JOIN send failed: {e}"); + break 'inner true; } - joined = true; + info!("IRC joining {ch}"); } + joined = true; } // PRIVMSG — incoming message diff --git a/crates/openfang-cli/src/tui/screens/agents.rs b/crates/openfang-cli/src/tui/screens/agents.rs index 9121dc938f..fe1a92bdc0 100644 --- a/crates/openfang-cli/src/tui/screens/agents.rs +++ b/crates/openfang-cli/src/tui/screens/agents.rs @@ -576,13 +576,11 @@ impl AgentSelectState { KeyCode::Esc => { self.sub = AgentSubScreen::CreateMethod; } - KeyCode::Enter => { - if !self.custom_name.is_empty() { - if self.custom_desc.is_empty() { - self.custom_desc = format!("A custom {} agent", self.custom_name); - } - self.sub = AgentSubScreen::CustomDesc; + KeyCode::Enter if !self.custom_name.is_empty() => { + if self.custom_desc.is_empty() { + self.custom_desc = format!("A custom {} agent", self.custom_name); } + self.sub = AgentSubScreen::CustomDesc; } KeyCode::Char(c) => { self.custom_name.push(c); @@ -641,15 +639,11 @@ impl AgentSelectState { KeyCode::Esc => { self.sub = AgentSubScreen::CustomPrompt; } - KeyCode::Up | KeyCode::Char('k') => { - if self.tool_cursor > 0 { - self.tool_cursor -= 1; - } + KeyCode::Up | KeyCode::Char('k') if self.tool_cursor > 0 => { + self.tool_cursor -= 1; } - KeyCode::Down | KeyCode::Char('j') => { - if self.tool_cursor < TOOL_OPTIONS.len() - 1 { - self.tool_cursor += 1; - } + KeyCode::Down | KeyCode::Char('j') if self.tool_cursor < TOOL_OPTIONS.len() - 1 => { + self.tool_cursor += 1; } KeyCode::Char(' ') => { self.tool_checks[self.tool_cursor] = !self.tool_checks[self.tool_cursor]; @@ -674,21 +668,15 @@ impl AgentSelectState { KeyCode::Esc => { self.sub = AgentSubScreen::CustomTools; } - KeyCode::Up | KeyCode::Char('k') => { - if self.skill_cursor > 0 { - self.skill_cursor -= 1; - } + KeyCode::Up | KeyCode::Char('k') if self.skill_cursor > 0 => { + self.skill_cursor -= 1; } - KeyCode::Down | KeyCode::Char('j') => { - if len > 0 && self.skill_cursor < len - 1 { - self.skill_cursor += 1; - } + KeyCode::Down | KeyCode::Char('j') if len > 0 && self.skill_cursor < len - 1 => { + self.skill_cursor += 1; } - KeyCode::Char(' ') => { - if len > 0 { - let checked = &mut self.available_skills[self.skill_cursor].1; - *checked = !*checked; - } + KeyCode::Char(' ') if len > 0 => { + let checked = &mut self.available_skills[self.skill_cursor].1; + *checked = !*checked; } KeyCode::Enter => { // Advance to MCP server selection @@ -706,21 +694,15 @@ impl AgentSelectState { KeyCode::Esc => { self.sub = AgentSubScreen::CustomSkills; } - KeyCode::Up | KeyCode::Char('k') => { - if self.mcp_cursor > 0 { - self.mcp_cursor -= 1; - } + KeyCode::Up | KeyCode::Char('k') if self.mcp_cursor > 0 => { + self.mcp_cursor -= 1; } - KeyCode::Down | KeyCode::Char('j') => { - if len > 0 && self.mcp_cursor < len - 1 { - self.mcp_cursor += 1; - } + KeyCode::Down | KeyCode::Char('j') if len > 0 && self.mcp_cursor < len - 1 => { + self.mcp_cursor += 1; } - KeyCode::Char(' ') => { - if len > 0 { - let checked = &mut self.available_mcp[self.mcp_cursor].1; - *checked = !*checked; - } + KeyCode::Char(' ') if len > 0 => { + let checked = &mut self.available_mcp[self.mcp_cursor].1; + *checked = !*checked; } KeyCode::Enter => { let toml = self.build_custom_toml(); @@ -737,21 +719,15 @@ impl AgentSelectState { KeyCode::Esc => { self.sub = AgentSubScreen::AgentDetail; } - KeyCode::Up | KeyCode::Char('k') => { - if self.skill_cursor > 0 { - self.skill_cursor -= 1; - } + KeyCode::Up | KeyCode::Char('k') if self.skill_cursor > 0 => { + self.skill_cursor -= 1; } - KeyCode::Down | KeyCode::Char('j') => { - if len > 0 && self.skill_cursor < len - 1 { - self.skill_cursor += 1; - } + KeyCode::Down | KeyCode::Char('j') if len > 0 && self.skill_cursor < len - 1 => { + self.skill_cursor += 1; } - KeyCode::Char(' ') => { - if len > 0 { - let checked = &mut self.available_skills[self.skill_cursor].1; - *checked = !*checked; - } + KeyCode::Char(' ') if len > 0 => { + let checked = &mut self.available_skills[self.skill_cursor].1; + *checked = !*checked; } KeyCode::Enter => { // Save — collect checked skill names (none checked = "all") @@ -780,21 +756,15 @@ impl AgentSelectState { KeyCode::Esc => { self.sub = AgentSubScreen::AgentDetail; } - KeyCode::Up | KeyCode::Char('k') => { - if self.mcp_cursor > 0 { - self.mcp_cursor -= 1; - } + KeyCode::Up | KeyCode::Char('k') if self.mcp_cursor > 0 => { + self.mcp_cursor -= 1; } - KeyCode::Down | KeyCode::Char('j') => { - if len > 0 && self.mcp_cursor < len - 1 { - self.mcp_cursor += 1; - } + KeyCode::Down | KeyCode::Char('j') if len > 0 && self.mcp_cursor < len - 1 => { + self.mcp_cursor += 1; } - KeyCode::Char(' ') => { - if len > 0 { - let checked = &mut self.available_mcp[self.mcp_cursor].1; - *checked = !*checked; - } + KeyCode::Char(' ') if len > 0 => { + let checked = &mut self.available_mcp[self.mcp_cursor].1; + *checked = !*checked; } KeyCode::Enter => { // Save — collect checked server names (none checked = "all") diff --git a/crates/openfang-cli/src/tui/screens/audit.rs b/crates/openfang-cli/src/tui/screens/audit.rs index f7aa453797..2a0430fa74 100644 --- a/crates/openfang-cli/src/tui/screens/audit.rs +++ b/crates/openfang-cli/src/tui/screens/audit.rs @@ -164,19 +164,15 @@ impl AuditState { let total = self.filtered.len(); match key.code { - KeyCode::Up | KeyCode::Char('k') => { - if total > 0 { - let i = self.list_state.selected().unwrap_or(0); - let next = if i == 0 { total - 1 } else { i - 1 }; - self.list_state.select(Some(next)); - } + KeyCode::Up | KeyCode::Char('k') if total > 0 => { + let i = self.list_state.selected().unwrap_or(0); + let next = if i == 0 { total - 1 } else { i - 1 }; + self.list_state.select(Some(next)); } - KeyCode::Down | KeyCode::Char('j') => { - if total > 0 { - let i = self.list_state.selected().unwrap_or(0); - let next = (i + 1) % total; - self.list_state.select(Some(next)); - } + KeyCode::Down | KeyCode::Char('j') if total > 0 => { + let i = self.list_state.selected().unwrap_or(0); + let next = (i + 1) % total; + self.list_state.select(Some(next)); } KeyCode::Char('f') => { self.action_filter = self.action_filter.next(); diff --git a/crates/openfang-cli/src/tui/screens/comms.rs b/crates/openfang-cli/src/tui/screens/comms.rs index b0a5b84b28..4ea61d0b57 100644 --- a/crates/openfang-cli/src/tui/screens/comms.rs +++ b/crates/openfang-cli/src/tui/screens/comms.rs @@ -155,19 +155,19 @@ impl CommsState { self.task_field = 0; } KeyCode::Char('r') => return CommsAction::Refresh, - KeyCode::Up | KeyCode::Char('k') => { - if self.focus == CommsFocus::EventList && !self.events.is_empty() { - let i = self.event_list_state.selected().unwrap_or(0); - let next = if i == 0 { self.events.len() - 1 } else { i - 1 }; - self.event_list_state.select(Some(next)); - } + KeyCode::Up | KeyCode::Char('k') + if self.focus == CommsFocus::EventList && !self.events.is_empty() => + { + let i = self.event_list_state.selected().unwrap_or(0); + let next = if i == 0 { self.events.len() - 1 } else { i - 1 }; + self.event_list_state.select(Some(next)); } - KeyCode::Down | KeyCode::Char('j') => { - if self.focus == CommsFocus::EventList && !self.events.is_empty() { - let i = self.event_list_state.selected().unwrap_or(0); - let next = (i + 1) % self.events.len(); - self.event_list_state.select(Some(next)); - } + KeyCode::Down | KeyCode::Char('j') + if self.focus == CommsFocus::EventList && !self.events.is_empty() => + { + let i = self.event_list_state.selected().unwrap_or(0); + let next = (i + 1) % self.events.len(); + self.event_list_state.select(Some(next)); } _ => {} } @@ -189,18 +189,17 @@ impl CommsState { self.send_field - 1 }; } - KeyCode::Enter => { + KeyCode::Enter if !self.send_from.is_empty() && !self.send_to.is_empty() - && !self.send_msg.is_empty() - { - self.show_send_modal = false; - return CommsAction::SendMessage { - from: self.send_from.clone(), - to: self.send_to.clone(), - msg: self.send_msg.clone(), - }; - } + && !self.send_msg.is_empty() => + { + self.show_send_modal = false; + return CommsAction::SendMessage { + from: self.send_from.clone(), + to: self.send_to.clone(), + msg: self.send_msg.clone(), + }; } KeyCode::Char(c) => match self.send_field { 0 => self.send_from.push(c), @@ -238,15 +237,13 @@ impl CommsState { self.task_field - 1 }; } - KeyCode::Enter => { - if !self.task_title.is_empty() { - self.show_task_modal = false; - return CommsAction::PostTask { - title: self.task_title.clone(), - desc: self.task_desc.clone(), - assign: self.task_assign.clone(), - }; - } + KeyCode::Enter if !self.task_title.is_empty() => { + self.show_task_modal = false; + return CommsAction::PostTask { + title: self.task_title.clone(), + desc: self.task_desc.clone(), + assign: self.task_assign.clone(), + }; } KeyCode::Char(c) => match self.task_field { 0 => self.task_title.push(c), diff --git a/crates/openfang-cli/src/tui/screens/extensions.rs b/crates/openfang-cli/src/tui/screens/extensions.rs index f3f65fe2c5..7ba69a6fba 100644 --- a/crates/openfang-cli/src/tui/screens/extensions.rs +++ b/crates/openfang-cli/src/tui/screens/extensions.rs @@ -152,12 +152,10 @@ impl ExtensionsState { self.sub = ExtSub::Health; return ExtensionsAction::RefreshHealth; } - KeyCode::Char('/') => { - if self.sub == ExtSub::Browse { - self.searching = true; - self.search_query.clear(); - return ExtensionsAction::Continue; - } + KeyCode::Char('/') if self.sub == ExtSub::Browse => { + self.searching = true; + self.search_query.clear(); + return ExtensionsAction::Continue; } _ => {} } @@ -172,19 +170,15 @@ impl ExtensionsState { fn handle_browse(&mut self, key: KeyEvent) -> ExtensionsAction { let total = self.filtered().len(); match key.code { - KeyCode::Up | KeyCode::Char('k') => { - if total > 0 { - let i = self.browse_list.selected().unwrap_or(0); - let next = if i == 0 { total - 1 } else { i - 1 }; - self.browse_list.select(Some(next)); - } + KeyCode::Up | KeyCode::Char('k') if total > 0 => { + let i = self.browse_list.selected().unwrap_or(0); + let next = if i == 0 { total - 1 } else { i - 1 }; + self.browse_list.select(Some(next)); } - KeyCode::Down | KeyCode::Char('j') => { - if total > 0 { - let i = self.browse_list.selected().unwrap_or(0); - let next = (i + 1) % total; - self.browse_list.select(Some(next)); - } + KeyCode::Down | KeyCode::Char('j') if total > 0 => { + let i = self.browse_list.selected().unwrap_or(0); + let next = (i + 1) % total; + self.browse_list.select(Some(next)); } KeyCode::Enter => { let filtered = self.filtered(); @@ -222,24 +216,18 @@ impl ExtensionsState { let total = self.installed_list_data().len(); match key.code { - KeyCode::Up | KeyCode::Char('k') => { - if total > 0 { - let i = self.installed_list.selected().unwrap_or(0); - let next = if i == 0 { total - 1 } else { i - 1 }; - self.installed_list.select(Some(next)); - } + KeyCode::Up | KeyCode::Char('k') if total > 0 => { + let i = self.installed_list.selected().unwrap_or(0); + let next = if i == 0 { total - 1 } else { i - 1 }; + self.installed_list.select(Some(next)); } - KeyCode::Down | KeyCode::Char('j') => { - if total > 0 { - let i = self.installed_list.selected().unwrap_or(0); - let next = (i + 1) % total; - self.installed_list.select(Some(next)); - } + KeyCode::Down | KeyCode::Char('j') if total > 0 => { + let i = self.installed_list.selected().unwrap_or(0); + let next = (i + 1) % total; + self.installed_list.select(Some(next)); } - KeyCode::Char('d') | KeyCode::Delete => { - if self.installed_list.selected().is_some() { - self.confirm_remove = true; - } + KeyCode::Char('d') | KeyCode::Delete if self.installed_list.selected().is_some() => { + self.confirm_remove = true; } KeyCode::Char('r') => return ExtensionsAction::RefreshAll, _ => {} @@ -250,19 +238,15 @@ impl ExtensionsState { fn handle_health(&mut self, key: KeyEvent) -> ExtensionsAction { let total = self.health_entries.len(); match key.code { - KeyCode::Up | KeyCode::Char('k') => { - if total > 0 { - let i = self.health_list.selected().unwrap_or(0); - let next = if i == 0 { total - 1 } else { i - 1 }; - self.health_list.select(Some(next)); - } + KeyCode::Up | KeyCode::Char('k') if total > 0 => { + let i = self.health_list.selected().unwrap_or(0); + let next = if i == 0 { total - 1 } else { i - 1 }; + self.health_list.select(Some(next)); } - KeyCode::Down | KeyCode::Char('j') => { - if total > 0 { - let i = self.health_list.selected().unwrap_or(0); - let next = (i + 1) % total; - self.health_list.select(Some(next)); - } + KeyCode::Down | KeyCode::Char('j') if total > 0 => { + let i = self.health_list.selected().unwrap_or(0); + let next = (i + 1) % total; + self.health_list.select(Some(next)); } KeyCode::Char('r') | KeyCode::Enter => { if let Some(sel) = self.health_list.selected() { diff --git a/crates/openfang-cli/src/tui/screens/hands.rs b/crates/openfang-cli/src/tui/screens/hands.rs index b5559c5150..52c0aef2c9 100644 --- a/crates/openfang-cli/src/tui/screens/hands.rs +++ b/crates/openfang-cli/src/tui/screens/hands.rs @@ -109,19 +109,15 @@ impl HandsState { fn handle_marketplace(&mut self, key: KeyEvent) -> HandsAction { let total = self.definitions.len(); match key.code { - KeyCode::Up | KeyCode::Char('k') => { - if total > 0 { - let i = self.marketplace_list.selected().unwrap_or(0); - let next = if i == 0 { total - 1 } else { i - 1 }; - self.marketplace_list.select(Some(next)); - } + KeyCode::Up | KeyCode::Char('k') if total > 0 => { + let i = self.marketplace_list.selected().unwrap_or(0); + let next = if i == 0 { total - 1 } else { i - 1 }; + self.marketplace_list.select(Some(next)); } - KeyCode::Down | KeyCode::Char('j') => { - if total > 0 { - let i = self.marketplace_list.selected().unwrap_or(0); - let next = (i + 1) % total; - self.marketplace_list.select(Some(next)); - } + KeyCode::Down | KeyCode::Char('j') if total > 0 => { + let i = self.marketplace_list.selected().unwrap_or(0); + let next = (i + 1) % total; + self.marketplace_list.select(Some(next)); } KeyCode::Enter | KeyCode::Char('a') => { if let Some(sel) = self.marketplace_list.selected() { @@ -157,24 +153,18 @@ impl HandsState { let total = self.instances.len(); match key.code { - KeyCode::Up | KeyCode::Char('k') => { - if total > 0 { - let i = self.active_list.selected().unwrap_or(0); - let next = if i == 0 { total - 1 } else { i - 1 }; - self.active_list.select(Some(next)); - } + KeyCode::Up | KeyCode::Char('k') if total > 0 => { + let i = self.active_list.selected().unwrap_or(0); + let next = if i == 0 { total - 1 } else { i - 1 }; + self.active_list.select(Some(next)); } - KeyCode::Down | KeyCode::Char('j') => { - if total > 0 { - let i = self.active_list.selected().unwrap_or(0); - let next = (i + 1) % total; - self.active_list.select(Some(next)); - } + KeyCode::Down | KeyCode::Char('j') if total > 0 => { + let i = self.active_list.selected().unwrap_or(0); + let next = (i + 1) % total; + self.active_list.select(Some(next)); } - KeyCode::Char('d') | KeyCode::Delete => { - if self.active_list.selected().is_some() { - self.confirm_deactivate = true; - } + KeyCode::Char('d') | KeyCode::Delete if self.active_list.selected().is_some() => { + self.confirm_deactivate = true; } KeyCode::Char('p') => { if let Some(sel) = self.active_list.selected() { diff --git a/crates/openfang-cli/src/tui/screens/init_wizard.rs b/crates/openfang-cli/src/tui/screens/init_wizard.rs index 7f256082ec..544f9121a8 100644 --- a/crates/openfang-cli/src/tui/screens/init_wizard.rs +++ b/crates/openfang-cli/src/tui/screens/init_wizard.rs @@ -966,16 +966,15 @@ pub fn run() -> InitResult { state.step = Step::Provider; } } - KeyCode::Enter => { + KeyCode::Enter if matches!( state.copilot_auth_status, CopilotAuthStatus::WaitingForUser - ) && !state.copilot_verification_uri.is_empty() - { - let _ = openfang_runtime::drivers::copilot::open_verification_url( - &state.copilot_verification_uri, - ); - } + ) && !state.copilot_verification_uri.is_empty() => + { + let _ = openfang_runtime::drivers::copilot::open_verification_url( + &state.copilot_verification_uri, + ); } _ => {} }, @@ -990,41 +989,36 @@ pub fn run() -> InitResult { state.key_test = KeyTestState::Idle; state.step = Step::Provider; } - KeyCode::Enter => { + KeyCode::Enter if !state.api_key_input.is_empty() - && state.key_test == KeyTestState::Idle - { - if let Some(p) = state.provider() { - let _ = crate::dotenv::save_env_key( - p.env_var, - &state.api_key_input, - ); - } - state.key_test = KeyTestState::Testing; - let provider_name = state - .provider() - .map(|p| p.name.to_string()) - .unwrap_or_default(); - let env_var = state - .provider() - .map(|p| p.env_var.to_string()) - .unwrap_or_default(); - let tx = test_tx.clone(); - std::thread::spawn(move || { - let ok = crate::test_api_key(&provider_name, &env_var); - let _ = tx.send(ok); - }); + && state.key_test == KeyTestState::Idle => + { + if let Some(p) = state.provider() { + let _ = crate::dotenv::save_env_key( + p.env_var, + &state.api_key_input, + ); } + state.key_test = KeyTestState::Testing; + let provider_name = state + .provider() + .map(|p| p.name.to_string()) + .unwrap_or_default(); + let env_var = state + .provider() + .map(|p| p.env_var.to_string()) + .unwrap_or_default(); + let tx = test_tx.clone(); + std::thread::spawn(move || { + let ok = crate::test_api_key(&provider_name, &env_var); + let _ = tx.send(ok); + }); } - KeyCode::Char(c) => { - if state.key_test == KeyTestState::Idle { - state.api_key_input.push(c); - } + KeyCode::Char(c) if state.key_test == KeyTestState::Idle => { + state.api_key_input.push(c); } - KeyCode::Backspace => { - if state.key_test == KeyTestState::Idle { - state.api_key_input.pop(); - } + KeyCode::Backspace if state.key_test == KeyTestState::Idle => { + state.api_key_input.pop(); } _ => {} } @@ -1192,11 +1186,10 @@ fn handle_migration_key( _ => {} }, MigrationPhase::Running => {} // ignore keys while running - MigrationPhase::Done => { - if code == KeyCode::Enter { - state.advance_to_provider(); - } + MigrationPhase::Done if code == KeyCode::Enter => { + state.advance_to_provider(); } + MigrationPhase::Done => {} } } diff --git a/crates/openfang-cli/src/tui/screens/logs.rs b/crates/openfang-cli/src/tui/screens/logs.rs index 40b6656a21..64494c7a02 100644 --- a/crates/openfang-cli/src/tui/screens/logs.rs +++ b/crates/openfang-cli/src/tui/screens/logs.rs @@ -211,19 +211,15 @@ impl LogsState { let total = self.filtered.len(); match key.code { - KeyCode::Up | KeyCode::Char('k') => { - if total > 0 { - let i = self.list_state.selected().unwrap_or(0); - let next = if i == 0 { total - 1 } else { i - 1 }; - self.list_state.select(Some(next)); - } + KeyCode::Up | KeyCode::Char('k') if total > 0 => { + let i = self.list_state.selected().unwrap_or(0); + let next = if i == 0 { total - 1 } else { i - 1 }; + self.list_state.select(Some(next)); } - KeyCode::Down | KeyCode::Char('j') => { - if total > 0 { - let i = self.list_state.selected().unwrap_or(0); - let next = (i + 1) % total; - self.list_state.select(Some(next)); - } + KeyCode::Down | KeyCode::Char('j') if total > 0 => { + let i = self.list_state.selected().unwrap_or(0); + let next = (i + 1) % total; + self.list_state.select(Some(next)); } KeyCode::Char('f') => { self.level_filter = self.level_filter.next(); @@ -237,15 +233,11 @@ impl LogsState { self.auto_refresh = !self.auto_refresh; } KeyCode::Char('r') => return LogsAction::Refresh, - KeyCode::End => { - if total > 0 { - self.list_state.select(Some(total - 1)); - } + KeyCode::End if total > 0 => { + self.list_state.select(Some(total - 1)); } - KeyCode::Home => { - if total > 0 { - self.list_state.select(Some(0)); - } + KeyCode::Home if total > 0 => { + self.list_state.select(Some(0)); } _ => {} } diff --git a/crates/openfang-cli/src/tui/screens/memory.rs b/crates/openfang-cli/src/tui/screens/memory.rs index cfa7b2da37..dc9a2d06cf 100644 --- a/crates/openfang-cli/src/tui/screens/memory.rs +++ b/crates/openfang-cli/src/tui/screens/memory.rs @@ -106,19 +106,15 @@ impl MemoryState { fn handle_agent_select(&mut self, key: KeyEvent) -> MemoryAction { let total = self.agents.len(); match key.code { - KeyCode::Up | KeyCode::Char('k') => { - if total > 0 { - let i = self.agent_list_state.selected().unwrap_or(0); - let next = if i == 0 { total - 1 } else { i - 1 }; - self.agent_list_state.select(Some(next)); - } + KeyCode::Up | KeyCode::Char('k') if total > 0 => { + let i = self.agent_list_state.selected().unwrap_or(0); + let next = if i == 0 { total - 1 } else { i - 1 }; + self.agent_list_state.select(Some(next)); } - KeyCode::Down | KeyCode::Char('j') => { - if total > 0 { - let i = self.agent_list_state.selected().unwrap_or(0); - let next = (i + 1) % total; - self.agent_list_state.select(Some(next)); - } + KeyCode::Down | KeyCode::Char('j') if total > 0 => { + let i = self.agent_list_state.selected().unwrap_or(0); + let next = (i + 1) % total; + self.agent_list_state.select(Some(next)); } KeyCode::Enter => { if let Some(sel) = self.agent_list_state.selected() { @@ -166,19 +162,15 @@ impl MemoryState { self.kv_pairs.clear(); self.selected_agent = None; } - KeyCode::Up | KeyCode::Char('k') => { - if total > 0 { - let i = self.kv_list_state.selected().unwrap_or(0); - let next = if i == 0 { total - 1 } else { i - 1 }; - self.kv_list_state.select(Some(next)); - } + KeyCode::Up | KeyCode::Char('k') if total > 0 => { + let i = self.kv_list_state.selected().unwrap_or(0); + let next = if i == 0 { total - 1 } else { i - 1 }; + self.kv_list_state.select(Some(next)); } - KeyCode::Down | KeyCode::Char('j') => { - if total > 0 { - let i = self.kv_list_state.selected().unwrap_or(0); - let next = (i + 1) % total; - self.kv_list_state.select(Some(next)); - } + KeyCode::Down | KeyCode::Char('j') if total > 0 => { + let i = self.kv_list_state.selected().unwrap_or(0); + let next = (i + 1) % total; + self.kv_list_state.select(Some(next)); } KeyCode::Char('a') => { self.sub = MemorySub::AddKey; @@ -196,10 +188,8 @@ impl MemoryState { } } } - KeyCode::Char('d') => { - if self.kv_list_state.selected().is_some() { - self.confirm_delete = true; - } + KeyCode::Char('d') if self.kv_list_state.selected().is_some() => { + self.confirm_delete = true; } KeyCode::Char('r') => { if let Some(agent) = &self.selected_agent { diff --git a/crates/openfang-cli/src/tui/screens/peers.rs b/crates/openfang-cli/src/tui/screens/peers.rs index 3d3d8291ed..09354415c7 100644 --- a/crates/openfang-cli/src/tui/screens/peers.rs +++ b/crates/openfang-cli/src/tui/screens/peers.rs @@ -62,19 +62,15 @@ impl PeersState { } let total = self.peers.len(); match key.code { - KeyCode::Up | KeyCode::Char('k') => { - if total > 0 { - let i = self.list_state.selected().unwrap_or(0); - let next = if i == 0 { total - 1 } else { i - 1 }; - self.list_state.select(Some(next)); - } + KeyCode::Up | KeyCode::Char('k') if total > 0 => { + let i = self.list_state.selected().unwrap_or(0); + let next = if i == 0 { total - 1 } else { i - 1 }; + self.list_state.select(Some(next)); } - KeyCode::Down | KeyCode::Char('j') => { - if total > 0 { - let i = self.list_state.selected().unwrap_or(0); - let next = (i + 1) % total; - self.list_state.select(Some(next)); - } + KeyCode::Down | KeyCode::Char('j') if total > 0 => { + let i = self.list_state.selected().unwrap_or(0); + let next = (i + 1) % total; + self.list_state.select(Some(next)); } KeyCode::Char('r') => return PeersAction::Refresh, _ => {} diff --git a/crates/openfang-cli/src/tui/screens/sessions.rs b/crates/openfang-cli/src/tui/screens/sessions.rs index 14bad4b675..091a3f85da 100644 --- a/crates/openfang-cli/src/tui/screens/sessions.rs +++ b/crates/openfang-cli/src/tui/screens/sessions.rs @@ -130,19 +130,15 @@ impl SessionsState { let total = self.filtered.len(); match key.code { - KeyCode::Up | KeyCode::Char('k') => { - if total > 0 { - let i = self.list_state.selected().unwrap_or(0); - let next = if i == 0 { total - 1 } else { i - 1 }; - self.list_state.select(Some(next)); - } + KeyCode::Up | KeyCode::Char('k') if total > 0 => { + let i = self.list_state.selected().unwrap_or(0); + let next = if i == 0 { total - 1 } else { i - 1 }; + self.list_state.select(Some(next)); } - KeyCode::Down | KeyCode::Char('j') => { - if total > 0 { - let i = self.list_state.selected().unwrap_or(0); - let next = (i + 1) % total; - self.list_state.select(Some(next)); - } + KeyCode::Down | KeyCode::Char('j') if total > 0 => { + let i = self.list_state.selected().unwrap_or(0); + let next = (i + 1) % total; + self.list_state.select(Some(next)); } KeyCode::Enter => { if let Some(sel) = self.list_state.selected() { @@ -155,10 +151,8 @@ impl SessionsState { } } } - KeyCode::Char('d') => { - if self.list_state.selected().is_some() { - self.confirm_delete = true; - } + KeyCode::Char('d') if self.list_state.selected().is_some() => { + self.confirm_delete = true; } KeyCode::Char('/') => { self.search_mode = true; diff --git a/crates/openfang-cli/src/tui/screens/settings.rs b/crates/openfang-cli/src/tui/screens/settings.rs index b97e346059..167a204c3a 100644 --- a/crates/openfang-cli/src/tui/screens/settings.rs +++ b/crates/openfang-cli/src/tui/screens/settings.rs @@ -174,21 +174,17 @@ impl SettingsState { fn handle_providers(&mut self, key: KeyEvent) -> SettingsAction { let total = self.providers.len(); match key.code { - KeyCode::Up | KeyCode::Char('k') => { - if total > 0 { - let i = self.provider_list.selected().unwrap_or(0); - let next = if i == 0 { total - 1 } else { i - 1 }; - self.provider_list.select(Some(next)); - self.test_result = None; - } + KeyCode::Up | KeyCode::Char('k') if total > 0 => { + let i = self.provider_list.selected().unwrap_or(0); + let next = if i == 0 { total - 1 } else { i - 1 }; + self.provider_list.select(Some(next)); + self.test_result = None; } - KeyCode::Down | KeyCode::Char('j') => { - if total > 0 { - let i = self.provider_list.selected().unwrap_or(0); - let next = (i + 1) % total; - self.provider_list.select(Some(next)); - self.test_result = None; - } + KeyCode::Down | KeyCode::Char('j') if total > 0 => { + let i = self.provider_list.selected().unwrap_or(0); + let next = (i + 1) % total; + self.provider_list.select(Some(next)); + self.test_result = None; } KeyCode::Char('e') => { if let Some(sel) = self.provider_list.selected() { @@ -223,19 +219,15 @@ impl SettingsState { fn handle_models(&mut self, key: KeyEvent) -> SettingsAction { let total = self.models.len(); match key.code { - KeyCode::Up | KeyCode::Char('k') => { - if total > 0 { - let i = self.model_list.selected().unwrap_or(0); - let next = if i == 0 { total - 1 } else { i - 1 }; - self.model_list.select(Some(next)); - } + KeyCode::Up | KeyCode::Char('k') if total > 0 => { + let i = self.model_list.selected().unwrap_or(0); + let next = if i == 0 { total - 1 } else { i - 1 }; + self.model_list.select(Some(next)); } - KeyCode::Down | KeyCode::Char('j') => { - if total > 0 { - let i = self.model_list.selected().unwrap_or(0); - let next = (i + 1) % total; - self.model_list.select(Some(next)); - } + KeyCode::Down | KeyCode::Char('j') if total > 0 => { + let i = self.model_list.selected().unwrap_or(0); + let next = (i + 1) % total; + self.model_list.select(Some(next)); } KeyCode::Char('r') => return SettingsAction::RefreshModels, _ => {} @@ -246,19 +238,15 @@ impl SettingsState { fn handle_tools(&mut self, key: KeyEvent) -> SettingsAction { let total = self.tools.len(); match key.code { - KeyCode::Up | KeyCode::Char('k') => { - if total > 0 { - let i = self.tool_list.selected().unwrap_or(0); - let next = if i == 0 { total - 1 } else { i - 1 }; - self.tool_list.select(Some(next)); - } + KeyCode::Up | KeyCode::Char('k') if total > 0 => { + let i = self.tool_list.selected().unwrap_or(0); + let next = if i == 0 { total - 1 } else { i - 1 }; + self.tool_list.select(Some(next)); } - KeyCode::Down | KeyCode::Char('j') => { - if total > 0 { - let i = self.tool_list.selected().unwrap_or(0); - let next = (i + 1) % total; - self.tool_list.select(Some(next)); - } + KeyCode::Down | KeyCode::Char('j') if total > 0 => { + let i = self.tool_list.selected().unwrap_or(0); + let next = (i + 1) % total; + self.tool_list.select(Some(next)); } KeyCode::Char('r') => return SettingsAction::RefreshTools, _ => {} diff --git a/crates/openfang-cli/src/tui/screens/skills.rs b/crates/openfang-cli/src/tui/screens/skills.rs index 5494ebaf60..e19930cd36 100644 --- a/crates/openfang-cli/src/tui/screens/skills.rs +++ b/crates/openfang-cli/src/tui/screens/skills.rs @@ -167,24 +167,18 @@ impl SkillsState { let total = self.installed.len(); match key.code { - KeyCode::Up | KeyCode::Char('k') => { - if total > 0 { - let i = self.installed_list.selected().unwrap_or(0); - let next = if i == 0 { total - 1 } else { i - 1 }; - self.installed_list.select(Some(next)); - } + KeyCode::Up | KeyCode::Char('k') if total > 0 => { + let i = self.installed_list.selected().unwrap_or(0); + let next = if i == 0 { total - 1 } else { i - 1 }; + self.installed_list.select(Some(next)); } - KeyCode::Down | KeyCode::Char('j') => { - if total > 0 { - let i = self.installed_list.selected().unwrap_or(0); - let next = (i + 1) % total; - self.installed_list.select(Some(next)); - } + KeyCode::Down | KeyCode::Char('j') if total > 0 => { + let i = self.installed_list.selected().unwrap_or(0); + let next = (i + 1) % total; + self.installed_list.select(Some(next)); } - KeyCode::Char('u') => { - if self.installed_list.selected().is_some() { - self.confirm_uninstall = true; - } + KeyCode::Char('u') if self.installed_list.selected().is_some() => { + self.confirm_uninstall = true; } KeyCode::Char('r') => return SkillsAction::RefreshInstalled, _ => {} @@ -217,19 +211,15 @@ impl SkillsState { let total = self.clawhub_results.len(); match key.code { - KeyCode::Up | KeyCode::Char('k') => { - if total > 0 { - let i = self.clawhub_list.selected().unwrap_or(0); - let next = if i == 0 { total - 1 } else { i - 1 }; - self.clawhub_list.select(Some(next)); - } + KeyCode::Up | KeyCode::Char('k') if total > 0 => { + let i = self.clawhub_list.selected().unwrap_or(0); + let next = if i == 0 { total - 1 } else { i - 1 }; + self.clawhub_list.select(Some(next)); } - KeyCode::Down | KeyCode::Char('j') => { - if total > 0 { - let i = self.clawhub_list.selected().unwrap_or(0); - let next = (i + 1) % total; - self.clawhub_list.select(Some(next)); - } + KeyCode::Down | KeyCode::Char('j') if total > 0 => { + let i = self.clawhub_list.selected().unwrap_or(0); + let next = (i + 1) % total; + self.clawhub_list.select(Some(next)); } KeyCode::Char('i') => { if let Some(sel) = self.clawhub_list.selected() { @@ -257,19 +247,15 @@ impl SkillsState { fn handle_mcp(&mut self, key: KeyEvent) -> SkillsAction { let total = self.mcp_servers.len(); match key.code { - KeyCode::Up | KeyCode::Char('k') => { - if total > 0 { - let i = self.mcp_list.selected().unwrap_or(0); - let next = if i == 0 { total - 1 } else { i - 1 }; - self.mcp_list.select(Some(next)); - } + KeyCode::Up | KeyCode::Char('k') if total > 0 => { + let i = self.mcp_list.selected().unwrap_or(0); + let next = if i == 0 { total - 1 } else { i - 1 }; + self.mcp_list.select(Some(next)); } - KeyCode::Down | KeyCode::Char('j') => { - if total > 0 { - let i = self.mcp_list.selected().unwrap_or(0); - let next = (i + 1) % total; - self.mcp_list.select(Some(next)); - } + KeyCode::Down | KeyCode::Char('j') if total > 0 => { + let i = self.mcp_list.selected().unwrap_or(0); + let next = (i + 1) % total; + self.mcp_list.select(Some(next)); } KeyCode::Char('r') => return SkillsAction::RefreshMcp, _ => {} diff --git a/crates/openfang-cli/src/tui/screens/templates.rs b/crates/openfang-cli/src/tui/screens/templates.rs index eae8faa723..193d3e464d 100644 --- a/crates/openfang-cli/src/tui/screens/templates.rs +++ b/crates/openfang-cli/src/tui/screens/templates.rs @@ -194,19 +194,15 @@ impl TemplatesState { let total = self.filtered.len(); match key.code { - KeyCode::Up | KeyCode::Char('k') => { - if total > 0 { - let i = self.list_state.selected().unwrap_or(0); - let next = if i == 0 { total - 1 } else { i - 1 }; - self.list_state.select(Some(next)); - } + KeyCode::Up | KeyCode::Char('k') if total > 0 => { + let i = self.list_state.selected().unwrap_or(0); + let next = if i == 0 { total - 1 } else { i - 1 }; + self.list_state.select(Some(next)); } - KeyCode::Down | KeyCode::Char('j') => { - if total > 0 { - let i = self.list_state.selected().unwrap_or(0); - let next = (i + 1) % total; - self.list_state.select(Some(next)); - } + KeyCode::Down | KeyCode::Char('j') if total > 0 => { + let i = self.list_state.selected().unwrap_or(0); + let next = (i + 1) % total; + self.list_state.select(Some(next)); } KeyCode::Enter => { if let Some(sel) = self.list_state.selected() { diff --git a/crates/openfang-cli/src/tui/screens/triggers.rs b/crates/openfang-cli/src/tui/screens/triggers.rs index d55ddb6d55..8aeb093bd6 100644 --- a/crates/openfang-cli/src/tui/screens/triggers.rs +++ b/crates/openfang-cli/src/tui/screens/triggers.rs @@ -156,19 +156,15 @@ impl TriggerState { self.create_step -= 1; } } - KeyCode::Enter => { - if self.create_step < 5 { - self.create_step += 1; - } + KeyCode::Enter if self.create_step < 5 => { + self.create_step += 1; } KeyCode::Char(c) => match self.create_step { 0 => self.create_agent_id.push(c), 2 => self.create_pattern_param.push(c), 3 => self.create_prompt.push(c), - 4 => { - if c.is_ascii_digit() { - self.create_max_fires.push(c); - } + 4 if c.is_ascii_digit() => { + self.create_max_fires.push(c); } _ => {} }, diff --git a/crates/openfang-cli/src/tui/screens/usage.rs b/crates/openfang-cli/src/tui/screens/usage.rs index e0f829cf2c..20b304cb35 100644 --- a/crates/openfang-cli/src/tui/screens/usage.rs +++ b/crates/openfang-cli/src/tui/screens/usage.rs @@ -103,27 +103,22 @@ impl UsageState { } match self.sub { - UsageSub::Summary => { - if key.code == KeyCode::Char('r') { - return UsageAction::Refresh; - } + UsageSub::Summary if key.code == KeyCode::Char('r') => { + return UsageAction::Refresh; } + UsageSub::Summary => {} UsageSub::ByModel => { let total = self.by_model.len(); match key.code { - KeyCode::Up | KeyCode::Char('k') => { - if total > 0 { - let i = self.model_list.selected().unwrap_or(0); - let next = if i == 0 { total - 1 } else { i - 1 }; - self.model_list.select(Some(next)); - } + KeyCode::Up | KeyCode::Char('k') if total > 0 => { + let i = self.model_list.selected().unwrap_or(0); + let next = if i == 0 { total - 1 } else { i - 1 }; + self.model_list.select(Some(next)); } - KeyCode::Down | KeyCode::Char('j') => { - if total > 0 { - let i = self.model_list.selected().unwrap_or(0); - let next = (i + 1) % total; - self.model_list.select(Some(next)); - } + KeyCode::Down | KeyCode::Char('j') if total > 0 => { + let i = self.model_list.selected().unwrap_or(0); + let next = (i + 1) % total; + self.model_list.select(Some(next)); } KeyCode::Char('r') => return UsageAction::Refresh, _ => {} @@ -132,19 +127,15 @@ impl UsageState { UsageSub::ByAgent => { let total = self.by_agent.len(); match key.code { - KeyCode::Up | KeyCode::Char('k') => { - if total > 0 { - let i = self.agent_list.selected().unwrap_or(0); - let next = if i == 0 { total - 1 } else { i - 1 }; - self.agent_list.select(Some(next)); - } + KeyCode::Up | KeyCode::Char('k') if total > 0 => { + let i = self.agent_list.selected().unwrap_or(0); + let next = if i == 0 { total - 1 } else { i - 1 }; + self.agent_list.select(Some(next)); } - KeyCode::Down | KeyCode::Char('j') => { - if total > 0 { - let i = self.agent_list.selected().unwrap_or(0); - let next = (i + 1) % total; - self.agent_list.select(Some(next)); - } + KeyCode::Down | KeyCode::Char('j') if total > 0 => { + let i = self.agent_list.selected().unwrap_or(0); + let next = (i + 1) % total; + self.agent_list.select(Some(next)); } KeyCode::Char('r') => return UsageAction::Refresh, _ => {} diff --git a/crates/openfang-cli/src/tui/screens/wizard.rs b/crates/openfang-cli/src/tui/screens/wizard.rs index f15b8f8c8d..8b46638469 100644 --- a/crates/openfang-cli/src/tui/screens/wizard.rs +++ b/crates/openfang-cli/src/tui/screens/wizard.rs @@ -322,13 +322,11 @@ impl WizardState { KeyCode::Esc => { self.step = WizardStep::Provider; } - KeyCode::Enter => { - if !self.api_key_input.is_empty() { - if let Some(p) = self.selected_provider_info() { - self.model_input = p.default_model.to_string(); - } - self.step = WizardStep::Model; + KeyCode::Enter if !self.api_key_input.is_empty() => { + if let Some(p) = self.selected_provider_info() { + self.model_input = p.default_model.to_string(); } + self.step = WizardStep::Model; } KeyCode::Char(c) => { self.api_key_input.push(c); diff --git a/crates/openfang-kernel/src/kernel.rs b/crates/openfang-kernel/src/kernel.rs index fc6c77f0e0..42f28d9d9a 100644 --- a/crates/openfang-kernel/src/kernel.rs +++ b/crates/openfang-kernel/src/kernel.rs @@ -161,6 +161,9 @@ pub struct OpenFangKernel { agent_msg_locks: dashmap::DashMap>>, /// Weak self-reference for trigger dispatch (set after Arc wrapping). self_handle: OnceLock>, + /// Channel callback contexts — tracks the originating channel for each agent + /// so async tool results can be delivered back to the correct channel/user. + channel_contexts: dashmap::DashMap, } /// Bounded in-memory delivery receipt tracker. @@ -1158,6 +1161,7 @@ impl OpenFangKernel { default_model_override: std::sync::RwLock::new(None), agent_msg_locks: dashmap::DashMap::new(), self_handle: OnceLock::new(), + channel_contexts: dashmap::DashMap::new(), }; // Restore persisted agents from SQLite @@ -1688,6 +1692,7 @@ impl OpenFangKernel { content_blocks, sender_id, sender_name, + None, // prepend_turns ) .await }; @@ -2121,6 +2126,7 @@ impl OpenFangKernel { ctx_window, Some(&kernel_clone.process_manager), content_blocks, + None, // prepend_turns ) .await; @@ -2377,6 +2383,7 @@ impl OpenFangKernel { content_blocks: Option>, sender_id: Option, sender_name: Option, + prepend_turns: Option>, ) -> KernelResult { // Check metering quota before starting self.metering @@ -2696,6 +2703,7 @@ impl OpenFangKernel { ctx_window, Some(&self.process_manager), content_blocks, + prepend_turns, ) .await .map_err(KernelError::OpenFang)?; @@ -3552,10 +3560,10 @@ impl OpenFangKernel { for req in &def.requires { match req.requirement_type { openfang_hands::RequirementType::ApiKey - | openfang_hands::RequirementType::EnvVar => { - if !req.check_value.is_empty() && !allowed_env.contains(&req.check_value) { - allowed_env.push(req.check_value.clone()); - } + | openfang_hands::RequirementType::EnvVar + if !req.check_value.is_empty() && !allowed_env.contains(&req.check_value) => + { + allowed_env.push(req.check_value.clone()); } _ => {} } @@ -3762,7 +3770,7 @@ impl OpenFangKernel { let mut bindings = self.bindings.lock().unwrap_or_else(|e| e.into_inner()); bindings.push(binding); // Sort by specificity descending - bindings.sort_by(|a, b| b.match_rule.specificity().cmp(&a.match_rule.specificity())); + bindings.sort_by_key(|b| std::cmp::Reverse(b.match_rule.specificity())); } /// Remove a binding by index, returns the removed binding if valid. @@ -6967,6 +6975,107 @@ impl KernelHandle for OpenFangKernel { )) } + fn get_channel_context( + &self, + agent_id: &str, + ) -> Option { + let id: AgentId = agent_id.parse().ok()?; + self.channel_contexts.get(&id).map(|r| r.clone()) + } + + fn set_channel_context(&self, agent_id: &str, context: openfang_types::ChannelCallbackContext) { + if let Ok(id) = agent_id.parse::() { + self.channel_contexts.insert(id, context); + } + } + + async fn inject_async_callback( + &self, + context: openfang_types::ChannelCallbackContext, + agent_name: &str, + result_text: &str, + ) -> Result<(), String> { + use openfang_types::message::{ContentBlock, Message, MessageContent, Role}; + + tracing::info!( + agent_id = %context.agent_id, + agent_name = %agent_name, + channel = %context.channel_type, + recipient = %context.reply_to_platform_id, + "inject_async_callback: delivering async result to channel" + ); + + let agent_id: AgentId = context.agent_id.parse().map_err(|_| { + format!( + "inject_async_callback: invalid agent_id '{}'", + context.agent_id + ) + })?; + let entry = self.registry.get(agent_id).ok_or_else(|| { + format!( + "inject_async_callback: agent {} not found", + context.agent_id + ) + })?; + + // Use a synthetic ToolUse+ToolResult pair to deliver the untrusted remote content + // to the agent. The ToolResult block acts as a structural data boundary — the LLM + // API separates tool results from instructions, preventing the remote agent's output + // from being interpreted as system instructions (prompt injection). + let tool_use_id = format!("a2a_async_{}", uuid::Uuid::new_v4().simple()); + + let prepend_turns = vec![Message { + role: Role::Assistant, + content: MessageContent::Blocks(vec![ContentBlock::ToolUse { + id: tool_use_id.clone(), + name: "_a2a_async_result".to_string(), + input: serde_json::json!({ "source": agent_name }), + provider_metadata: None, + }]), + }]; + + let content_blocks = vec![ContentBlock::ToolResult { + tool_use_id: tool_use_id.clone(), + tool_name: "_a2a_async_result".to_string(), + content: result_text.to_string(), + is_error: false, + }]; + + let kernel_handle: Option> = self + .self_handle + .get() + .and_then(|w| w.upgrade()) + .map(|arc| arc as Arc); + + // Run the agent loop to get a formatted response, then discard the synthetic turns + // from persistent session history by using the messages_before watermark. + let loop_result = self + .execute_llm_agent( + &entry, + agent_id, + "", // no user text — the result is fully in content_blocks + kernel_handle, + Some(content_blocks), + None, // sender_id + Some(agent_name.to_string()), + Some(prepend_turns), + ) + .await + .map_err(|e| format!("inject_async_callback: agent loop failed: {e}"))?; + + // Deliver the agent's formatted response to the originating channel. + self.send_channel_message( + &context.channel_type, + &context.reply_to_platform_id, + &loop_result.response, + context.thread_id.as_deref(), + ) + .await + .map_err(|e| format!("inject_async_callback: channel send failed: {e}"))?; + + Ok(()) + } + async fn spawn_agent_checked( &self, manifest_toml: &str, diff --git a/crates/openfang-migrate/src/openclaw.rs b/crates/openfang-migrate/src/openclaw.rs index f79657899a..483e9e710b 100644 --- a/crates/openfang-migrate/src/openclaw.rs +++ b/crates/openfang-migrate/src/openclaw.rs @@ -896,10 +896,8 @@ fn derive_capabilities(tools: &[String]) -> AgentCapabilities { "shell_exec" => { caps.shell = vec!["*".to_string()]; } - "web_fetch" | "web_search" | "browser_navigate" => { - if caps.network.is_empty() { - caps.network = vec!["*".to_string()]; - } + "web_fetch" | "web_search" | "browser_navigate" if caps.network.is_empty() => { + caps.network = vec!["*".to_string()]; } "agent_send" | "agent_list" => { if caps.agent_message.is_empty() { diff --git a/crates/openfang-runtime/src/a2a.rs b/crates/openfang-runtime/src/a2a.rs index a11a408005..033f75412c 100644 --- a/crates/openfang-runtime/src/a2a.rs +++ b/crates/openfang-runtime/src/a2a.rs @@ -9,10 +9,12 @@ //! - `build_agent_card` — expose OpenFang agents via A2A //! - `A2aClient` — discover and interact with external A2A agents +use futures::StreamExt; use openfang_types::agent::AgentManifest; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use std::sync::Mutex; +use std::sync::{Mutex, RwLock}; +use std::time::Duration; use tracing::{debug, info, warn}; // --------------------------------------------------------------------------- @@ -393,6 +395,71 @@ pub fn build_agent_card(manifest: &AgentManifest, base_url: &str) -> AgentCard { // A2A Client — discover and interact with external A2A agents // --------------------------------------------------------------------------- +// --------------------------------------------------------------------------- +// SSE parsing +// --------------------------------------------------------------------------- + +/// Outcome of processing a single SSE `data:` payload. +#[derive(Debug)] +pub(crate) enum SseLineOutcome { + /// Line carries no task information — skip and continue. + Skip, + /// Intermediate task update; more events expected. + Update(A2aTask), + /// Final event; stream is complete. + Final(A2aTask), +} + +/// Parse one SSE `data:` payload into a typed outcome. +/// +/// Returns `Err` for malformed JSON or remote error events. +/// All other cases (empty data, unrecognised structure) return `Ok(Skip)`. +/// This function is intentionally pure so it can be unit-tested without HTTP. +pub(crate) fn parse_sse_data_line(data: &str) -> Result { + let data = data.trim(); + if data.is_empty() { + return Ok(SseLineOutcome::Skip); + } + let parsed: serde_json::Value = serde_json::from_str(data) + .map_err(|e| format!("Failed to parse SSE JSON: {e} — data: {data}"))?; + + if let Some(error) = parsed.get("error") { + return Err(format!("A2A SSE error: {error}")); + } + if let Some(result) = parsed.get("result") { + if let Ok(task) = serde_json::from_value::(result.clone()) { + let is_final = result + .get("final") + .and_then(|v| v.as_bool()) + .unwrap_or(false); + return Ok(if is_final { + SseLineOutcome::Final(task) + } else { + SseLineOutcome::Update(task) + }); + } + } + Ok(SseLineOutcome::Skip) +} + +// --------------------------------------------------------------------------- + +/// Append every agent text part from `task` into the shared `progress` buffer. +fn append_agent_text_to_progress(task: &A2aTask, progress: &RwLock) { + if let Ok(mut p) = progress.write() { + for msg in &task.messages { + if msg.role == "agent" { + for part in &msg.parts { + if let A2aPart::Text { text } = part { + p.push_str(text); + p.push('\n'); + } + } + } + } + } +} + /// Client for discovering and interacting with external A2A agents. pub struct A2aClient { client: reqwest::Client, @@ -403,7 +470,7 @@ impl A2aClient { pub fn new() -> Self { Self { client: reqwest::Client::builder() - .timeout(std::time::Duration::from_secs(30)) + .timeout(std::time::Duration::from_secs(300)) .build() .unwrap_or_default(), } @@ -479,6 +546,169 @@ impl A2aClient { } } + /// Send a task to an external A2A agent using SSE streaming (`tasks/sendSubscribe`). + /// + /// Accumulates events from the SSE stream and returns the final task once the + /// server sends a `"final": true` event, the stream closes, or `timeout` elapses. + /// + /// Pass `None` for `timeout` only when the caller manages its own deadline (e.g. + /// the async dispatch path). Synchronous callers should always set a timeout. + pub async fn send_task_streaming( + &self, + url: &str, + message: &str, + session_id: Option<&str>, + timeout: Option, + ) -> Result { + let mut builder = reqwest::Client::builder(); + if let Some(t) = timeout { + builder = builder.timeout(t); + } + let streaming_client = builder + .build() + .map_err(|e| format!("Failed to build streaming client: {e}"))?; + + let request = serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "tasks/sendSubscribe", + "params": { + "message": { + "role": "user", + "parts": [{"type": "text", "text": message}] + }, + "sessionId": session_id, + } + }); + + let response = streaming_client + .post(url) + .header("Accept", "text/event-stream") + .json(&request) + .send() + .await + .map_err(|e| format!("A2A send_task_streaming failed: {e}"))?; + + if !response.status().is_success() { + return Err(format!( + "A2A send_task_streaming returned {}", + response.status() + )); + } + + let mut stream = response.bytes_stream(); + let mut buf = String::new(); + let mut last_task: Option = None; + + while let Some(chunk) = stream.next().await { + let chunk = chunk.map_err(|e| format!("SSE stream error: {e}"))?; + let text = + std::str::from_utf8(&chunk).map_err(|e| format!("SSE non-UTF8 data: {e}"))?; + buf.push_str(text); + + while let Some(newline_pos) = buf.find('\n') { + let line = buf[..newline_pos].trim_end_matches('\r').to_string(); + buf = buf[newline_pos + 1..].to_string(); + + if let Some(data) = line.strip_prefix("data: ") { + match parse_sse_data_line(data)? { + SseLineOutcome::Skip => {} + SseLineOutcome::Update(task) => { + last_task = Some(task); + } + SseLineOutcome::Final(task) => { + return Ok(task); + } + } + } + } + } + + // Stream ended without a final event — return whatever we accumulated. + last_task.ok_or_else(|| "SSE stream ended without a final event".to_string()) + } + + /// Send a task using SSE streaming, writing each text chunk into `progress` as it arrives. + /// + /// Identical to `send_task_streaming` but appends each intermediate text chunk to + /// the shared `progress` buffer so callers can read live output via `a2a_check_task`. + pub async fn send_task_streaming_with_progress( + &self, + url: &str, + message: &str, + session_id: Option<&str>, + timeout: Option, + progress: std::sync::Arc>, + ) -> Result { + let mut builder = reqwest::Client::builder(); + if let Some(t) = timeout { + builder = builder.timeout(t); + } + let streaming_client = builder + .build() + .map_err(|e| format!("Failed to build streaming client: {e}"))?; + + let request = serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "tasks/sendSubscribe", + "params": { + "message": { + "role": "user", + "parts": [{"type": "text", "text": message}] + }, + "sessionId": session_id, + } + }); + + let response = streaming_client + .post(url) + .header("Accept", "text/event-stream") + .json(&request) + .send() + .await + .map_err(|e| format!("A2A send_task_streaming_with_progress failed: {e}"))?; + + if !response.status().is_success() { + return Err(format!( + "A2A send_task_streaming_with_progress returned {}", + response.status() + )); + } + + let mut stream = response.bytes_stream(); + let mut buf = String::new(); + let mut last_task: Option = None; + + while let Some(chunk) = stream.next().await { + let chunk = chunk.map_err(|e| format!("SSE stream error: {e}"))?; + let text = + std::str::from_utf8(&chunk).map_err(|e| format!("SSE non-UTF8 data: {e}"))?; + buf.push_str(text); + + while let Some(newline_pos) = buf.find('\n') { + let line = buf[..newline_pos].trim_end_matches('\r').to_string(); + buf = buf[newline_pos + 1..].to_string(); + + if let Some(data) = line.strip_prefix("data: ") { + match parse_sse_data_line(data)? { + SseLineOutcome::Skip => {} + SseLineOutcome::Update(task) => { + append_agent_text_to_progress(&task, &progress); + last_task = Some(task); + } + SseLineOutcome::Final(task) => { + append_agent_text_to_progress(&task, &progress); + return Ok(task); + } + } + } + } + } + + last_task.ok_or_else(|| "SSE stream ended without a final event".to_string()) + } + /// Get the status of a task from an external A2A agent. pub async fn get_task(&self, url: &str, task_id: &str) -> Result { let request = serde_json::json!({ @@ -731,6 +961,101 @@ mod tests { assert!(store.len() <= 2); } + // --------------------------------------------------------------------------- + // parse_sse_data_line unit tests + // --------------------------------------------------------------------------- + + fn make_task_json(id: &str, status: &str, is_final: Option) -> String { + let final_field = match is_final { + Some(true) => r#","final":true"#, + Some(false) => r#","final":false"#, + None => "", + }; + format!( + r#"{{"result":{{"id":"{id}","status":"{status}","messages":[],"artifacts":[]{final_field}}}}}"# + ) + } + + #[test] + fn test_parse_sse_empty_data_is_skip() { + assert!(matches!( + parse_sse_data_line("").unwrap(), + SseLineOutcome::Skip + )); + assert!(matches!( + parse_sse_data_line(" ").unwrap(), + SseLineOutcome::Skip + )); + } + + #[test] + fn test_parse_sse_normal_completion_is_final() { + let json = make_task_json("t-1", "completed", Some(true)); + assert!(matches!( + parse_sse_data_line(&json).unwrap(), + SseLineOutcome::Final(_) + )); + } + + #[test] + fn test_parse_sse_final_task_has_correct_id() { + let json = make_task_json("task-abc", "completed", Some(true)); + match parse_sse_data_line(&json).unwrap() { + SseLineOutcome::Final(task) => assert_eq!(task.id, "task-abc"), + other => panic!("Expected Final, got {other:?}"), + } + } + + #[test] + fn test_parse_sse_intermediate_update_is_update() { + let json = make_task_json("t-2", "working", None); + assert!(matches!( + parse_sse_data_line(&json).unwrap(), + SseLineOutcome::Update(_) + )); + } + + #[test] + fn test_parse_sse_explicit_not_final_is_update() { + let json = make_task_json("t-3", "working", Some(false)); + assert!(matches!( + parse_sse_data_line(&json).unwrap(), + SseLineOutcome::Update(_) + )); + } + + #[test] + fn test_parse_sse_malformed_json_is_err() { + assert!(parse_sse_data_line("{not valid json}").is_err()); + } + + #[test] + fn test_parse_sse_error_event_is_err() { + let json = r#"{"error":{"code":-32603,"message":"Internal error"}}"#; + let err = parse_sse_data_line(json).unwrap_err(); + assert!(err.contains("A2A SSE error"), "got: {err}"); + } + + #[test] + fn test_parse_sse_unknown_structure_is_skip() { + // Valid JSON but no "result" or "error" key + let json = r#"{"id":"x","type":"ping"}"#; + assert!(matches!( + parse_sse_data_line(json).unwrap(), + SseLineOutcome::Skip + )); + } + + #[test] + fn test_parse_sse_result_not_a_task_is_skip() { + // "result" exists but isn't deserializable as A2aTask + let json = r#"{"result":"ok"}"#; + assert!(matches!( + parse_sse_data_line(json).unwrap(), + SseLineOutcome::Skip + )); + } + #[test] fn test_a2a_config_serde() { use openfang_types::config::{A2aConfig, ExternalAgent}; diff --git a/crates/openfang-runtime/src/agent_loop.rs b/crates/openfang-runtime/src/agent_loop.rs index 0ac10ae43e..b2550161a7 100644 --- a/crates/openfang-runtime/src/agent_loop.rs +++ b/crates/openfang-runtime/src/agent_loop.rs @@ -95,7 +95,7 @@ fn is_silent_token(text: &str) -> bool { /// Extra guidance injected after failed tool calls to prevent fabricated follow-up actions. const TOOL_ERROR_GUIDANCE: &str = - "[System: One or more tool calls failed. Failed tools did not produce usable data. Do NOT invent missing results, cite nonexistent search results, or pretend failed tools succeeded. If your next steps depend on a failed tool, either retry with a materially different approach or explain the failure to the user and stop. Do not write files, store memory, or take downstream actions based on failed tool outputs.]"; + "[System: One or more tool calls failed. Failed tools did not produce usable data. Do NOT invent missing results, cite nonexistent search results, or pretend failed tools succeeded. If your next steps depend on a failed tool, explain the failure to the user and stop — do not retry. Do not write files, store memory, or take downstream actions based on failed tool outputs.]"; fn append_tool_error_guidance(tool_result_blocks: &mut Vec) { let has_tool_error = tool_result_blocks @@ -216,6 +216,9 @@ pub async fn run_agent_loop( context_window_tokens: Option, process_manager: Option<&crate::process_manager::ProcessManager>, user_content_blocks: Option>, + // Extra turns (e.g. synthetic assistant ToolUse for async A2A callbacks) inserted + // after validate_and_repair and before the current user turn. + prepend_turns: Option>, ) -> OpenFangResult { info!(agent = %manifest.name, "Starting agent loop"); @@ -300,17 +303,11 @@ pub async fn run_agent_loop( system_prompt.push_str(&crate::prompt_builder::build_memory_section(&mem_pairs)); } - // Add the user message to session history. - // When content blocks are provided (e.g. text + image from a channel), - // combine them with the user text so the LLM sees the full multimodal turn. - session - .messages - .push(build_user_turn_message(user_message, user_content_blocks)); - - // Build the messages for the LLM, filtering system messages - // System prompt goes into the separate `system` field. - // NOTE: We build llm_messages BEFORE stripping images so the LLM - // sees the full image data for the current turn. + // Build the messages for the LLM from existing session history (without the current + // user turn). The user turn is appended AFTER validate_and_repair so that any + // ToolResult blocks in user_content_blocks (e.g. async A2A callback results) are + // never seen by the repair pass as orphans — their matching ToolUse arrives via + // prepend_turns which is also inserted after repair. let llm_messages: Vec = session .messages .iter() @@ -318,8 +315,7 @@ pub async fn run_agent_loop( .cloned() .collect(); - // Strip Image blocks from session to prevent base64 bloat. - // The LLM already received them via llm_messages above. + // Strip Image blocks from old turns to prevent base64 bloat accumulating in session. for msg in session.messages.iter_mut() { if let MessageContent::Blocks(blocks) = &mut msg.content { let had_images = blocks @@ -340,6 +336,22 @@ pub async fn run_agent_loop( // Validate and repair session history (drop orphans, merge consecutive) let mut messages = crate::session_repair::validate_and_repair(&llm_messages); + // Insert prepend_turns (e.g. synthetic assistant ToolUse for async callbacks) AFTER + // repair so the repair pass never sees an orphaned ToolUse or ToolResult. + if let Some(turns) = prepend_turns { + for turn in turns { + messages.push(turn.clone()); + session.messages.push(turn); + } + } + + // Add the user message to session history and LLM messages. + // When content blocks are provided (e.g. ToolResult from async callback, or images + // from a channel), combine them with the user text into a single turn. + let user_turn = build_user_turn_message(user_message, user_content_blocks); + messages.push(user_turn.clone()); + session.messages.push(user_turn); + // Inject canonical context as the first user message (not in system prompt) // to keep the system prompt stable across turns for provider prompt caching. if let Some(cc_msg) = manifest @@ -609,6 +621,8 @@ pub async fn run_agent_loop( // Prune NO_REPLY heartbeat turns to save context budget crate::session_repair::prune_heartbeat_turns(&mut session.messages, 10); + // Prune failed tool turns so Jeeves doesn't learn tools are broken + crate::session_repair::prune_failed_tool_turns(&mut session.messages); // Save session memory @@ -665,6 +679,8 @@ pub async fn run_agent_loop( cb(LoopPhase::Done); } + let preview: String = final_response.chars().take(500).collect(); + info!(agent = %manifest.name, response = %preview, "LLM response"); info!( agent = %manifest.name, iterations = iteration + 1, @@ -770,7 +786,9 @@ pub async fn run_agent_loop( _ => {} // Allow or Warn — proceed with execution } - debug!(tool = %tool_call.name, id = %tool_call.id, "Executing tool"); + let input_preview: String = + tool_call.input.to_string().chars().take(300).collect(); + info!(tool = %tool_call.name, id = %tool_call.id, input = %input_preview, "Tool call"); // Notify phase: ToolUse if let Some(cb) = on_phase { @@ -883,6 +901,13 @@ pub async fn run_agent_loop( content }; + let result_preview: String = final_content.chars().take(300).collect(); + info!( + tool = %tool_call.name, + is_error = result.is_error, + result = %result_preview, + "Tool result" + ); tool_result_blocks.push(ContentBlock::ToolResult { tool_use_id: result.tool_use_id, tool_name: tool_call.name.clone(), @@ -1415,6 +1440,8 @@ pub async fn run_agent_loop_streaming( context_window_tokens: Option, process_manager: Option<&crate::process_manager::ProcessManager>, user_content_blocks: Option>, + // Extra turns inserted after validate_and_repair and before the current user turn. + prepend_turns: Option>, ) -> OpenFangResult { info!(agent = %manifest.name, "Starting streaming agent loop"); @@ -1499,13 +1526,8 @@ pub async fn run_agent_loop_streaming( system_prompt.push_str(&crate::prompt_builder::build_memory_section(&mem_pairs)); } - // Add the user message to session history. - // When content blocks are provided (e.g. text + image from a channel), - // combine them with the user text so the LLM sees the full multimodal turn. - session - .messages - .push(build_user_turn_message(user_message, user_content_blocks)); - + // Build LLM messages from existing session history (without the current user turn). + // See run_agent_loop for the rationale. let llm_messages: Vec = session .messages .iter() @@ -1513,8 +1535,7 @@ pub async fn run_agent_loop_streaming( .cloned() .collect(); - // Strip Image blocks from session to prevent base64 bloat. - // The LLM already received them via llm_messages above. + // Strip Image blocks from old turns to prevent base64 bloat. for msg in session.messages.iter_mut() { if let MessageContent::Blocks(blocks) = &mut msg.content { let had_images = blocks @@ -1535,6 +1556,19 @@ pub async fn run_agent_loop_streaming( // Validate and repair session history (drop orphans, merge consecutive) let mut messages = crate::session_repair::validate_and_repair(&llm_messages); + // Insert prepend_turns after repair, before the current user turn. + if let Some(turns) = prepend_turns { + for turn in turns { + messages.push(turn.clone()); + session.messages.push(turn); + } + } + + // Add the user message to session history and LLM messages. + let user_turn = build_user_turn_message(user_message, user_content_blocks); + messages.push(user_turn.clone()); + session.messages.push(user_turn); + // Inject canonical context as the first user message (not in system prompt) // to keep the system prompt stable across turns for provider prompt caching. if let Some(cc_msg) = manifest @@ -1794,6 +1828,8 @@ pub async fn run_agent_loop_streaming( // Prune NO_REPLY heartbeat turns to save context budget crate::session_repair::prune_heartbeat_turns(&mut session.messages, 10); + // Prune failed tool turns so Jeeves doesn't learn tools are broken + crate::session_repair::prune_failed_tool_turns(&mut session.messages); memory .save_session_async(session) @@ -3371,6 +3407,7 @@ mod tests { None, // context_window_tokens None, // process_manager None, // user_content_blocks + None, // prepend_turns ) .await .expect("Loop should complete without error"); @@ -3424,22 +3461,25 @@ mod tests { None, // context_window_tokens None, // process_manager None, // user_content_blocks + None, // prepend_turns ) .await .expect("Loop should complete without error"); - let guidance_seen = session.messages.iter().any(|msg| { - match &msg.content { - MessageContent::Blocks(blocks) => blocks.iter().any(|block| { - matches!(block, ContentBlock::Text { text, .. } if text == TOOL_ERROR_GUIDANCE) - }), + // prune_failed_tool_turns removes the failed ToolUse+ToolResult pair from session + // history (including the TOOL_ERROR_GUIDANCE text block that was in the ToolResult + // message). The guidance IS delivered to the LLM during the call, but does not + // persist in session.messages — that's intentional to avoid the agent learning + // that specific tools are broken. + let has_error_tool_result = session.messages.iter().any(|msg| match &msg.content { + MessageContent::Blocks(blocks) => blocks + .iter() + .any(|block| matches!(block, ContentBlock::ToolResult { is_error: true, .. })), _ => false, - } }); - assert!( - guidance_seen, - "Expected tool error guidance in session messages after failed tool call" + !has_error_tool_result, + "Failed tool turns should be pruned from session.messages by prune_failed_tool_turns" ); } @@ -3479,6 +3519,7 @@ mod tests { None, // context_window_tokens None, // process_manager None, // user_content_blocks + None, // prepend_turns ) .await .expect("Loop should complete without error"); @@ -3532,6 +3573,7 @@ mod tests { None, // context_window_tokens None, // process_manager None, // user_content_blocks + None, // prepend_turns ) .await .expect("Loop should complete without error"); @@ -3578,6 +3620,7 @@ mod tests { None, // context_window_tokens None, // process_manager None, // user_content_blocks + None, // prepend_turns ) .await .expect("Streaming loop should complete without error"); @@ -3702,6 +3745,7 @@ mod tests { None, // context_window_tokens None, // process_manager None, // user_content_blocks + None, // prepend_turns ) .await .expect("Loop should recover via retry"); @@ -3749,6 +3793,7 @@ mod tests { None, // context_window_tokens None, // process_manager None, // user_content_blocks + None, // prepend_turns ) .await .expect("Loop should complete with fallback"); @@ -3804,6 +3849,7 @@ mod tests { None, // context_window_tokens None, // process_manager None, // user_content_blocks + None, // prepend_turns ) .await .expect("Streaming loop should complete without error"); @@ -4780,6 +4826,7 @@ mod tests { None, // context_window_tokens None, // process_manager None, // user_content_blocks + None, // prepend_turns ) .await .expect("Agent loop should complete"); @@ -4850,6 +4897,7 @@ mod tests { None, None, None, + None, // prepend_turns ) .await .expect("Agent loop should recover nested XML tool calls"); @@ -4922,6 +4970,7 @@ mod tests { None, None, None, // user_content_blocks + None, // prepend_turns ) .await .expect("Normal loop should complete"); @@ -4985,6 +5034,7 @@ mod tests { None, // context_window_tokens None, // process_manager None, // user_content_blocks + None, // prepend_turns ) .await .expect("Streaming loop should complete"); diff --git a/crates/openfang-runtime/src/drivers/copilot.rs b/crates/openfang-runtime/src/drivers/copilot.rs index 9c1f15f341..980b0acbc5 100644 --- a/crates/openfang-runtime/src/drivers/copilot.rs +++ b/crates/openfang-runtime/src/drivers/copilot.rs @@ -54,10 +54,6 @@ const OAUTH_SCOPES: &str = "copilot"; /// File name for persisted OAuth tokens (inside ~/.openfang/). const TOKEN_FILE_NAME: &str = ".copilot-tokens.json"; -/// Device flow polling interval (seconds) — GitHub default is 5. -#[allow(dead_code)] -const DEVICE_FLOW_POLL_INTERVAL: Duration = Duration::from_secs(5); - /// Maximum time to wait for user to authorize the device flow. const DEVICE_FLOW_TIMEOUT: Duration = Duration::from_secs(900); // 15 minutes @@ -139,8 +135,6 @@ impl CachedCopilotToken { #[derive(Clone)] struct CachedModels { models: Vec, - #[allow(dead_code)] - fetched_at: Instant, } // --------------------------------------------------------------------------- @@ -171,9 +165,6 @@ struct OAuthTokenResponse { #[serde(default)] expires_in: Option, #[serde(default)] - #[allow(dead_code)] - refresh_token_expires_in: Option, - #[serde(default)] error: Option, #[serde(default)] error_description: Option, @@ -608,7 +599,6 @@ impl CopilotDriver { let mut lock = self.models.lock().unwrap_or_else(|e| e.into_inner()); *lock = Some(CachedModels { models: models.clone(), - fetched_at: Instant::now(), }); Ok(models) } @@ -774,20 +764,6 @@ pub async fn run_device_flow(openfang_dir: &Path) -> Result Result { - use std::io::{self, BufRead, Write}; - print!("{prompt}"); - io::stdout().flush().map_err(|e| format!("IO error: {e}"))?; - let mut line = String::new(); - io::stdin() - .lock() - .read_line(&mut line) - .map_err(|e| format!("Failed to read input: {e}"))?; - Ok(line.trim().to_string()) -} - /// Try to open the verification URL in the default browser. pub fn open_verification_url(url: &str) -> bool { #[cfg(target_os = "macos")] diff --git a/crates/openfang-runtime/src/drivers/gemini.rs b/crates/openfang-runtime/src/drivers/gemini.rs index 3a1b9b964a..cac4c3a327 100644 --- a/crates/openfang-runtime/src/drivers/gemini.rs +++ b/crates/openfang-runtime/src/drivers/gemini.rs @@ -985,14 +985,14 @@ impl LlmDriver for GeminiDriver { thought_signature.clone(), )); } - GeminiPart::Thought { ref text, .. } => { - if !text.is_empty() { - let _ = tx - .send(StreamEvent::ThinkingDelta { - text: text.clone(), - }) - .await; - } + GeminiPart::Thought { ref text, .. } + if !text.is_empty() => + { + let _ = tx + .send(StreamEvent::ThinkingDelta { + text: text.clone(), + }) + .await; } _ => {} } diff --git a/crates/openfang-runtime/src/drivers/openai.rs b/crates/openfang-runtime/src/drivers/openai.rs index 2c3edc8cd2..967c26b9a8 100644 --- a/crates/openfang-runtime/src/drivers/openai.rs +++ b/crates/openfang-runtime/src/drivers/openai.rs @@ -308,16 +308,14 @@ impl LlmDriver for OpenAIDriver { // Convert messages for msg in &request.messages { match (&msg.role, &msg.content) { - (Role::System, MessageContent::Text(text)) => { - if request.system.is_none() { - oai_messages.push(OaiMessage { - role: "system".to_string(), - content: Some(OaiMessageContent::Text(text.clone())), - tool_calls: None, - tool_call_id: None, - reasoning_content: None, - }); - } + (Role::System, MessageContent::Text(text)) if request.system.is_none() => { + oai_messages.push(OaiMessage { + role: "system".to_string(), + content: Some(OaiMessageContent::Text(text.clone())), + tool_calls: None, + tool_call_id: None, + reasoning_content: None, + }); } (Role::User, MessageContent::Text(text)) => { oai_messages.push(OaiMessage { @@ -793,16 +791,14 @@ impl LlmDriver for OpenAIDriver { for msg in &request.messages { match (&msg.role, &msg.content) { - (Role::System, MessageContent::Text(text)) => { - if request.system.is_none() { - oai_messages.push(OaiMessage { - role: "system".to_string(), - content: Some(OaiMessageContent::Text(text.clone())), - tool_calls: None, - tool_call_id: None, - reasoning_content: None, - }); - } + (Role::System, MessageContent::Text(text)) if request.system.is_none() => { + oai_messages.push(OaiMessage { + role: "system".to_string(), + content: Some(OaiMessageContent::Text(text.clone())), + tool_calls: None, + tool_call_id: None, + reasoning_content: None, + }); } (Role::User, MessageContent::Text(text)) => { oai_messages.push(OaiMessage { diff --git a/crates/openfang-runtime/src/kernel_handle.rs b/crates/openfang-runtime/src/kernel_handle.rs index e3e1b7633c..f238f0b99f 100644 --- a/crates/openfang-runtime/src/kernel_handle.rs +++ b/crates/openfang-runtime/src/kernel_handle.rs @@ -183,6 +183,31 @@ pub trait KernelHandle: Send + Sync { None } + /// Get the channel callback context for an agent (if one is active). + fn get_channel_context( + &self, + agent_id: &str, + ) -> Option { + let _ = agent_id; + None + } + + /// Store a channel callback context for an agent. + fn set_channel_context(&self, agent_id: &str, context: openfang_types::ChannelCallbackContext) { + let _ = (agent_id, context); + } + + /// Inject an async callback result into a channel, bypassing the normal agent loop. + async fn inject_async_callback( + &self, + context: openfang_types::ChannelCallbackContext, + agent_name: &str, + result_text: &str, + ) -> Result<(), String> { + let _ = (context, agent_name, result_text); + Err("Async callback injection not available".to_string()) + } + /// Send a message to a user on a named channel adapter (e.g., "email", "telegram"). /// When `thread_id` is provided, the message is sent as a thread reply. /// Returns a confirmation string on success. diff --git a/crates/openfang-runtime/src/session_repair.rs b/crates/openfang-runtime/src/session_repair.rs index 1038be1ba4..9890a139ed 100644 --- a/crates/openfang-runtime/src/session_repair.rs +++ b/crates/openfang-runtime/src/session_repair.rs @@ -331,7 +331,7 @@ fn reorder_tool_results(messages: &mut Vec) -> usize { // Insert in reverse order so indices remain valid let mut sorted_insertions: Vec<(usize, Vec)> = insertions.into_iter().collect(); - sorted_insertions.sort_by(|a, b| b.0.cmp(&a.0)); + sorted_insertions.sort_by_key(|k| std::cmp::Reverse(k.0)); for (orig_assistant_idx, blocks) in sorted_insertions { if let Some(¤t_idx) = current_assistant_positions.get(&orig_assistant_idx) { @@ -433,7 +433,7 @@ fn insert_synthetic_results(messages: &mut Vec) -> usize { // Insert in reverse order so indices stay valid let mut sorted: Vec<(usize, Vec)> = grouped.into_iter().collect(); - sorted.sort_by(|a, b| b.0.cmp(&a.0)); + sorted.sort_by_key(|k| std::cmp::Reverse(k.0)); for (assistant_idx, blocks) in sorted { let insert_pos = assistant_idx + 1; @@ -1438,3 +1438,47 @@ mod tests { assert_eq!(messages.len(), 4); } } + +/// Remove assistant+user message pairs where every tool result is an error. +/// +/// When all tool calls in a turn fail, Jeeves should not retain that pair in +/// long-term session memory — next session the tool might work fine. A pair is +/// only pruned when ALL tool results in the user message are `is_error: true`. +/// Mixed results (some success, some failure) are kept intact. +pub fn prune_failed_tool_turns(messages: &mut Vec) { + let mut i = 0; + while i + 1 < messages.len() { + let is_assistant_tool_use = messages[i].role == Role::Assistant + && match &messages[i].content { + MessageContent::Blocks(blocks) => blocks + .iter() + .any(|b| matches!(b, ContentBlock::ToolUse { .. })), + _ => false, + }; + if !is_assistant_tool_use { + i += 1; + continue; + } + let all_errors = messages[i + 1].role == Role::User + && match &messages[i + 1].content { + MessageContent::Blocks(blocks) => { + let results: Vec<_> = blocks + .iter() + .filter(|b| matches!(b, ContentBlock::ToolResult { .. })) + .collect(); + !results.is_empty() + && results + .iter() + .all(|b| matches!(b, ContentBlock::ToolResult { is_error: true, .. })) + } + _ => false, + }; + if all_errors { + debug!("Pruning failed tool turn at index {i}"); + messages.remove(i + 1); + messages.remove(i); + } else { + i += 1; + } + } +} diff --git a/crates/openfang-runtime/src/tool_runner.rs b/crates/openfang-runtime/src/tool_runner.rs index 426637a7ef..c2b8880e00 100644 --- a/crates/openfang-runtime/src/tool_runner.rs +++ b/crates/openfang-runtime/src/tool_runner.rs @@ -17,6 +17,62 @@ use tracing::{debug, warn}; /// Maximum inter-agent call depth to prevent infinite recursion (A->B->C->...). const MAX_AGENT_CALL_DEPTH: u32 = 5; +/// Maximum concurrent async A2A tasks (prevents unbounded memory growth). +const MAX_CONCURRENT_ASYNC_TASKS: usize = 256; +/// TTL for async tasks: handles are aborted and maps cleaned after this duration. +const ASYNC_TASK_TTL: std::time::Duration = std::time::Duration::from_secs(7200); + +/// Entry in `ASYNC_TASKS` — join handle plus insertion timestamp for TTL eviction. +struct AsyncTaskEntry { + handle: tokio::task::JoinHandle<()>, + inserted_at: std::time::Instant, +} + +/// RAII guard that removes a task from both global maps when dropped (including on panic). +struct TaskCleanupGuard(String); + +impl Drop for TaskCleanupGuard { + fn drop(&mut self) { + ASYNC_TASKS.remove(&self.0); + A2A_TASK_PROGRESS.remove(&self.0); + } +} + +/// In-flight async agent tasks, keyed by a caller-chosen task ID. +static ASYNC_TASKS: std::sync::LazyLock> = + std::sync::LazyLock::new(dashmap::DashMap::new); + +/// Live accumulated progress from async A2A tasks, keyed by task ID. +static A2A_TASK_PROGRESS: std::sync::LazyLock< + dashmap::DashMap>>, +> = std::sync::LazyLock::new(dashmap::DashMap::new); + +/// Ensures the background TTL cleanup task is spawned exactly once. +static CLEANUP_TASK_INIT: std::sync::OnceLock<()> = std::sync::OnceLock::new(); + +/// Spawn the background TTL sweep if not already running. +/// +/// Must be called from an async context while a Tokio runtime is active. +fn ensure_cleanup_task() { + CLEANUP_TASK_INIT.get_or_init(|| { + tokio::spawn(async { + loop { + tokio::time::sleep(std::time::Duration::from_secs(600)).await; + let now = std::time::Instant::now(); + ASYNC_TASKS.retain(|_, entry| { + if now.duration_since(entry.inserted_at) >= ASYNC_TASK_TTL { + entry.handle.abort(); + false + } else { + true + } + }); + // Prune orphaned progress entries whose task no longer exists. + A2A_TASK_PROGRESS.retain(|id, _| ASYNC_TASKS.contains_key(id)); + } + }); + }); +} /// Check if a tool name refers to a shell execution tool. /// @@ -373,6 +429,9 @@ pub async fn execute_tool( // A2A outbound tools (cross-instance agent communication) "a2a_discover" => tool_a2a_discover(input).await, "a2a_send" => tool_a2a_send(input, kernel).await, + "a2a_send_async" => tool_a2a_send_async(input, kernel, caller_agent_id).await, + "a2a_check_task" => tool_a2a_check_task(input), + "a2a_cancel_task" => tool_a2a_cancel_task(input), // Browser automation tools "browser_navigate" => { @@ -1145,7 +1204,7 @@ pub fn builtin_tool_definitions() -> Vec { }, ToolDefinition { name: "a2a_send".to_string(), - description: "Send a task/message to an external A2A agent and get the response. Use agent_name to send to a previously discovered agent, or agent_url for direct addressing.".to_string(), + description: "Send a task/message to an external A2A agent and get the response synchronously. Use for quick tasks expected to complete in <30s. Use agent_name to send to a previously discovered agent, or agent_url for direct addressing.".to_string(), input_schema: serde_json::json!({ "type": "object", "properties": { @@ -1157,6 +1216,43 @@ pub fn builtin_tool_definitions() -> Vec { "required": ["message"] }), }, + ToolDefinition { + name: "a2a_send_async".to_string(), + description: "Send a task to an external A2A agent asynchronously. Returns immediately with a task_id. The result is delivered back to the channel when the remote agent finishes. Use for long-running tasks (implement a feature, run tests, etc.). Use a2a_check_task to poll live progress.".to_string(), + input_schema: serde_json::json!({ + "type": "object", + "properties": { + "message": { "type": "string", "description": "The task/message to send to the remote agent" }, + "agent_url": { "type": "string", "description": "Direct URL of the remote agent's A2A endpoint" }, + "agent_name": { "type": "string", "description": "Name of a previously discovered A2A agent (looked up from kernel)" }, + "session_id": { "type": "string", "description": "Optional session ID for multi-turn conversations" }, + "task_id": { "type": "string", "description": "A unique ID for this task (for polling/cancellation). Auto-generated if omitted." } + }, + "required": ["message"] + }), + }, + ToolDefinition { + name: "a2a_check_task".to_string(), + description: "Check the live accumulated output from a running async A2A task. Returns whatever the remote agent has produced so far.".to_string(), + input_schema: serde_json::json!({ + "type": "object", + "properties": { + "task_id": { "type": "string", "description": "The task ID returned by a2a_send_async" } + }, + "required": ["task_id"] + }), + }, + ToolDefinition { + name: "a2a_cancel_task".to_string(), + description: "Cancel a running async A2A task by its task ID.".to_string(), + input_schema: serde_json::json!({ + "type": "object", + "properties": { + "task_id": { "type": "string", "description": "The task ID to cancel" } + }, + "required": ["task_id"] + }), + }, // --- TTS/STT tools --- ToolDefinition { name: "text_to_speech".to_string(), @@ -1629,9 +1725,11 @@ async fn tool_agent_send( kernel: Option<&Arc>, ) -> Result { let kh = require_kernel(kernel)?; - let agent_id = input["agent_id"] + let agent_id_raw = input["agent_id"] .as_str() .ok_or("Missing 'agent_id' parameter")?; + // Strip hallucinated @ (e.g. @default) — agent IDs never have @ qualifiers + let agent_id = agent_id_raw.split('@').next().unwrap_or(agent_id_raw); let message = input["message"] .as_str() .ok_or("Missing 'message' parameter")?; @@ -2580,11 +2678,175 @@ async fn tool_a2a_send( let session_id = input["session_id"].as_str(); let client = crate::a2a::A2aClient::new(); - let task = client.send_task(&url, message, session_id).await?; + let task = client + .send_task_streaming( + &url, + message, + session_id, + Some(std::time::Duration::from_secs(300)), + ) + .await?; serde_json::to_string_pretty(&task).map_err(|e| format!("Serialization error: {e}")) } +/// Fire an A2A task in the background and return a task_id immediately. +/// +/// Progress is accumulated in `A2A_TASK_PROGRESS` as SSE chunks arrive. +/// On completion the final result is delivered via `inject_async_callback` +/// to the originating channel. +async fn tool_a2a_send_async( + input: &serde_json::Value, + kernel: Option<&Arc>, + caller_agent_id: Option<&str>, +) -> Result { + let kh = require_kernel(kernel)?.clone(); + let message = input["message"] + .as_str() + .ok_or("Missing 'message' parameter")? + .to_string(); + + // Resolve agent URL + let url = if let Some(url) = input["agent_url"].as_str() { + if crate::web_fetch::check_ssrf(url, &[]).is_err() { + return Err("SSRF blocked: URL resolves to a private or metadata address".to_string()); + } + url.to_string() + } else if let Some(name) = input["agent_name"].as_str() { + kh.get_a2a_agent_url(name) + .ok_or_else(|| format!("No known A2A agent with name '{name}'. Use a2a_discover first or provide agent_url directly."))? + } else { + return Err("Missing 'agent_url' or 'agent_name' parameter".to_string()); + }; + + let agent_label = input["agent_name"] + .as_str() + .unwrap_or("remote-agent") + .to_string(); + let task_id = input["task_id"] + .as_str() + .map(String::from) + .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); + let session_id = input["session_id"].as_str().map(String::from); + + // Capture the caller's channel context for the final callback. + let channel_ctx = caller_agent_id.and_then(|id| kh.get_channel_context(id)); + if channel_ctx.is_none() { + warn!( + task_id = %task_id, + "a2a_send_async: no channel context — result will not be delivered to channel" + ); + } + + // Reject new tasks if the global cap is reached to prevent unbounded growth. + if ASYNC_TASKS.len() >= MAX_CONCURRENT_ASYNC_TASKS { + return Err(format!( + "Too many concurrent async tasks ({MAX_CONCURRENT_ASYNC_TASKS} active). \ + Cancel an existing task with a2a_cancel_task or wait for one to complete." + )); + } + + ensure_cleanup_task(); + + // Shared progress buffer updated as SSE chunks arrive. + let progress: std::sync::Arc> = + std::sync::Arc::new(std::sync::RwLock::new(String::new())); + A2A_TASK_PROGRESS.insert(task_id.clone(), progress.clone()); + + let tid = task_id.clone(); + let agent_label_cb = agent_label.clone(); + let handle = tokio::spawn(async move { + let _guard = TaskCleanupGuard(tid.clone()); + let client = crate::a2a::A2aClient::new(); + let result = client + .send_task_streaming_with_progress( + &url, + &message, + session_id.as_deref(), + None, // no per-request timeout for async tasks; TTL sweep handles abandonment + progress.clone(), + ) + .await; + + let result_text = match result { + Ok(task) => { + serde_json::to_string_pretty(&task).unwrap_or_else(|_| "Task completed".to_string()) + } + Err(e) => format!("Error: {e}"), + }; + + // Overwrite the progress buffer with the final result so a2a_check_task shows it. + if let Ok(mut buf) = progress.write() { + *buf = result_text.clone(); + } + + // Deliver the result to the originating channel. + if let Some(ctx) = channel_ctx { + let _ = kh + .inject_async_callback(ctx, &agent_label_cb, &result_text) + .await; + } + // _guard drops here, removing tid from ASYNC_TASKS and A2A_TASK_PROGRESS. + }); + + ASYNC_TASKS.insert( + task_id.clone(), + AsyncTaskEntry { + handle, + inserted_at: std::time::Instant::now(), + }, + ); + + Ok(format!( + "Task submitted to {agent_label} (task_id: {task_id}). \ + Results will be delivered to the channel when complete. \ + Use a2a_check_task(\"{task_id}\") to poll live progress." + )) +} + +/// Return the live accumulated output from a running async A2A task. +fn tool_a2a_check_task(input: &serde_json::Value) -> Result { + let task_id = input["task_id"] + .as_str() + .ok_or("Missing 'task_id' parameter")?; + + if let Some(entry) = A2A_TASK_PROGRESS.get(task_id) { + let text = entry + .value() + .read() + .map(|buf| buf.clone()) + .unwrap_or_default(); + if text.is_empty() { + Ok("Task is running — no output yet.".to_string()) + } else { + Ok(text) + } + } else { + Ok(format!("No active task with ID '{task_id}'.")) + } +} + +/// Abort a running async A2A task and clean up both maps. +fn tool_a2a_cancel_task(input: &serde_json::Value) -> Result { + let task_id = input["task_id"] + .as_str() + .ok_or("Missing 'task_id' parameter")?; + + let had_task = if let Some((_, entry)) = ASYNC_TASKS.remove(task_id) { + entry.handle.abort(); + true + } else { + false + }; + A2A_TASK_PROGRESS.remove(task_id); + + if had_task { + Ok(format!("Task {task_id} cancelled.")) + } else { + Ok(format!("No active task with ID '{task_id}'.")) + } +} + // --------------------------------------------------------------------------- // Image analysis tool // --------------------------------------------------------------------------- diff --git a/crates/openfang-types/src/lib.rs b/crates/openfang-types/src/lib.rs index fbfd88fa8d..d8ed4e5aef 100644 --- a/crates/openfang-types/src/lib.rs +++ b/crates/openfang-types/src/lib.rs @@ -22,6 +22,16 @@ pub mod tool; pub mod tool_compat; pub mod webhook; +/// Context for delivering async agent results back to the originating channel. +#[derive(Debug, Clone)] +pub struct ChannelCallbackContext { + pub channel_type: String, + pub reply_to_platform_id: String, + pub reply_to_display_name: String, + pub thread_id: Option, + pub agent_id: String, +} + /// Safely truncate a string to at most `max_bytes`, never splitting a UTF-8 char. pub fn truncate_str(s: &str, max_bytes: usize) -> &str { if s.len() <= max_bytes {