diff --git a/code-rs/core/src/agent_tool.rs b/code-rs/core/src/agent_tool.rs index 347203bf7fd..b427e497779 100644 --- a/code-rs/core/src/agent_tool.rs +++ b/code-rs/core/src/agent_tool.rs @@ -360,6 +360,8 @@ pub enum AgentStatus { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Agent { pub id: String, + #[serde(default)] + pub owner_session_id: Option, pub batch_id: Option, pub model: String, #[serde(default)] @@ -403,7 +405,7 @@ pub struct AgentManager { // (including worktree/branch metadata) for the full Auto Drive run. archived_terminal_agents: HashMap, handles: HashMap>, - event_sender: Option>, + event_senders: Vec, debug_log_root: Option, watchdog_handle: Option>, inactivity_timeout: Duration, @@ -458,13 +460,66 @@ pub struct AgentStatusUpdatePayload { pub task: Option, } +struct AgentStatusSender { + owner_session_id: Uuid, + sender: mpsc::UnboundedSender, +} + +fn agent_belongs_to_session(agent: &Agent, owner_session_id: Option) -> bool { + match owner_session_id { + Some(owner_session_id) => agent.owner_session_id == Some(owner_session_id), + None => true, + } +} + +fn agent_info_for_status(agent: &Agent, now: DateTime) -> AgentInfo { + // Just show the model name - status provides the useful info. + let name = agent + .name + .as_ref() + .map(|value| value.clone()) + .unwrap_or_else(|| agent.model.clone()); + let start = agent.started_at.unwrap_or(agent.created_at); + let end = agent.completed_at.unwrap_or(now); + let elapsed_ms = match end.signed_duration_since(start).num_milliseconds() { + value if value >= 0 => Some(value as u64), + _ => None, + }; + + AgentInfo { + id: agent.id.clone(), + name, + status: format!("{:?}", agent.status).to_lowercase(), + batch_id: agent.batch_id.clone(), + model: Some(agent.model.clone()), + last_progress: agent.progress.last().cloned(), + result: agent.result.clone(), + error: agent.error.clone(), + elapsed_ms, + token_count: None, + last_activity_at: match agent.status { + AgentStatus::Pending | AgentStatus::Running => Some(agent.last_activity.to_rfc3339()), + _ => None, + }, + seconds_since_last_activity: match agent.status { + AgentStatus::Pending | AgentStatus::Running => Some( + now.signed_duration_since(agent.last_activity) + .num_seconds() + .max(0) as u64, + ), + _ => None, + }, + source_kind: agent.source_kind.clone(), + } +} + impl AgentManager { pub fn new() -> Self { Self { agents: HashMap::new(), archived_terminal_agents: HashMap::new(), handles: HashMap::new(), - event_sender: None, + event_senders: Vec::new(), debug_log_root: None, watchdog_handle: None, inactivity_timeout: Duration::minutes(30), @@ -472,12 +527,25 @@ impl AgentManager { } } - pub fn set_event_sender(&mut self, sender: mpsc::UnboundedSender) { - self.event_sender = Some(sender); - // New session lifecycle: keep only live agents and reset per-session - // diagnostics/archives so long-lived UIs start from a clean slate. - self.archived_terminal_agents.clear(); - self.diagnostics = AgentManagerDiagnostics::default(); + pub fn set_event_sender( + &mut self, + owner_session_id: Uuid, + sender: mpsc::UnboundedSender, + ) { + self.event_senders + .retain(|registered| !registered.sender.is_closed()); + let first_session = self.event_senders.is_empty(); + self.event_senders.push(AgentStatusSender { + owner_session_id, + sender, + }); + // New manager lifecycle: keep only live agents and reset diagnostics + // when the first UI connects, without clearing another live session's + // archived terminal agents when a second UI registers. + if first_session { + self.archived_terminal_agents.clear(); + self.diagnostics = AgentManagerDiagnostics::default(); + } self.start_watchdog(); } @@ -711,10 +779,11 @@ impl AgentManager { self.prune_terminal_agents(); } - fn visible_agents_for_status(&self) -> Vec<&Agent> { + fn visible_agents_for_status(&self, owner_session_id: Option) -> Vec<&Agent> { let mut active: Vec<&Agent> = self .agents .values() + .filter(|agent| agent_belongs_to_session(agent, owner_session_id)) .filter(|agent| matches!(agent.status, AgentStatus::Pending | AgentStatus::Running)) .collect(); @@ -723,6 +792,7 @@ impl AgentManager { let mut terminal: Vec<&Agent> = self .agents .values() + .filter(|agent| agent_belongs_to_session(agent, owner_session_id)) .filter(|agent| { matches!( agent.status, @@ -743,12 +813,95 @@ impl AgentManager { } pub fn status_visible_agents(&self) -> Vec { - self.visible_agents_for_status() + self.visible_agents_for_status(None) .into_iter() .cloned() .collect() } + pub fn status_visible_agents_for_session(&self, owner_session_id: Uuid) -> Vec { + self.visible_agents_for_status(Some(owner_session_id)) + .into_iter() + .cloned() + .collect() + } + + fn status_payload_for_session(&mut self, owner_session_id: Uuid) -> AgentStatusUpdatePayload { + let now = Utc::now(); + + let total_terminal = self + .agents + .values() + .filter(|agent| agent_belongs_to_session(agent, Some(owner_session_id))) + .filter(|agent| { + matches!( + agent.status, + AgentStatus::Completed | AgentStatus::Failed | AgentStatus::Cancelled + ) + }) + .count(); + let omitted_terminal = total_terminal.saturating_sub(MAX_STATUS_TERMINAL_AGENTS); + if omitted_terminal > 0 { + self.diagnostics.status_terminal_agents_omitted = self + .diagnostics + .status_terminal_agents_omitted + .saturating_add(omitted_terminal as u64); + debug!( + omitted_terminal, + total_terminal, + owner_session_id = %owner_session_id, + running_agents = self + .agents + .values() + .filter(|agent| agent_belongs_to_session(agent, Some(owner_session_id))) + .filter(|agent| { + matches!(agent.status, AgentStatus::Pending | AgentStatus::Running) + }) + .count(), + cumulative_omitted = self.diagnostics.status_terminal_agents_omitted, + "omitting terminal agents from status payload to keep UI responsive" + ); + } + + let agents: Vec = self + .visible_agents_for_status(Some(owner_session_id)) + .into_iter() + .map(|agent| agent_info_for_status(agent, now)) + .collect(); + + // Prefer active agents for shared context/task; terminal agents may + // have had heavy fields compacted already. + let source_agent = self + .agents + .values() + .filter(|agent| agent_belongs_to_session(agent, Some(owner_session_id))) + .find(|agent| matches!(agent.status, AgentStatus::Pending | AgentStatus::Running)) + .or_else(|| { + self.agents + .values() + .find(|agent| agent_belongs_to_session(agent, Some(owner_session_id))) + }); + let (context, task) = source_agent + .map(|agent| { + let context = agent.context.as_ref().and_then(|value| { + if value.trim().is_empty() { + None + } else { + Some(value.clone()) + } + }); + let task = if agent.prompt.trim().is_empty() { + None + } else { + Some(agent.prompt.clone()) + }; + (context, task) + }) + .unwrap_or((None, None)); + + AgentStatusUpdatePayload { agents, context, task } + } + fn append_agent_log(&self, log_tag: &str, line: &str) { let Some(root) = &self.debug_log_root else { return; }; let dir = root.join(log_tag); @@ -769,115 +922,26 @@ impl AgentManager { } async fn send_agent_status_update(&mut self) { - if let Some(ref sender) = self.event_sender { - let now = Utc::now(); - - let total_terminal = self - .agents - .values() - .filter(|agent| { - matches!( - agent.status, - AgentStatus::Completed | AgentStatus::Failed | AgentStatus::Cancelled - ) - }) - .count(); - let omitted_terminal = total_terminal.saturating_sub(MAX_STATUS_TERMINAL_AGENTS); - if omitted_terminal > 0 { - self.diagnostics.status_terminal_agents_omitted = self - .diagnostics - .status_terminal_agents_omitted - .saturating_add(omitted_terminal as u64); - debug!( - omitted_terminal, - total_terminal, - running_agents = self - .agents - .values() - .filter(|agent| { - matches!(agent.status, AgentStatus::Pending | AgentStatus::Running) - }) - .count(), - cumulative_omitted = self.diagnostics.status_terminal_agents_omitted, - "omitting terminal agents from status payload to keep UI responsive" - ); - } - - let agents: Vec = self - .visible_agents_for_status() + if !self.event_senders.is_empty() { + let sessions: Vec<(usize, Uuid)> = self + .event_senders + .iter() + .enumerate() + .filter(|(_, registered)| !registered.sender.is_closed()) + .map(|(idx, registered)| (idx, registered.owner_session_id)) + .collect(); + let sender_payloads: Vec<(usize, AgentStatusUpdatePayload)> = sessions .into_iter() - .map(|agent| { - // Just show the model name - status provides the useful info - let name = agent - .name - .as_ref() - .map(|value| value.clone()) - .unwrap_or_else(|| agent.model.clone()); - let start = agent.started_at.unwrap_or(agent.created_at); - let end = agent.completed_at.unwrap_or(now); - let elapsed_ms = match end.signed_duration_since(start).num_milliseconds() { - value if value >= 0 => Some(value as u64), - _ => None, - }; - - AgentInfo { - id: agent.id.clone(), - name, - status: format!("{:?}", agent.status).to_lowercase(), - batch_id: agent.batch_id.clone(), - model: Some(agent.model.clone()), - last_progress: agent.progress.last().cloned(), - result: agent.result.clone(), - error: agent.error.clone(), - elapsed_ms, - token_count: None, - last_activity_at: match agent.status { - AgentStatus::Pending | AgentStatus::Running => { - Some(agent.last_activity.to_rfc3339()) - } - _ => None, - }, - seconds_since_last_activity: match agent.status { - AgentStatus::Pending | AgentStatus::Running => Some( - Utc::now() - .signed_duration_since(agent.last_activity) - .num_seconds() - .max(0) as u64, - ), - _ => None, - }, - source_kind: agent.source_kind.clone(), - } - }) + .map(|(idx, owner_session_id)| (idx, self.status_payload_for_session(owner_session_id))) .collect(); - // Prefer active agents for shared context/task; terminal agents may - // have had heavy fields compacted already. - let source_agent = self - .agents - .values() - .find(|agent| matches!(agent.status, AgentStatus::Pending | AgentStatus::Running)) - .or_else(|| self.agents.values().next()); - let (context, task) = source_agent - .map(|agent| { - let context = agent - .context - .as_ref() - .and_then(|value| if value.trim().is_empty() { - None - } else { - Some(value.clone()) - }); - let task = if agent.prompt.trim().is_empty() { - None - } else { - Some(agent.prompt.clone()) - }; - (context, task) - }) - .unwrap_or((None, None)); - let payload = AgentStatusUpdatePayload { agents, context, task }; - let _ = sender.send(payload); + for (idx, payload) in sender_payloads.into_iter().rev() { + if self.event_senders[idx].sender.send(payload).is_err() { + self.event_senders.remove(idx); + } + } + self.event_senders + .retain(|registered| !registered.sender.is_closed()); } } @@ -891,6 +955,7 @@ impl AgentManager { files: Vec, read_only: bool, batch_id: Option, + owner_session_id: Uuid, reasoning_effort: code_protocol::config_types::ReasoningEffort, ) -> String { self.create_agent_internal( @@ -903,6 +968,7 @@ impl AgentManager { read_only, batch_id, None, + owner_session_id, None, None, None, @@ -922,6 +988,7 @@ impl AgentManager { read_only: bool, batch_id: Option, config: AgentConfig, + owner_session_id: Uuid, reasoning_effort: code_protocol::config_types::ReasoningEffort, ) -> String { self.create_agent_internal( @@ -934,6 +1001,7 @@ impl AgentManager { read_only, batch_id, Some(config), + owner_session_id, None, None, None, @@ -954,6 +1022,7 @@ impl AgentManager { read_only: bool, batch_id: Option, config: Option, + owner_session_id: Uuid, worktree_branch: Option, worktree_base: Option, source_kind: Option, @@ -970,6 +1039,7 @@ impl AgentManager { read_only, batch_id, config, + owner_session_id, worktree_branch, worktree_base, source_kind, @@ -989,6 +1059,7 @@ impl AgentManager { read_only: bool, batch_id: Option, config: Option, + owner_session_id: Uuid, worktree_branch: Option, worktree_base: Option, source_kind: Option, @@ -1005,6 +1076,7 @@ impl AgentManager { let agent = Agent { id: agent_id.clone(), + owner_session_id: Some(owner_session_id), batch_id, model, name: normalize_agent_name(name), @@ -2854,6 +2926,7 @@ mod tests { use std::path::Path; use std::path::PathBuf; use std::sync::{Mutex, OnceLock}; + use uuid::Uuid; #[cfg(unix)] static STDIN_LOCK: Mutex<()> = Mutex::new(()); @@ -2984,6 +3057,144 @@ mod tests { assert_eq!(custom, std::path::PathBuf::from("custom-coder")); } + #[tokio::test] + async fn agent_status_updates_are_broadcast_to_all_sessions() { + let mut manager = AgentManager::new(); + let session_a = Uuid::new_v4(); + let session_b = Uuid::new_v4(); + let (tx_a, mut rx_a) = tokio::sync::mpsc::unbounded_channel(); + let (tx_b, mut rx_b) = tokio::sync::mpsc::unbounded_channel(); + + manager.set_event_sender(session_a, tx_a); + manager.set_event_sender(session_b, tx_b); + manager.send_agent_status_update().await; + + assert!( + rx_a.try_recv().is_ok(), + "first session should still receive updates" + ); + assert!(rx_b.try_recv().is_ok(), "second session should receive updates"); + } + + #[tokio::test] + async fn agent_status_updates_include_only_owned_agents() { + let mut manager = AgentManager::new(); + let session_a = Uuid::new_v4(); + let session_b = Uuid::new_v4(); + let (tx_a, mut rx_a) = tokio::sync::mpsc::unbounded_channel(); + let (tx_b, mut rx_b) = tokio::sync::mpsc::unbounded_channel(); + + manager.set_event_sender(session_a, tx_a); + manager.set_event_sender(session_b, tx_b); + let agent_a = manager + .create_agent( + "code-gpt-5.5".to_string(), + Some("session-a".to_string()), + "task a".to_string(), + None, + None, + Vec::new(), + true, + Some("batch-a".to_string()), + session_a, + ReasoningEffort::Low, + ) + .await; + let agent_b = manager + .create_agent( + "code-gpt-5.5".to_string(), + Some("session-b".to_string()), + "task b".to_string(), + None, + None, + Vec::new(), + true, + Some("batch-b".to_string()), + session_b, + ReasoningEffort::Low, + ) + .await; + + let mut payload_a = rx_a + .try_recv() + .expect("session A should receive an update"); + while let Ok(next) = rx_a.try_recv() { + payload_a = next; + } + let mut payload_b = rx_b + .try_recv() + .expect("session B should receive an update"); + while let Ok(next) = rx_b.try_recv() { + payload_b = next; + } + + assert!(payload_a.agents.iter().any(|agent| agent.id == agent_a)); + assert!(!payload_a.agents.iter().any(|agent| agent.id == agent_b)); + assert!(payload_b.agents.iter().any(|agent| agent.id == agent_b)); + assert!(!payload_b.agents.iter().any(|agent| agent.id == agent_a)); + } + + #[tokio::test] + async fn agent_status_updates_prune_closed_session_senders() { + let mut manager = AgentManager::new(); + let session_closed = Uuid::new_v4(); + let session_open = Uuid::new_v4(); + let (tx_closed, rx_closed) = tokio::sync::mpsc::unbounded_channel(); + let (tx_open, mut rx_open) = tokio::sync::mpsc::unbounded_channel(); + drop(rx_closed); + + manager.set_event_sender(session_closed, tx_closed); + manager.set_event_sender(session_open, tx_open); + manager.send_agent_status_update().await; + + assert!(rx_open.try_recv().is_ok(), "open session should receive updates"); + assert_eq!(manager.event_senders.len(), 1); + } + + #[test] + fn registering_second_session_keeps_existing_archived_agents() { + let mut manager = AgentManager::new(); + let now = chrono::Utc::now(); + let archived_id = "archived-agent".to_string(); + manager.archived_terminal_agents.insert( + archived_id.clone(), + Agent { + id: archived_id.clone(), + owner_session_id: Some(Uuid::new_v4()), + batch_id: Some("batch-archived".to_string()), + model: "code-gpt-5.5".to_string(), + name: Some("Archived".to_string()), + prompt: String::new(), + context: None, + output_goal: None, + files: Vec::new(), + read_only: true, + status: AgentStatus::Completed, + result: Some("ok".to_string()), + error: None, + created_at: now, + started_at: Some(now), + completed_at: Some(now), + progress: Vec::new(), + worktree_path: None, + branch_name: None, + worktree_base: None, + source_kind: None, + log_tag: None, + config: None, + reasoning_effort: ReasoningEffort::Low, + last_activity: now, + }, + ); + + let (tx_a, _rx_a) = tokio::sync::mpsc::unbounded_channel(); + manager.set_event_sender(Uuid::new_v4(), tx_a); + let (tx_b, _rx_b) = tokio::sync::mpsc::unbounded_channel(); + manager.set_event_sender(Uuid::new_v4(), tx_b); + + assert!(manager.archived_terminal_agents.contains_key(&archived_id)); + } + #[tokio::test] async fn read_only_agents_use_code_binary_path() { let _lock = env_lock().lock().expect("env lock"); @@ -3389,6 +3600,7 @@ exit 0 id.clone(), Agent { id, + owner_session_id: None, batch_id: Some("batch-1".to_string()), model: "code-gpt-5.5".to_string(), name: Some("Prune Test".to_string()), @@ -3438,6 +3650,7 @@ exit 0 agent_id.clone(), Agent { id: agent_id.clone(), + owner_session_id: None, batch_id: Some("batch-compact".to_string()), model: "code-gpt-5.5".to_string(), name: Some("Finalize".to_string()), @@ -3504,6 +3717,7 @@ exit 0 id.clone(), Agent { id, + owner_session_id: None, batch_id: Some(batch_id.clone()), model: "code-gpt-5.5".to_string(), name: Some("Archive Test".to_string()), diff --git a/code-rs/core/src/codex/streaming.rs b/code-rs/core/src/codex/streaming.rs index 14fca196b65..e314f2358f5 100644 --- a/code-rs/core/src/codex/streaming.rs +++ b/code-rs/core/src/codex/streaming.rs @@ -951,10 +951,10 @@ pub(super) async fn submission_loop( let mut manager = AGENT_MANAGER.write().await; let (agent_tx, mut agent_rx) = tokio::sync::mpsc::unbounded_channel::(); - manager.set_event_sender(agent_tx); + let sess_for_agents = sess.as_ref().expect("session active").clone(); + manager.set_event_sender(sess_for_agents.session_uuid(), agent_tx); drop(manager); - let sess_for_agents = sess.as_ref().expect("session active").clone(); // Forward agent events to the main event channel let tx_event_clone = tx_event.clone(); tokio::spawn(async move { @@ -6970,6 +6970,8 @@ async fn handle_gh_run_wait( #[serde(default)] branch: Option, #[serde(default)] + head_sha: Option, + #[serde(default)] interval_seconds: Option, } @@ -7261,6 +7263,7 @@ async fn handle_gh_run_wait( fn run_summary_text( run_id: &str, branch: &str, + head_sha: Option<&str>, status: &str, conclusion: &str, workflow: Option, @@ -7288,6 +7291,12 @@ async fn handle_gh_run_wait( } lines.push(format!("Run: {run_id}")); lines.push(format!("Branch: {branch}")); + if let Some(head_sha) = head_sha { + if !head_sha.is_empty() { + let short_sha: String = head_sha.chars().take(12).collect(); + lines.push(format!("Commit: {short_sha}")); + } + } if let Some(url) = url { if !url.is_empty() { lines.push(format!("URL: {url}")); @@ -7383,6 +7392,13 @@ async fn handle_gh_run_wait( Some(value) => value.to_string(), None => detect_branch(&cwd).await, }; + let requested_head_sha = parsed + .head_sha + .as_ref() + .map(|s| s.trim()) + .filter(|s| !s.is_empty()) + .map(|s| s.to_string()); + let run_list_limit = if requested_head_sha.is_some() { "20" } else { "1" }; let mut resolved_run_id = match parsed.run_id { Some(Value::String(value)) if !value.trim().is_empty() => Some(value), @@ -7407,9 +7423,9 @@ async fn handle_gh_run_wait( "--branch", &branch, "--limit", - "1", + run_list_limit, "--json", - "databaseId,displayTitle,workflowName,headBranch,status,conclusion", + "databaseId,displayTitle,workflowName,headBranch,headSha,status,conclusion", ], repo.as_deref(), ) @@ -7429,9 +7445,9 @@ async fn handle_gh_run_wait( "--branch", &branch, "--limit", - "1", + run_list_limit, "--json", - "databaseId,displayTitle,workflowName,headBranch,status,conclusion", + "databaseId,displayTitle,workflowName,headBranch,headSha,status,conclusion", ], repo.as_deref(), ) @@ -7447,7 +7463,7 @@ async fn handle_gh_run_wait( if resolution_error.is_none() { let runs: Vec = serde_json::from_str(&json).unwrap_or_default(); - let run = runs.into_iter().next(); + let run = select_github_run_for_wait(runs, requested_head_sha.as_deref()); resolved_run_id = run .as_ref() .and_then(|item| item.get("databaseId").cloned()) @@ -7473,7 +7489,11 @@ async fn handle_gh_run_wait( } else { format!("branch {branch}") }; - resolution_error = Some(format!("No runs found for {detail}")); + let sha_detail = requested_head_sha + .as_deref() + .map(|sha| format!(" at commit {sha}")) + .unwrap_or_default(); + resolution_error = Some(format!("No runs found for {detail}{sha_detail}")); } } @@ -7521,6 +7541,9 @@ async fn handle_gh_run_wait( } prepared_branch = Some(branch.clone()); resolved_params.insert("branch".to_string(), Value::String(branch.clone())); + if let Some(head_sha) = requested_head_sha.clone() { + resolved_params.insert("head_sha".to_string(), Value::String(head_sha)); + } if let Some(workflow) = resolved_workflow.clone() { prepared_workflow = Some(workflow.clone()); resolved_params.insert("workflow".to_string(), Value::String(workflow)); @@ -7629,6 +7652,7 @@ async fn handle_gh_run_wait( let summary = run_summary_text( &run_id, prepared_branch.as_deref().unwrap_or(""), + requested_head_sha.as_deref(), &status, &conclusion, workflow_name, @@ -8634,6 +8658,7 @@ pub(crate) async fn handle_run_agent( read_only, Some(batch_id.clone()), config.clone(), + sess.session_uuid(), sess.model_reasoning_effort.into(), ) .await; @@ -8658,6 +8683,7 @@ pub(crate) async fn handle_run_agent( params.files.clone().unwrap_or_default(), read_only, Some(batch_id.clone()), + sess.session_uuid(), sess.model_reasoning_effort.into(), ) .await; @@ -8726,6 +8752,7 @@ pub(crate) async fn handle_run_agent( params.files.clone().unwrap_or_default(), read_only, Some(batch_id.clone()), + sess.session_uuid(), sess.model_reasoning_effort.into(), ) .await; @@ -11420,6 +11447,23 @@ fn track_seen_completed_agent_for_batch(state: &mut State, batch_id: &str, agent ensure_wait_batch_tracking_capacity(state, batch_id); } +fn select_github_run_for_wait( + runs: Vec, + head_sha: Option<&str>, +) -> Option { + let expected_sha = head_sha.map(str::trim).filter(|value| !value.is_empty()); + if let Some(expected_sha) = expected_sha { + runs.into_iter().find(|item| { + item + .get("headSha") + .and_then(|value| value.as_str()) + .is_some_and(|head_sha| head_sha.eq_ignore_ascii_case(expected_sha)) + }) + } else { + runs.into_iter().next() + } +} + fn agent_completion_wake_messages( payload: &AgentStatusUpdatePayload, state: &mut State, @@ -11510,6 +11554,7 @@ async fn enqueue_agent_completion_wake( #[cfg(test)] mod agent_completion_wake_tests { use super::agent_completion_wake_messages; + use super::select_github_run_for_wait; use super::track_seen_completed_agent_for_batch; use super::State; use super::AgentSourceKind; @@ -11520,6 +11565,7 @@ mod agent_completion_wake_tests { }; use crate::agent_tool::AgentStatusUpdatePayload; use crate::protocol::AgentInfo; + use serde_json::json; fn agent_info( id: &str, @@ -11639,6 +11685,41 @@ mod agent_completion_wake_tests { .expect("hot batch should be tracked"); assert!(seen.len() <= MAX_WAIT_TRACKED_AGENT_IDS_PER_BATCH); } + + #[test] + fn github_run_wait_selects_matching_head_sha() { + let runs = vec![ + json!({ + "databaseId": 2002, + "headSha": "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", + }), + json!({ + "databaseId": 1001, + "headSha": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + }), + ]; + + let selected = select_github_run_for_wait( + runs, + Some("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"), + ) + .expect("matching run should be selected"); + + assert_eq!(selected["databaseId"], 1001); + } + + #[test] + fn github_run_wait_without_head_sha_uses_latest_run() { + let runs = vec![ + json!({ "databaseId": 2002, "headSha": "bbbb" }), + json!({ "databaseId": 1001, "headSha": "aaaa" }), + ]; + + let selected = select_github_run_for_wait(runs, None) + .expect("latest run should be selected without a SHA constraint"); + + assert_eq!(selected["databaseId"], 2002); + } } /// Send agent status update event to the TUI @@ -11649,7 +11730,7 @@ async fn send_agent_status_update(sess: &Session) { // stays responsive in long-running sessions. let now = Utc::now(); let agents: Vec = manager - .status_visible_agents() + .status_visible_agents_for_session(sess.session_uuid()) .into_iter() .map(|agent| { let status = agent.status.clone(); diff --git a/code-rs/core/src/openai_tools.rs b/code-rs/core/src/openai_tools.rs index dd2f0a7d512..1e1dfb0fd0c 100644 --- a/code-rs/core/src/openai_tools.rs +++ b/code-rs/core/src/openai_tools.rs @@ -1426,6 +1426,16 @@ pub fn create_gh_run_wait_tool() -> OpenAiTool { allowed_values: None, }, ); + properties.insert( + "head_sha".to_string(), + JsonSchema::String { + description: Some( + "Commit SHA to match when selecting a run by workflow/branch. Prefer passing this after a merge or push so another session's newer run on the same branch is not selected." + .to_string(), + ), + allowed_values: None, + }, + ); properties.insert( "interval_seconds".to_string(), JsonSchema::Number { @@ -1434,7 +1444,7 @@ pub fn create_gh_run_wait_tool() -> OpenAiTool { ); OpenAiTool::Function(ResponsesApiTool { name: "gh_run_wait".to_string(), - description: "Wait for a GitHub Actions run to finish, using gh run view polling. If run_id is omitted, selects the latest run for the workflow/branch; if both are omitted, selects the latest run on the current branch." + description: "Wait for a GitHub Actions run to finish, using gh run view polling. If run_id is omitted, selects the latest run for the workflow/branch, optionally constrained by head_sha; if both are omitted, selects the latest run on the current branch." .to_string(), strict: false, parameters: JsonSchema::Object { diff --git a/code-rs/core/tests/agent_completion_wake.rs b/code-rs/core/tests/agent_completion_wake.rs index 133c094f37b..9ed4ed16c48 100644 --- a/code-rs/core/tests/agent_completion_wake.rs +++ b/code-rs/core/tests/agent_completion_wake.rs @@ -5,45 +5,33 @@ mod common; use common::load_default_config_for_test; use code_core::config_types::AgentConfig; -use code_core::protocol::{AskForApproval, EventMsg, Op, SandboxPolicy}; +use code_core::protocol::{AgentInfo, AskForApproval, EventMsg, Op, SandboxPolicy}; use code_core::{built_in_model_providers, CodexAuth, ConversationManager}; use code_core::AGENT_MANAGER; use code_protocol::config_types::ReasoningEffort; use serde_json::json; use serial_test::serial; +use std::sync::Arc; use tempfile::TempDir; use tokio::time::{timeout, Duration, Instant}; use uuid::Uuid; use wiremock::matchers::{method, path_regex}; use wiremock::{Mock, MockServer, ResponseTemplate}; -fn sse_response(body: String) -> ResponseTemplate { - ResponseTemplate::new(200) - .insert_header("content-type", "text/event-stream") - .set_body_string(body) -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -#[serial] -async fn wake_on_agent_batch_completion_starts_new_turn() { - let code_home = TempDir::new().unwrap(); - let project_dir = TempDir::new().unwrap(); - - let server = MockServer::start().await; - +fn completed_response_body(response_id: &str, text: &str) -> String { let message_item = json!({ "type": "response.output_item.done", "item": { "type": "message", - "id": "msg-1", + "id": format!("msg-{response_id}"), "role": "assistant", - "content": [{"type": "output_text", "text": "ok"}], + "content": [{"type": "output_text", "text": text}], } }); let completed = json!({ "type": "response.completed", "response": { - "id": "resp-1", + "id": response_id, "usage": { "input_tokens": 0, "input_tokens_details": null, @@ -53,18 +41,18 @@ async fn wake_on_agent_batch_completion_starts_new_turn() { } } }); - let body = format!( + format!( "event: response.output_item.done\ndata: {message_item}\n\n\ event: response.completed\ndata: {completed}\n\n", - ); - Mock::given(method("POST")) - .and(path_regex(".*/responses$")) - .respond_with(sse_response(body)) - .up_to_n_times(1) - .mount(&server) - .await; + ) +} - let mut config = load_default_config_for_test(&code_home); +async fn spawn_test_conversation( + code_home: &TempDir, + project_dir: &TempDir, + server: &MockServer, +) -> (Arc, Uuid) { + let mut config = load_default_config_for_test(code_home); config.cwd = project_dir.path().to_path_buf(); config.approval_policy = AskForApproval::Never; config.sandbox_policy = SandboxPolicy::DangerFullAccess; @@ -75,12 +63,16 @@ event: response.completed\ndata: {completed}\n\n", config.model_provider = provider; let conversation_manager = ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key")); - let codex = conversation_manager + let new_conversation = conversation_manager .new_conversation(config) .await - .expect("create conversation") - .conversation; + .expect("create conversation"); + let session_id = new_conversation.session_configured.session_id; + (new_conversation.conversation, session_id) +} + +async fn initialize_agent_status_channel(codex: &code_core::CodexConversation) { codex .submit(Op::CancelAgents { batch_ids: Vec::new(), @@ -105,6 +97,79 @@ event: response.completed\ndata: {completed}\n\n", panic!("did not observe readiness event before deadline"); } } +} + +async fn latest_agent_status_for( + codex: &code_core::CodexConversation, + agent_id: &str, +) -> AgentInfo { + let deadline = Instant::now() + Duration::from_secs(6); + let mut latest: Option = None; + while Instant::now() < deadline { + let remaining = deadline.saturating_duration_since(Instant::now()); + let event = timeout(remaining, codex.next_event()) + .await + .expect("timeout waiting for agent status") + .expect("event stream ended unexpectedly"); + + if let EventMsg::AgentStatusUpdate(status) = event.msg { + for agent in status.agents { + if agent.id == agent_id { + let terminal = agent.status.eq_ignore_ascii_case("completed") + || agent.status.eq_ignore_ascii_case("failed") + || agent.status.eq_ignore_ascii_case("cancelled"); + latest = Some(agent); + if terminal { + return latest.expect("latest status should be set"); + } + } + } + } + } + + latest.unwrap_or_else(|| panic!("no status observed for agent {agent_id}")) +} + +async fn assert_no_agent_status_for(codex: &code_core::CodexConversation, agent_id: &str) { + let deadline = Instant::now() + Duration::from_millis(500); + while Instant::now() < deadline { + let remaining = deadline.saturating_duration_since(Instant::now()); + let Ok(event) = timeout(remaining, codex.next_event()).await else { + return; + }; + let event = event.expect("event stream ended unexpectedly"); + if let EventMsg::AgentStatusUpdate(status) = event.msg { + assert!( + !status.agents.iter().any(|agent| agent.id == agent_id), + "observed another session's agent status" + ); + } + } +} + +fn sse_response(body: String) -> ResponseTemplate { + ResponseTemplate::new(200) + .insert_header("content-type", "text/event-stream") + .set_body_string(body) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[serial] +async fn wake_on_agent_batch_completion_starts_new_turn() { + let code_home = TempDir::new().unwrap(); + let project_dir = TempDir::new().unwrap(); + + let server = MockServer::start().await; + + Mock::given(method("POST")) + .and(path_regex(".*/responses$")) + .respond_with(sse_response(completed_response_body("resp-1", "ok"))) + .up_to_n_times(1) + .mount(&server) + .await; + + let (codex, session_id) = spawn_test_conversation(&code_home, &project_dir, &server).await; + initialize_agent_status_channel(&codex).await; let batch_id = format!("batch-{}", Uuid::new_v4()); let agent_config = AgentConfig { @@ -133,6 +198,7 @@ event: response.completed\ndata: {completed}\n\n", true, Some(batch_id.clone()), agent_config, + session_id, ReasoningEffort::Low, ) .await @@ -176,3 +242,76 @@ event: response.completed\ndata: {completed}\n\n", "expected a new TaskStarted after agent completion" ); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[serial] +async fn concurrent_same_repo_sessions_only_receive_owned_agent_status() { + let code_home = TempDir::new().unwrap(); + let project_dir = TempDir::new().unwrap(); + let server = MockServer::start().await; + + Mock::given(method("POST")) + .and(path_regex(".*/responses$")) + .respond_with(sse_response(completed_response_body("resp-a", "ok a"))) + .up_to_n_times(1) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path_regex(".*/responses$")) + .respond_with(sse_response(completed_response_body("resp-b", "ok b"))) + .up_to_n_times(1) + .mount(&server) + .await; + + let (codex_a, session_a) = spawn_test_conversation(&code_home, &project_dir, &server).await; + let (codex_b, session_b) = spawn_test_conversation(&code_home, &project_dir, &server).await; + initialize_agent_status_channel(&codex_a).await; + initialize_agent_status_channel(&codex_b).await; + + let agent_a = { + let mut manager = AGENT_MANAGER.write().await; + manager + .create_agent( + "echo".to_string(), + Some("session-a-agent".to_string()), + "session a".to_string(), + None, + None, + Vec::new(), + true, + Some(format!("batch-a-{}", Uuid::new_v4())), + session_a, + ReasoningEffort::Low, + ) + .await + }; + + let observed_a = latest_agent_status_for(&codex_a, &agent_a).await; + assert_eq!(observed_a.id, agent_a); + assert_no_agent_status_for(&codex_b, &agent_a).await; + + let agent_b = { + let mut manager = AGENT_MANAGER.write().await; + manager + .create_agent( + "echo".to_string(), + Some("session-b-agent".to_string()), + "session b".to_string(), + None, + None, + Vec::new(), + true, + Some(format!("batch-b-{}", Uuid::new_v4())), + session_b, + ReasoningEffort::Low, + ) + .await + }; + + let observed_b = latest_agent_status_for(&codex_b, &agent_b).await; + assert_eq!(observed_b.id, agent_b); + assert_no_agent_status_for(&codex_a, &agent_b).await; + + codex_a.submit(Op::Shutdown).await.unwrap(); + codex_b.submit(Op::Shutdown).await.unwrap(); +} diff --git a/code-rs/tui/src/chatwidget.rs b/code-rs/tui/src/chatwidget.rs index 45cb8ad24ca..f8ba556a170 100644 --- a/code-rs/tui/src/chatwidget.rs +++ b/code-rs/tui/src/chatwidget.rs @@ -31429,6 +31429,7 @@ Have we met every part of this goal and is there no further work to do?"# async fn run_background_review( config: Config, app_event_tx: AppEventSender, + owner_session_id: Uuid, base_snapshot: Option, turn_context: Option, prefer_fallback: bool, @@ -31647,6 +31648,7 @@ async fn run_background_review( false, Some(branch.clone()), Some(agent_config.clone()), + owner_session_id, Some(branch.clone()), Some(snapshot_id.clone()), Some(code_core::protocol::AgentSourceKind::AutoReview), @@ -39201,6 +39203,7 @@ impl ChatWidget<'_> { let config = self.config.clone(); let app_event_tx = self.app_event_tx.clone(); + let owner_session_id = self.session_id.unwrap_or_else(Uuid::new_v4); let base_snapshot_for_task = base_snapshot.clone(); let turn_context = self.turn_context_block(); let prefer_fallback = had_notice || had_fixed_indicator; @@ -39208,6 +39211,7 @@ impl ChatWidget<'_> { run_background_review( config, app_event_tx, + owner_session_id, base_snapshot_for_task, turn_context, prefer_fallback,