diff --git a/code-rs/core/src/agent_tool.rs b/code-rs/core/src/agent_tool.rs index 3d24a7ab623..530dc988191 100644 --- a/code-rs/core/src/agent_tool.rs +++ b/code-rs/core/src/agent_tool.rs @@ -1126,6 +1126,12 @@ impl AgentManager { .or_else(|| self.archived_terminal_agents.get(agent_id).cloned()) } + pub fn get_agent_for_session(&self, agent_id: &str, owner_session_id: Uuid) -> Option { + self + .get_agent(agent_id) + .filter(|agent| agent_belongs_to_session(agent, Some(owner_session_id))) + } + pub fn get_all_agents(&self) -> impl Iterator { self.agents.values() } @@ -1176,6 +1182,20 @@ impl AgentManager { out } + pub fn list_agents_for_session( + &self, + status_filter: Option, + batch_id: Option, + recent_only: bool, + owner_session_id: Uuid, + ) -> Vec { + self + .list_agents(status_filter, batch_id, recent_only) + .into_iter() + .filter(|agent| agent_belongs_to_session(agent, Some(owner_session_id))) + .collect() + } + pub fn has_active_agents(&self) -> bool { self.agents .values() @@ -1196,6 +1216,21 @@ impl AgentManager { } } + pub async fn cancel_agent_for_session( + &mut self, + agent_id: &str, + owner_session_id: Uuid, + ) -> bool { + if self + .get_agent(agent_id) + .is_some_and(|agent| agent_belongs_to_session(&agent, Some(owner_session_id))) + { + self.cancel_agent(agent_id).await + } else { + false + } + } + pub async fn cancel_batch(&mut self, batch_id: &str) -> usize { let agent_ids: Vec = self .agents @@ -1213,6 +1248,28 @@ impl AgentManager { count } + pub async fn cancel_batch_for_session( + &mut self, + batch_id: &str, + owner_session_id: Uuid, + ) -> usize { + let agent_ids: Vec = self + .agents + .values() + .filter(|agent| agent.batch_id.as_ref() == Some(&batch_id.to_string())) + .filter(|agent| agent_belongs_to_session(agent, Some(owner_session_id))) + .map(|agent| agent.id.clone()) + .collect(); + + let mut count = 0; + for agent_id in agent_ids { + if self.cancel_agent(&agent_id).await { + count += 1; + } + } + count + } + pub async fn update_agent_status(&mut self, agent_id: &str, status: AgentStatus) { let mut terminal = false; if let Some(agent) = self.agents.get_mut(agent_id) { @@ -3026,6 +3083,42 @@ mod tests { } } + fn test_agent( + id: &str, + owner_session_id: Uuid, + batch_id: &str, + status: AgentStatus, + ) -> Agent { + let now = chrono::Utc::now(); + Agent { + id: id.to_string(), + owner_session_id: Some(owner_session_id), + batch_id: Some(batch_id.to_string()), + model: "code-gpt-5.5".to_string(), + name: Some(id.to_string()), + prompt: "prompt".to_string(), + context: None, + output_goal: None, + files: Vec::new(), + read_only: true, + status, + result: None, + error: None, + created_at: now, + started_at: Some(now), + completed_at: None, + progress: Vec::new(), + worktree_path: None, + branch_name: None, + worktree_base: None, + source_kind: None, + log_tag: None, + config: None, + reasoning_effort: ReasoningEffort::Low, + last_activity: now, + } + } + #[test] fn code_family_falls_back_when_command_missing() { let cfg = agent_with_command("definitely-not-present-429"); @@ -3134,6 +3227,52 @@ mod tests { assert!(!payload_b.agents.iter().any(|agent| agent.id == agent_a)); } + #[tokio::test] + async fn model_facing_agent_queries_are_session_scoped() { + let mut manager = AgentManager::new(); + let session_a = Uuid::new_v4(); + let session_b = Uuid::new_v4(); + let shared_batch = "shared-batch"; + + manager.agents.insert( + "agent-a".to_string(), + test_agent("agent-a", session_a, shared_batch, AgentStatus::Running), + ); + manager.agents.insert( + "agent-b".to_string(), + test_agent("agent-b", session_b, shared_batch, AgentStatus::Running), + ); + manager.handles.insert("agent-a".to_string(), tokio::spawn(async {})); + manager.handles.insert("agent-b".to_string(), tokio::spawn(async {})); + manager.archived_terminal_agents.insert( + "archived-b".to_string(), + test_agent("archived-b", session_b, shared_batch, AgentStatus::Completed), + ); + + let visible_to_a = manager.list_agents_for_session( + None, + Some(shared_batch.to_string()), + false, + session_a, + ); + assert_eq!(visible_to_a.len(), 1); + assert_eq!(visible_to_a[0].id, "agent-a"); + assert!(manager.get_agent_for_session("agent-b", session_a).is_none()); + assert!(manager.get_agent_for_session("archived-b", session_a).is_none()); + assert!(!manager.cancel_agent_for_session("agent-b", session_a).await); + + assert!(manager.agents.contains_key("agent-b")); + assert!(manager.cancel_batch_for_session(shared_batch, session_a).await == 1); + assert_eq!( + manager.agents.get("agent-a").map(|agent| &agent.status), + Some(&AgentStatus::Cancelled), + ); + assert_eq!( + manager.agents.get("agent-b").map(|agent| &agent.status), + Some(&AgentStatus::Running), + ); + } + #[tokio::test] async fn agent_status_updates_prune_closed_session_senders() { let mut manager = AgentManager::new(); diff --git a/code-rs/core/src/codex.rs b/code-rs/core/src/codex.rs index 0ff043e8b89..332c0230eae 100644 --- a/code-rs/core/src/codex.rs +++ b/code-rs/core/src/codex.rs @@ -1171,10 +1171,6 @@ pub struct Codex { rx_event: Receiver, } -// Allow internal components (like background exec completions) to trigger a new -// turn without fabricating a visible user message. We enqueue an empty -// UserInput; the model will only see queued developer/system items. -static TX_SUB_GLOBAL: OnceLock> = OnceLock::new(); static ANY_BG_NOTIFY: OnceLock> = OnceLock::new(); /// Wrapper returned by [`Codex::spawn`] containing the spawned [`Codex`], @@ -1247,8 +1243,6 @@ impl Codex { tx_sub, rx_event, }; - // Make a clone of tx_sub available for internal auto-turn triggers. - let _ = TX_SUB_GLOBAL.set(codex.tx_sub.clone()); let _ = ANY_BG_NOTIFY.set(std::sync::Arc::new(Notify::new())); let init_id = codex.submit(configure_session).await?; diff --git a/code-rs/core/src/codex/streaming.rs b/code-rs/core/src/codex/streaming.rs index e314f2358f5..b8f75ebea75 100644 --- a/code-rs/core/src/codex/streaming.rs +++ b/code-rs/core/src/codex/streaming.rs @@ -277,7 +277,9 @@ pub(super) async fn submission_loop( if !seen_batches.insert(trimmed.to_string()) { continue; } - cancelled += manager.cancel_batch(trimmed).await; + cancelled += manager + .cancel_batch_for_session(trimmed, sess_arc.session_uuid()) + .await; } for agent_id in agent_ids { @@ -288,7 +290,10 @@ pub(super) async fn submission_loop( if !seen_agents.insert(trimmed.to_string()) { continue; } - if manager.cancel_agent(trimmed).await { + if manager + .cancel_agent_for_session(trimmed, sess_arc.session_uuid()) + .await + { cancelled += 1; } } @@ -8911,7 +8916,7 @@ async fn handle_check_agent_status( Ok(params) => { let manager = AGENT_MANAGER.read().await; - if let Some(agent) = manager.get_agent(¶ms.agent_id) { + if let Some(agent) = manager.get_agent_for_session(¶ms.agent_id, sess.session_uuid()) { match agent.batch_id.as_deref() { Some(batch) if batch == params.batch_id => {} _ => { @@ -8958,7 +8963,7 @@ async fn handle_check_agent_status( }; // Re-acquire manager to get fresh progress after potential delay let manager = AGENT_MANAGER.read().await; - if let Some(agent) = manager.get_agent(¶ms.agent_id) { + if let Some(agent) = manager.get_agent_for_session(¶ms.agent_id, sess.session_uuid()) { let joined = agent.progress.join("\n"); match write_agent_file(&dir, "progress.log", &joined) { Ok(p) => progress_file = Some(p.display().to_string()), @@ -9037,7 +9042,7 @@ async fn handle_get_agent_result( Ok(params) => { let manager = AGENT_MANAGER.read().await; - if let Some(agent) = manager.get_agent(¶ms.agent_id) { + if let Some(agent) = manager.get_agent_for_session(¶ms.agent_id, sess.session_uuid()) { match agent.batch_id.as_deref() { Some(batch) if batch == params.batch_id => {} _ => { @@ -9171,7 +9176,7 @@ async fn handle_cancel_agent( }; } }; - if let Some(agent) = manager.get_agent(&agent_id) { + if let Some(agent) = manager.get_agent_for_session(&agent_id, sess.session_uuid()) { if agent.batch_id.as_deref() != Some(batch_id.as_str()) { return ResponseInputItem::FunctionCallOutput { call_id: call_id_clone, @@ -9184,7 +9189,7 @@ async fn handle_cancel_agent( }; } } - if manager.cancel_agent(&agent_id).await { + if manager.cancel_agent_for_session(&agent_id, sess.session_uuid()).await { ResponseInputItem::FunctionCallOutput { call_id: call_id_clone, output: FunctionCallOutputPayload { @@ -9200,7 +9205,7 @@ async fn handle_cancel_agent( } } } else if let Some(batch_id) = params.batch_id { - let count = manager.cancel_batch(&batch_id).await; + let count = manager.cancel_batch_for_session(&batch_id, sess.session_uuid()).await; ResponseInputItem::FunctionCallOutput { call_id: call_id_clone, output: FunctionCallOutputPayload { @@ -9273,7 +9278,7 @@ async fn handle_wait_for_agent( let manager = AGENT_MANAGER.read().await; if let Some(agent_id) = ¶ms.agent_id { - if let Some(agent) = manager.get_agent(agent_id) { + if let Some(agent) = manager.get_agent_for_session(agent_id, sess.session_uuid()) { match agent.batch_id.as_deref() { Some(batch) if batch == batch_id => {} _ => { @@ -9362,7 +9367,7 @@ async fn handle_wait_for_agent( } } } else { - let agents = manager.list_agents(None, Some(batch_id.clone()), false); + let agents = manager.list_agents_for_session(None, Some(batch_id.clone()), false, sess.session_uuid()); // Separate terminal vs non-terminal agents let completed_agents: Vec<_> = agents @@ -9678,10 +9683,11 @@ async fn handle_list_agents( _ => None, }); - let agents = manager.list_agents( + let agents = manager.list_agents_for_session( status_filter, Some(batch_id.clone()), params.recent_only.unwrap_or(false), + sess.session_uuid(), ); // Count running agents for status update @@ -10719,6 +10725,7 @@ async fn handle_container_exec_with_params( let suppress_event_flag_task = suppress_event_flag.clone(); let display_label_task = display_label.clone(); let tool_output_max_bytes = sess.tool_output_max_bytes; + let sess_for_background_completion = sess.self_handle.upgrade(); let task_handle = tokio::spawn(async move { // Build stdout stream with tail capture. We cannot stamp via `Session` here, // but deltas will be delivered with neutral ordering which the UI tolerates. @@ -10809,10 +10816,10 @@ async fn handle_container_exec_with_params( format!("{label} completed in background") }; let bg_event = EventMsg::BackgroundEvent(BackgroundEventEvent { message }); - let ev = Event { id: sub_id_for_events.clone(), event_seq: 0, msg: bg_event, order: None }; - let _ = tx_event.send(ev).await; + if let Some(sess_arc) = sess_for_background_completion.as_ref() { + let ev = sess_arc.make_event(&sub_id_for_events, bg_event); + sess_arc.send_event(ev).await; - if let Some(tx) = TX_SUB_GLOBAL.get() { let header_label = if label.is_empty() { format!("call_id={}", call_id_for_events) } else { @@ -10828,9 +10835,30 @@ async fn handle_container_exec_with_params( tool_output_max_bytes, ); let dev_text = format!("{}\n\n{}", header, body); - let _ = tx - .send(Submission { id: uuid::Uuid::new_v4().to_string(), op: Op::AddPendingInputDeveloper { text: dev_text } }) - .await; + let dev_msg = ResponseInputItem::Message { + role: "developer".to_string(), + content: vec![ContentItem::InputText { text: dev_text }], + }; + if sess_arc.enqueue_out_of_turn_item(dev_msg) { + sess_arc.cleanup_old_status_items().await; + let turn_context = sess_arc.make_turn_context(); + let sub_id = sess_arc.next_internal_sub_id(); + let sentinel_input = vec![InputItem::Text { + text: PENDING_ONLY_SENTINEL.to_string(), + }]; + let agent = AgentTask::spawn( + Arc::clone(sess_arc), + turn_context, + sub_id, + sentinel_input, + TaskOriginKind::OutOfTurnDeveloper, + false, + ); + sess_arc.set_task(agent); + } + } else { + let ev = Event { id: sub_id_for_events.clone(), event_seq: 0, msg: bg_event, order: None }; + let _ = tx_event.send(ev).await; } } if let Some(n) = ANY_BG_NOTIFY.get() { n.notify_waiters(); }