Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions code-rs/core/src/agent_tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,12 @@ impl AgentManager {
.or_else(|| self.archived_terminal_agents.get(agent_id).cloned())
}

pub fn get_agent_for_session(&self, agent_id: &str, owner_session_id: Uuid) -> Option<Agent> {
self
.get_agent(agent_id)
.filter(|agent| agent_belongs_to_session(agent, Some(owner_session_id)))
}

pub fn get_all_agents(&self) -> impl Iterator<Item = &Agent> {
self.agents.values()
}
Expand Down Expand Up @@ -1176,6 +1182,20 @@ impl AgentManager {
out
}

pub fn list_agents_for_session(
&self,
status_filter: Option<AgentStatus>,
batch_id: Option<String>,
recent_only: bool,
owner_session_id: Uuid,
) -> Vec<Agent> {
self
.list_agents(status_filter, batch_id, recent_only)
.into_iter()
.filter(|agent| agent_belongs_to_session(agent, Some(owner_session_id)))
.collect()
}

pub fn has_active_agents(&self) -> bool {
self.agents
.values()
Expand All @@ -1196,6 +1216,21 @@ impl AgentManager {
}
}

pub async fn cancel_agent_for_session(
&mut self,
agent_id: &str,
owner_session_id: Uuid,
) -> bool {
if self
.get_agent(agent_id)
.is_some_and(|agent| agent_belongs_to_session(&agent, Some(owner_session_id)))
{
self.cancel_agent(agent_id).await
} else {
false
}
}

pub async fn cancel_batch(&mut self, batch_id: &str) -> usize {
let agent_ids: Vec<String> = self
.agents
Expand All @@ -1213,6 +1248,28 @@ impl AgentManager {
count
}

pub async fn cancel_batch_for_session(
&mut self,
batch_id: &str,
owner_session_id: Uuid,
) -> usize {
let agent_ids: Vec<String> = self
.agents
.values()
.filter(|agent| agent.batch_id.as_ref() == Some(&batch_id.to_string()))
.filter(|agent| agent_belongs_to_session(agent, Some(owner_session_id)))
.map(|agent| agent.id.clone())
.collect();

let mut count = 0;
for agent_id in agent_ids {
if self.cancel_agent(&agent_id).await {
count += 1;
}
}
count
}

pub async fn update_agent_status(&mut self, agent_id: &str, status: AgentStatus) {
let mut terminal = false;
if let Some(agent) = self.agents.get_mut(agent_id) {
Expand Down Expand Up @@ -3026,6 +3083,42 @@ mod tests {
}
}

fn test_agent(
id: &str,
owner_session_id: Uuid,
batch_id: &str,
status: AgentStatus,
) -> Agent {
let now = chrono::Utc::now();
Agent {
id: id.to_string(),
owner_session_id: Some(owner_session_id),
batch_id: Some(batch_id.to_string()),
model: "code-gpt-5.5".to_string(),
name: Some(id.to_string()),
prompt: "prompt".to_string(),
context: None,
output_goal: None,
files: Vec::new(),
read_only: true,
status,
result: None,
error: None,
created_at: now,
started_at: Some(now),
completed_at: None,
progress: Vec::new(),
worktree_path: None,
branch_name: None,
worktree_base: None,
source_kind: None,
log_tag: None,
config: None,
reasoning_effort: ReasoningEffort::Low,
last_activity: now,
}
}

#[test]
fn code_family_falls_back_when_command_missing() {
let cfg = agent_with_command("definitely-not-present-429");
Expand Down Expand Up @@ -3134,6 +3227,52 @@ mod tests {
assert!(!payload_b.agents.iter().any(|agent| agent.id == agent_a));
}

#[tokio::test]
async fn model_facing_agent_queries_are_session_scoped() {
let mut manager = AgentManager::new();
let session_a = Uuid::new_v4();
let session_b = Uuid::new_v4();
let shared_batch = "shared-batch";

manager.agents.insert(
"agent-a".to_string(),
test_agent("agent-a", session_a, shared_batch, AgentStatus::Running),
);
manager.agents.insert(
"agent-b".to_string(),
test_agent("agent-b", session_b, shared_batch, AgentStatus::Running),
);
manager.handles.insert("agent-a".to_string(), tokio::spawn(async {}));
manager.handles.insert("agent-b".to_string(), tokio::spawn(async {}));
manager.archived_terminal_agents.insert(
"archived-b".to_string(),
test_agent("archived-b", session_b, shared_batch, AgentStatus::Completed),
);

let visible_to_a = manager.list_agents_for_session(
None,
Some(shared_batch.to_string()),
false,
session_a,
);
assert_eq!(visible_to_a.len(), 1);
assert_eq!(visible_to_a[0].id, "agent-a");
assert!(manager.get_agent_for_session("agent-b", session_a).is_none());
assert!(manager.get_agent_for_session("archived-b", session_a).is_none());
assert!(!manager.cancel_agent_for_session("agent-b", session_a).await);

assert!(manager.agents.contains_key("agent-b"));
assert!(manager.cancel_batch_for_session(shared_batch, session_a).await == 1);
assert_eq!(
manager.agents.get("agent-a").map(|agent| &agent.status),
Some(&AgentStatus::Cancelled),
);
assert_eq!(
manager.agents.get("agent-b").map(|agent| &agent.status),
Some(&AgentStatus::Running),
);
}

#[tokio::test]
async fn agent_status_updates_prune_closed_session_senders() {
let mut manager = AgentManager::new();
Expand Down
6 changes: 0 additions & 6 deletions code-rs/core/src/codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1171,10 +1171,6 @@ pub struct Codex {
rx_event: Receiver<Event>,
}

// Allow internal components (like background exec completions) to trigger a new
// turn without fabricating a visible user message. We enqueue an empty
// UserInput; the model will only see queued developer/system items.
static TX_SUB_GLOBAL: OnceLock<Sender<Submission>> = OnceLock::new();
static ANY_BG_NOTIFY: OnceLock<std::sync::Arc<Notify>> = OnceLock::new();

/// Wrapper returned by [`Codex::spawn`] containing the spawned [`Codex`],
Expand Down Expand Up @@ -1247,8 +1243,6 @@ impl Codex {
tx_sub,
rx_event,
};
// Make a clone of tx_sub available for internal auto-turn triggers.
let _ = TX_SUB_GLOBAL.set(codex.tx_sub.clone());
let _ = ANY_BG_NOTIFY.set(std::sync::Arc::new(Notify::new()));
let init_id = codex.submit(configure_session).await?;

Expand Down
62 changes: 45 additions & 17 deletions code-rs/core/src/codex/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,9 @@ pub(super) async fn submission_loop(
if !seen_batches.insert(trimmed.to_string()) {
continue;
}
cancelled += manager.cancel_batch(trimmed).await;
cancelled += manager
.cancel_batch_for_session(trimmed, sess_arc.session_uuid())
.await;
}

for agent_id in agent_ids {
Expand All @@ -288,7 +290,10 @@ pub(super) async fn submission_loop(
if !seen_agents.insert(trimmed.to_string()) {
continue;
}
if manager.cancel_agent(trimmed).await {
if manager
.cancel_agent_for_session(trimmed, sess_arc.session_uuid())
.await
{
cancelled += 1;
}
}
Expand Down Expand Up @@ -8911,7 +8916,7 @@ async fn handle_check_agent_status(
Ok(params) => {
let manager = AGENT_MANAGER.read().await;

if let Some(agent) = manager.get_agent(&params.agent_id) {
if let Some(agent) = manager.get_agent_for_session(&params.agent_id, sess.session_uuid()) {
match agent.batch_id.as_deref() {
Some(batch) if batch == params.batch_id => {}
_ => {
Expand Down Expand Up @@ -8958,7 +8963,7 @@ async fn handle_check_agent_status(
};
// Re-acquire manager to get fresh progress after potential delay
let manager = AGENT_MANAGER.read().await;
if let Some(agent) = manager.get_agent(&params.agent_id) {
if let Some(agent) = manager.get_agent_for_session(&params.agent_id, sess.session_uuid()) {
let joined = agent.progress.join("\n");
match write_agent_file(&dir, "progress.log", &joined) {
Ok(p) => progress_file = Some(p.display().to_string()),
Expand Down Expand Up @@ -9037,7 +9042,7 @@ async fn handle_get_agent_result(
Ok(params) => {
let manager = AGENT_MANAGER.read().await;

if let Some(agent) = manager.get_agent(&params.agent_id) {
if let Some(agent) = manager.get_agent_for_session(&params.agent_id, sess.session_uuid()) {
match agent.batch_id.as_deref() {
Some(batch) if batch == params.batch_id => {}
_ => {
Expand Down Expand Up @@ -9171,7 +9176,7 @@ async fn handle_cancel_agent(
};
}
};
if let Some(agent) = manager.get_agent(&agent_id) {
if let Some(agent) = manager.get_agent_for_session(&agent_id, sess.session_uuid()) {
if agent.batch_id.as_deref() != Some(batch_id.as_str()) {
return ResponseInputItem::FunctionCallOutput {
call_id: call_id_clone,
Expand All @@ -9184,7 +9189,7 @@ async fn handle_cancel_agent(
};
}
}
if manager.cancel_agent(&agent_id).await {
if manager.cancel_agent_for_session(&agent_id, sess.session_uuid()).await {
ResponseInputItem::FunctionCallOutput {
call_id: call_id_clone,
output: FunctionCallOutputPayload {
Expand All @@ -9200,7 +9205,7 @@ async fn handle_cancel_agent(
}
}
} else if let Some(batch_id) = params.batch_id {
let count = manager.cancel_batch(&batch_id).await;
let count = manager.cancel_batch_for_session(&batch_id, sess.session_uuid()).await;
ResponseInputItem::FunctionCallOutput {
call_id: call_id_clone,
output: FunctionCallOutputPayload {
Expand Down Expand Up @@ -9273,7 +9278,7 @@ async fn handle_wait_for_agent(
let manager = AGENT_MANAGER.read().await;

if let Some(agent_id) = &params.agent_id {
if let Some(agent) = manager.get_agent(agent_id) {
if let Some(agent) = manager.get_agent_for_session(agent_id, sess.session_uuid()) {
match agent.batch_id.as_deref() {
Some(batch) if batch == batch_id => {}
_ => {
Expand Down Expand Up @@ -9362,7 +9367,7 @@ async fn handle_wait_for_agent(
}
}
} else {
let agents = manager.list_agents(None, Some(batch_id.clone()), false);
let agents = manager.list_agents_for_session(None, Some(batch_id.clone()), false, sess.session_uuid());

// Separate terminal vs non-terminal agents
let completed_agents: Vec<_> = agents
Expand Down Expand Up @@ -9678,10 +9683,11 @@ async fn handle_list_agents(
_ => None,
});

let agents = manager.list_agents(
let agents = manager.list_agents_for_session(
status_filter,
Some(batch_id.clone()),
params.recent_only.unwrap_or(false),
sess.session_uuid(),
);

// Count running agents for status update
Expand Down Expand Up @@ -10719,6 +10725,7 @@ async fn handle_container_exec_with_params(
let suppress_event_flag_task = suppress_event_flag.clone();
let display_label_task = display_label.clone();
let tool_output_max_bytes = sess.tool_output_max_bytes;
let sess_for_background_completion = sess.self_handle.upgrade();
let task_handle = tokio::spawn(async move {
// Build stdout stream with tail capture. We cannot stamp via `Session` here,
// but deltas will be delivered with neutral ordering which the UI tolerates.
Expand Down Expand Up @@ -10809,10 +10816,10 @@ async fn handle_container_exec_with_params(
format!("{label} completed in background")
};
let bg_event = EventMsg::BackgroundEvent(BackgroundEventEvent { message });
let ev = Event { id: sub_id_for_events.clone(), event_seq: 0, msg: bg_event, order: None };
let _ = tx_event.send(ev).await;
if let Some(sess_arc) = sess_for_background_completion.as_ref() {
let ev = sess_arc.make_event(&sub_id_for_events, bg_event);
sess_arc.send_event(ev).await;

if let Some(tx) = TX_SUB_GLOBAL.get() {
let header_label = if label.is_empty() {
format!("call_id={}", call_id_for_events)
} else {
Expand All @@ -10828,9 +10835,30 @@ async fn handle_container_exec_with_params(
tool_output_max_bytes,
);
let dev_text = format!("{}\n\n{}", header, body);
let _ = tx
.send(Submission { id: uuid::Uuid::new_v4().to_string(), op: Op::AddPendingInputDeveloper { text: dev_text } })
.await;
let dev_msg = ResponseInputItem::Message {
role: "developer".to_string(),
content: vec![ContentItem::InputText { text: dev_text }],
};
if sess_arc.enqueue_out_of_turn_item(dev_msg) {
sess_arc.cleanup_old_status_items().await;
let turn_context = sess_arc.make_turn_context();
let sub_id = sess_arc.next_internal_sub_id();
let sentinel_input = vec![InputItem::Text {
text: PENDING_ONLY_SENTINEL.to_string(),
}];
let agent = AgentTask::spawn(
Arc::clone(sess_arc),
turn_context,
sub_id,
sentinel_input,
TaskOriginKind::OutOfTurnDeveloper,
false,
);
sess_arc.set_task(agent);
}
} else {
let ev = Event { id: sub_id_for_events.clone(), event_seq: 0, msg: bg_event, order: None };
let _ = tx_event.send(ev).await;
}
}
if let Some(n) = ANY_BG_NOTIFY.get() { n.notify_waiters(); }
Expand Down