Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 91 additions & 11 deletions code-rs/core/src/agent_tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,9 @@ struct AgentStatusSender {

fn agent_belongs_to_session(agent: &Agent, owner_session_id: Option<Uuid>) -> bool {
match owner_session_id {
Some(owner_session_id) => agent.owner_session_id == Some(owner_session_id),
Some(owner_session_id) => agent
.owner_session_id
.is_none_or(|agent_owner_session_id| agent_owner_session_id == owner_session_id),
None => true,
}
}
Expand Down Expand Up @@ -539,13 +541,23 @@ impl AgentManager {
owner_session_id,
sender,
});
// A fresh sender set is either the first UI for this manager or a UI
// reconnect after all previous senders closed. Keep archived results
// for the reconnecting session, but drop archives from older sessions
// so a long-lived manager does not retain stale terminal agents forever.
// Keep archived results for connected sessions and legacy unowned
// agents, but drop archives from older disconnected sessions so a
// long-lived manager does not retain stale terminal agents forever.
let connected_sessions: HashSet<Uuid> = self
.event_senders
.iter()
.filter(|registered| !registered.sender.is_closed())
.map(|registered| registered.owner_session_id)
.collect();
self.archived_terminal_agents.retain(|_, agent| {
agent
.owner_session_id
.is_none_or(|agent_owner_session_id| connected_sessions.contains(&agent_owner_session_id))
});
self.diagnostics.archived_terminal_agents = self.archived_terminal_agents.len() as u64;

if fresh_sender_set {
self.archived_terminal_agents
.retain(|_, agent| agent_belongs_to_session(agent, Some(owner_session_id)));
self.diagnostics = AgentManagerDiagnostics::default();
self.diagnostics.archived_terminal_agents = self.archived_terminal_agents.len() as u64;
}
Expand Down Expand Up @@ -3251,17 +3263,31 @@ mod tests {
"archived-b".to_string(),
test_agent("archived-b", session_b, shared_batch, AgentStatus::Completed),
);
let mut legacy_agent = test_agent(
"legacy-agent",
session_b,
shared_batch,
AgentStatus::Running,
);
legacy_agent.owner_session_id = None;
manager
.agents
.insert("legacy-agent".to_string(), legacy_agent);

let visible_to_a = manager.list_agents_for_session(
None,
Some(shared_batch.to_string()),
false,
session_a,
);
assert_eq!(visible_to_a.len(), 1);
assert_eq!(visible_to_a[0].id, "agent-a");
assert_eq!(visible_to_a.len(), 2);
assert!(visible_to_a.iter().any(|agent| agent.id == "agent-a"));
assert!(visible_to_a.iter().any(|agent| agent.id == "legacy-agent"));
assert!(manager.get_agent_for_session("agent-b", session_a).is_none());
assert!(manager.get_agent_for_session("archived-b", session_a).is_none());
assert!(manager
.get_agent_for_session("legacy-agent", session_a)
.is_some());
assert!(!manager.cancel_agent_for_session("agent-b", session_a).await);

assert!(manager.agents.contains_key("agent-b"));
Expand Down Expand Up @@ -3296,16 +3322,17 @@ mod tests {
#[tokio::test]
async fn registering_second_session_keeps_existing_archived_agents() {
let mut manager = AgentManager::new();
let session_a = Uuid::new_v4();
let (tx_a, _rx_a) = tokio::sync::mpsc::unbounded_channel();
manager.set_event_sender(Uuid::new_v4(), tx_a);
manager.set_event_sender(session_a, tx_a);

let now = chrono::Utc::now();
let archived_id = "archived-agent".to_string();
manager.archived_terminal_agents.insert(
archived_id.clone(),
Agent {
id: archived_id.clone(),
owner_session_id: Some(Uuid::new_v4()),
owner_session_id: Some(session_a),
batch_id: Some("batch-archived".to_string()),
model: "code-gpt-5.5".to_string(),
name: Some("Archived".to_string()),
Expand Down Expand Up @@ -3399,6 +3426,59 @@ mod tests {
assert_eq!(manager.diagnostics.archived_terminal_agents, 1);
}

#[tokio::test]
async fn registering_sender_prunes_archives_from_disconnected_sessions() {
let mut manager = AgentManager::new();
let connected_session = Uuid::new_v4();
let disconnected_session = Uuid::new_v4();
let new_session = Uuid::new_v4();
let (tx_connected, _rx_connected) = tokio::sync::mpsc::unbounded_channel();
manager.set_event_sender(connected_session, tx_connected);

manager.archived_terminal_agents.insert(
"connected-archived".to_string(),
test_agent(
"connected-archived",
connected_session,
"batch-connected-archived",
AgentStatus::Completed,
),
);
manager.archived_terminal_agents.insert(
"disconnected-archived".to_string(),
test_agent(
"disconnected-archived",
disconnected_session,
"batch-disconnected-archived",
AgentStatus::Completed,
),
);
let mut legacy_archived = test_agent(
"legacy-archived",
disconnected_session,
"batch-legacy-archived",
AgentStatus::Completed,
);
legacy_archived.owner_session_id = None;
manager
.archived_terminal_agents
.insert("legacy-archived".to_string(), legacy_archived);

let (tx_new, _rx_new) = tokio::sync::mpsc::unbounded_channel();
manager.set_event_sender(new_session, tx_new);

assert!(manager
.archived_terminal_agents
.contains_key("connected-archived"));
assert!(manager
.archived_terminal_agents
.contains_key("legacy-archived"));
assert!(!manager
.archived_terminal_agents
.contains_key("disconnected-archived"));
assert_eq!(manager.diagnostics.archived_terminal_agents, 2);
}

#[tokio::test]
async fn read_only_agents_use_code_binary_path() {
let _lock = env_lock().lock().expect("env lock");
Expand Down