Skip to content

Commit 6d2f4aa

Browse files
authored
feat: use ProcessId in exec-server (#15866)
Use a full struct for the ProcessId to increase readability and make it easier in the future to make it evolve if needed
1 parent a5824e3 commit 6d2f4aa

File tree

11 files changed

+137
-90
lines changed

11 files changed

+137
-90
lines changed

codex-rs/core/src/unified_exec/process_manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -603,7 +603,7 @@ impl UnifiedExecProcessManager {
603603
let started = environment
604604
.get_exec_backend()
605605
.start(codex_exec_server::ExecParams {
606-
process_id: exec_server_process_id(process_id),
606+
process_id: exec_server_process_id(process_id).into(),
607607
argv: env.command.clone(),
608608
cwd: env.cwd.clone(),
609609
env: env.env.clone(),

codex-rs/exec-server/src/client.rs

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ struct Inner {
113113
// process on the connection. Keep a local process_id -> session registry so
114114
// we can turn those connection-global notifications into process wakeups
115115
// without making notifications the source of truth for output delivery.
116-
sessions: ArcSwap<HashMap<String, Arc<SessionState>>>,
116+
sessions: ArcSwap<HashMap<ProcessId, Arc<SessionState>>>,
117117
// ArcSwap makes reads cheap on the hot notification path, but writes still
118118
// need serialization so concurrent register/remove operations do not
119119
// overwrite each other's copy-on-write updates.
@@ -225,29 +225,32 @@ impl ExecServerClient {
225225

226226
pub async fn write(
227227
&self,
228-
process_id: &str,
228+
process_id: &ProcessId,
229229
chunk: Vec<u8>,
230230
) -> Result<WriteResponse, ExecServerError> {
231231
self.inner
232232
.client
233233
.call(
234234
EXEC_WRITE_METHOD,
235235
&WriteParams {
236-
process_id: process_id.to_string(),
236+
process_id: process_id.clone(),
237237
chunk: chunk.into(),
238238
},
239239
)
240240
.await
241241
.map_err(Into::into)
242242
}
243243

244-
pub async fn terminate(&self, process_id: &str) -> Result<TerminateResponse, ExecServerError> {
244+
pub async fn terminate(
245+
&self,
246+
process_id: &ProcessId,
247+
) -> Result<TerminateResponse, ExecServerError> {
245248
self.inner
246249
.client
247250
.call(
248251
EXEC_TERMINATE_METHOD,
249252
&TerminateParams {
250-
process_id: process_id.to_string(),
253+
process_id: process_id.clone(),
251254
},
252255
)
253256
.await
@@ -330,20 +333,20 @@ impl ExecServerClient {
330333

331334
pub(crate) async fn register_session(
332335
&self,
333-
process_id: &str,
336+
process_id: &ProcessId,
334337
) -> Result<Session, ExecServerError> {
335338
let state = Arc::new(SessionState::new());
336339
self.inner
337340
.insert_session(process_id, Arc::clone(&state))
338341
.await?;
339342
Ok(Session {
340343
client: self.clone(),
341-
process_id: process_id.to_string().into(),
344+
process_id: process_id.clone(),
342345
state,
343346
})
344347
}
345348

346-
pub(crate) async fn unregister_session(&self, process_id: &str) {
349+
pub(crate) async fn unregister_session(&self, process_id: &ProcessId) {
347350
self.inner.remove_session(process_id).await;
348351
}
349352

@@ -487,7 +490,7 @@ impl Session {
487490
match self
488491
.client
489492
.read(ReadParams {
490-
process_id: self.process_id.to_string(),
493+
process_id: self.process_id.clone(),
491494
after_seq,
492495
max_bytes,
493496
wait_ms,
@@ -519,13 +522,13 @@ impl Session {
519522
}
520523

521524
impl Inner {
522-
fn get_session(&self, process_id: &str) -> Option<Arc<SessionState>> {
525+
fn get_session(&self, process_id: &ProcessId) -> Option<Arc<SessionState>> {
523526
self.sessions.load().get(process_id).cloned()
524527
}
525528

526529
async fn insert_session(
527530
&self,
528-
process_id: &str,
531+
process_id: &ProcessId,
529532
session: Arc<SessionState>,
530533
) -> Result<(), ExecServerError> {
531534
let _sessions_write_guard = self.sessions_write_lock.lock().await;
@@ -536,12 +539,12 @@ impl Inner {
536539
)));
537540
}
538541
let mut next_sessions = sessions.as_ref().clone();
539-
next_sessions.insert(process_id.to_string(), session);
542+
next_sessions.insert(process_id.clone(), session);
540543
self.sessions.store(Arc::new(next_sessions));
541544
Ok(())
542545
}
543546

544-
async fn remove_session(&self, process_id: &str) -> Option<Arc<SessionState>> {
547+
async fn remove_session(&self, process_id: &ProcessId) -> Option<Arc<SessionState>> {
545548
let _sessions_write_guard = self.sessions_write_lock.lock().await;
546549
let sessions = self.sessions.load();
547550
let session = sessions.get(process_id).cloned();
@@ -552,7 +555,7 @@ impl Inner {
552555
session
553556
}
554557

555-
async fn take_all_sessions(&self) -> HashMap<String, Arc<SessionState>> {
558+
async fn take_all_sessions(&self) -> HashMap<ProcessId, Arc<SessionState>> {
556559
let _sessions_write_guard = self.sessions_write_lock.lock().await;
557560
let sessions = self.sessions.load();
558561
let drained_sessions = sessions.as_ref().clone();
@@ -640,6 +643,7 @@ mod tests {
640643

641644
use super::ExecServerClient;
642645
use super::ExecServerClientConnectOptions;
646+
use crate::ProcessId;
643647
use crate::connection::JsonRpcConnection;
644648
use crate::protocol::EXEC_EXITED_METHOD;
645649
use crate::protocol::EXEC_OUTPUT_DELTA_METHOD;
@@ -718,12 +722,14 @@ mod tests {
718722
.await
719723
.expect("client should connect");
720724

725+
let noisy_process_id = ProcessId::from("noisy");
726+
let quiet_process_id = ProcessId::from("quiet");
721727
let _noisy_session = client
722-
.register_session("noisy")
728+
.register_session(&noisy_process_id)
723729
.await
724730
.expect("noisy session should register");
725731
let quiet_session = client
726-
.register_session("quiet")
732+
.register_session(&quiet_process_id)
727733
.await
728734
.expect("quiet session should register");
729735
let mut quiet_wake_rx = quiet_session.subscribe_wake();
@@ -734,7 +740,7 @@ mod tests {
734740
method: EXEC_OUTPUT_DELTA_METHOD.to_string(),
735741
params: Some(
736742
serde_json::to_value(ExecOutputDeltaNotification {
737-
process_id: "noisy".to_string(),
743+
process_id: noisy_process_id.clone(),
738744
seq,
739745
stream: ExecOutputStream::Stdout,
740746
chunk: b"x".to_vec().into(),
@@ -751,7 +757,7 @@ mod tests {
751757
method: EXEC_EXITED_METHOD.to_string(),
752758
params: Some(
753759
serde_json::to_value(ExecExitedNotification {
754-
process_id: "quiet".to_string(),
760+
process_id: quiet_process_id,
755761
seq: 1,
756762
exit_code: 17,
757763
})

codex-rs/exec-server/src/environment.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ mod tests {
159159

160160
use super::Environment;
161161
use super::EnvironmentManager;
162+
use crate::ProcessId;
162163
use pretty_assertions::assert_eq;
163164

164165
#[tokio::test]
@@ -195,7 +196,7 @@ mod tests {
195196
let response = environment
196197
.get_exec_backend()
197198
.start(crate::ExecParams {
198-
process_id: "default-env-proc".to_string(),
199+
process_id: ProcessId::from("default-env-proc"),
199200
argv: vec!["true".to_string()],
200201
cwd: std::env::current_dir().expect("read current dir"),
201202
env: Default::default(),

codex-rs/exec-server/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ mod file_system;
66
mod local_file_system;
77
mod local_process;
88
mod process;
9+
mod process_id;
910
mod protocol;
1011
mod remote_file_system;
1112
mod remote_process;
@@ -43,8 +44,8 @@ pub use file_system::ReadDirectoryEntry;
4344
pub use file_system::RemoveOptions;
4445
pub use process::ExecBackend;
4546
pub use process::ExecProcess;
46-
pub use process::ProcessId;
4747
pub use process::StartedExecProcess;
48+
pub use process_id::ProcessId;
4849
pub use protocol::ExecClosedNotification;
4950
pub use protocol::ExecExitedNotification;
5051
pub use protocol::ExecOutputDeltaNotification;

codex-rs/exec-server/src/local_process.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ enum ProcessEntry {
7575

7676
struct Inner {
7777
notifications: RpcNotificationSender,
78-
processes: Mutex<HashMap<String, ProcessEntry>>,
78+
processes: Mutex<HashMap<ProcessId, ProcessEntry>>,
7979
initialize_requested: AtomicBool,
8080
initialized: AtomicBool,
8181
}
@@ -420,7 +420,7 @@ impl ExecBackend for LocalProcess {
420420
.map_err(map_handler_error)?;
421421
Ok(StartedExecProcess {
422422
process: Arc::new(LocalExecProcess {
423-
process_id: response.process_id.into(),
423+
process_id: response.process_id,
424424
backend: self.clone(),
425425
wake_tx,
426426
}),
@@ -461,13 +461,13 @@ impl ExecProcess for LocalExecProcess {
461461
impl LocalProcess {
462462
async fn read(
463463
&self,
464-
process_id: &str,
464+
process_id: &ProcessId,
465465
after_seq: Option<u64>,
466466
max_bytes: Option<usize>,
467467
wait_ms: Option<u64>,
468468
) -> Result<ReadResponse, ExecServerError> {
469469
self.exec_read(ReadParams {
470-
process_id: process_id.to_string(),
470+
process_id: process_id.clone(),
471471
after_seq,
472472
max_bytes,
473473
wait_ms,
@@ -478,20 +478,20 @@ impl LocalProcess {
478478

479479
async fn write(
480480
&self,
481-
process_id: &str,
481+
process_id: &ProcessId,
482482
chunk: Vec<u8>,
483483
) -> Result<WriteResponse, ExecServerError> {
484484
self.exec_write(WriteParams {
485-
process_id: process_id.to_string(),
485+
process_id: process_id.clone(),
486486
chunk: chunk.into(),
487487
})
488488
.await
489489
.map_err(map_handler_error)
490490
}
491491

492-
async fn terminate(&self, process_id: &str) -> Result<(), ExecServerError> {
492+
async fn terminate(&self, process_id: &ProcessId) -> Result<(), ExecServerError> {
493493
self.terminate_process(TerminateParams {
494-
process_id: process_id.to_string(),
494+
process_id: process_id.clone(),
495495
})
496496
.await
497497
.map_err(map_handler_error)?;
@@ -507,7 +507,7 @@ fn map_handler_error(error: JSONRPCErrorError) -> ExecServerError {
507507
}
508508

509509
async fn stream_output(
510-
process_id: String,
510+
process_id: ProcessId,
511511
stream: ExecOutputStream,
512512
mut receiver: tokio::sync::mpsc::Receiver<Vec<u8>>,
513513
inner: Arc<Inner>,
@@ -560,7 +560,7 @@ async fn stream_output(
560560
}
561561

562562
async fn watch_exit(
563-
process_id: String,
563+
process_id: ProcessId,
564564
exit_rx: tokio::sync::oneshot::Receiver<i32>,
565565
inner: Arc<Inner>,
566566
output_notify: Arc<Notify>,
@@ -605,7 +605,7 @@ async fn watch_exit(
605605
}
606606
}
607607

608-
async fn finish_output_stream(process_id: String, inner: Arc<Inner>) {
608+
async fn finish_output_stream(process_id: ProcessId, inner: Arc<Inner>) {
609609
{
610610
let mut processes = inner.processes.lock().await;
611611
let Some(ProcessEntry::Running(process)) = processes.get_mut(&process_id) else {
@@ -620,7 +620,7 @@ async fn finish_output_stream(process_id: String, inner: Arc<Inner>) {
620620
maybe_emit_closed(process_id, inner).await;
621621
}
622622

623-
async fn maybe_emit_closed(process_id: String, inner: Arc<Inner>) {
623+
async fn maybe_emit_closed(process_id: ProcessId, inner: Arc<Inner>) {
624624
let notification = {
625625
let mut processes = inner.processes.lock().await;
626626
let Some(ProcessEntry::Running(process)) = processes.get_mut(&process_id) else {

codex-rs/exec-server/src/process.rs

Lines changed: 1 addition & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,18 @@
1-
use std::fmt;
2-
use std::ops::Deref;
31
use std::sync::Arc;
42

53
use async_trait::async_trait;
64
use tokio::sync::watch;
75

86
use crate::ExecServerError;
7+
use crate::ProcessId;
98
use crate::protocol::ExecParams;
109
use crate::protocol::ReadResponse;
1110
use crate::protocol::WriteResponse;
1211

13-
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
14-
pub struct ProcessId(String);
15-
1612
pub struct StartedExecProcess {
1713
pub process: Arc<dyn ExecProcess>,
1814
}
1915

20-
impl ProcessId {
21-
pub fn as_str(&self) -> &str {
22-
&self.0
23-
}
24-
25-
pub fn into_inner(self) -> String {
26-
self.0
27-
}
28-
}
29-
30-
impl Deref for ProcessId {
31-
type Target = str;
32-
33-
fn deref(&self) -> &Self::Target {
34-
self.as_str()
35-
}
36-
}
37-
38-
impl AsRef<str> for ProcessId {
39-
fn as_ref(&self) -> &str {
40-
self.as_str()
41-
}
42-
}
43-
44-
impl fmt::Display for ProcessId {
45-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46-
self.0.fmt(f)
47-
}
48-
}
49-
50-
impl From<String> for ProcessId {
51-
fn from(value: String) -> Self {
52-
Self(value)
53-
}
54-
}
55-
5616
#[async_trait]
5717
pub trait ExecProcess: Send + Sync {
5818
fn process_id(&self) -> &ProcessId;

0 commit comments

Comments
 (0)