Skip to content

Commit 801ac3e

Browse files
committed
feat(sidecar): support threaded connection for windows
Signed-off-by: Alexandre Rulleau <alexandre.rulleau@datadoghq.com>
1 parent 43a7e74 commit 801ac3e

File tree

3 files changed

+220
-16
lines changed

3 files changed

+220
-16
lines changed

datadog-sidecar-ffi/src/lib.rs

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ use std::slice;
6060
use std::sync::Arc;
6161
use std::time::Duration;
6262

63+
use datadog_sidecar::setup::{connect_to_master, MasterListener};
64+
6365
#[no_mangle]
6466
#[cfg(target_os = "windows")]
6567
pub extern "C" fn ddog_setup_crashtracking(
@@ -311,53 +313,38 @@ pub extern "C" fn ddog_sidecar_connect(connection: &mut *mut SidecarTransport) -
311313
}
312314

313315
#[no_mangle]
314-
#[cfg(unix)]
315316
pub extern "C" fn ddog_sidecar_connect_master(pid: i32) -> MaybeError {
316-
use datadog_sidecar::setup::MasterListener;
317-
318317
let cfg = datadog_sidecar::config::FromEnv::config();
319318
try_c!(MasterListener::start(pid, cfg));
320319

321320
MaybeError::None
322321
}
323322

324323
#[no_mangle]
325-
#[cfg(unix)]
326324
pub extern "C" fn ddog_sidecar_connect_worker(
327325
pid: i32,
328326
connection: &mut *mut SidecarTransport,
329327
) -> MaybeError {
330-
use datadog_sidecar::setup::connect_to_master;
331-
332328
let transport = try_c!(connect_to_master(pid));
333329
*connection = Box::into_raw(transport);
334330

335331
MaybeError::None
336332
}
337333

338334
#[no_mangle]
339-
#[cfg(unix)]
340335
pub extern "C" fn ddog_sidecar_shutdown_master_listener() -> MaybeError {
341-
use datadog_sidecar::setup::MasterListener;
342-
343336
try_c!(MasterListener::shutdown());
344337

345338
MaybeError::None
346339
}
347340

348341
#[no_mangle]
349-
#[cfg(unix)]
350342
pub extern "C" fn ddog_sidecar_is_master_listener_active(pid: i32) -> bool {
351-
use datadog_sidecar::setup::MasterListener;
352-
353343
MasterListener::is_active(pid)
354344
}
355345

356346
#[no_mangle]
357-
#[cfg(unix)]
358347
pub extern "C" fn ddog_sidecar_clear_inherited_listener() -> MaybeError {
359-
use datadog_sidecar::setup::MasterListener;
360-
361348
try_c!(MasterListener::clear_inherited_state());
362349

363350
MaybeError::None

datadog-sidecar/src/setup/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,18 @@ mod windows;
1212
#[cfg(windows)]
1313
pub use self::windows::*;
1414

15-
// Thread-based listener module (Unix only)
15+
// Thread-based listener module (Unix)
1616
#[cfg(unix)]
1717
pub mod thread_listener;
1818
#[cfg(unix)]
1919
pub use thread_listener::{connect_to_master, MasterListener};
2020

21+
// Thread-based listener module (Windows)
22+
#[cfg(windows)]
23+
pub mod thread_listener_windows;
24+
#[cfg(windows)]
25+
pub use thread_listener_windows::{connect_to_master, MasterListener};
26+
2127
use datadog_ipc::platform::Channel;
2228
use std::io;
2329

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use std::io;
5+
use std::os::windows::io::{AsRawHandle, FromRawHandle, OwnedHandle};
6+
use std::sync::{Mutex, OnceLock};
7+
use std::thread::{self, JoinHandle};
8+
use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
9+
use tokio::sync::oneshot;
10+
use tracing::{error, info};
11+
12+
use crate::config::Config;
13+
use crate::entry::MainLoopConfig;
14+
use crate::service::blocking::SidecarTransport;
15+
use datadog_ipc::platform::metadata::ProcessHandle;
16+
use datadog_ipc::platform::Channel;
17+
use datadog_ipc::transport::blocking::BlockingTransport;
18+
19+
static MASTER_LISTENER: OnceLock<Mutex<Option<MasterListener>>> = OnceLock::new();
20+
21+
pub struct MasterListener {
22+
shutdown_tx: Option<oneshot::Sender<()>>,
23+
thread_handle: Option<JoinHandle<()>>,
24+
pid: i32,
25+
}
26+
27+
impl MasterListener {
28+
/// Start the master listener thread using Windows Named Pipes.
29+
///
30+
/// This spawns a new OS thread that creates a named pipe server
31+
/// to listen for worker connections. Only one listener can be active per process.
32+
pub fn start(pid: i32, config: Config) -> io::Result<()> {
33+
let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None));
34+
let mut listener_guard = listener_mutex
35+
.lock()
36+
.map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?;
37+
38+
if listener_guard.is_some() {
39+
return Err(io::Error::other("Master listener is already running"));
40+
}
41+
42+
let pipe_name = format!(r"\\.\pipe\ddtrace_sidecar_{}", pid);
43+
let (shutdown_tx, shutdown_rx) = oneshot::channel();
44+
45+
let thread_handle = thread::Builder::new()
46+
.name(format!("ddtrace-sidecar-listener-{}", pid))
47+
.spawn(move || {
48+
if let Err(e) = run_listener_windows(pipe_name, shutdown_rx) {
49+
error!("Listener thread error: {}", e);
50+
}
51+
})
52+
.map_err(|e| io::Error::other(format!("Failed to spawn listener thread: {}", e)))?;
53+
54+
*listener_guard = Some(MasterListener {
55+
shutdown_tx: Some(shutdown_tx),
56+
thread_handle: Some(thread_handle),
57+
pid,
58+
});
59+
60+
info!("Started Windows named pipe listener (PID {})", pid);
61+
Ok(())
62+
}
63+
64+
/// Shutdown the master listener thread.
65+
///
66+
/// Sends shutdown signal and joins the listener thread. This is blocking
67+
/// and will wait for the thread to exit cleanly.
68+
pub fn shutdown() -> io::Result<()> {
69+
let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None));
70+
let mut listener_guard = listener_mutex
71+
.lock()
72+
.map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?;
73+
74+
if let Some(mut master) = listener_guard.take() {
75+
// Signal shutdown by sending to the oneshot sender
76+
if let Some(tx) = master.shutdown_tx.take() {
77+
let _ = tx.send(());
78+
}
79+
80+
if let Some(handle) = master.thread_handle.take() {
81+
handle
82+
.join()
83+
.map_err(|_| io::Error::other("Failed to join listener thread"))?;
84+
}
85+
86+
info!("Master listener thread shut down successfully");
87+
Ok(())
88+
} else {
89+
Err(io::Error::other("No master listener is running"))
90+
}
91+
}
92+
93+
/// Check if the master listener is active for the given PID.
94+
pub fn is_active(pid: i32) -> bool {
95+
let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None));
96+
if let Ok(listener_guard) = listener_mutex.lock() {
97+
listener_guard.as_ref().is_some_and(|l| l.pid == pid)
98+
} else {
99+
false
100+
}
101+
}
102+
103+
/// Clear inherited listener state.
104+
/// Kept for API compatibility with Unix version.
105+
pub fn clear_inherited_state() -> io::Result<()> {
106+
Ok(())
107+
}
108+
}
109+
110+
/// Accept connections in a loop for Windows named pipes.
111+
async fn accept_pipe_loop_windows(
112+
pipe_name: String,
113+
handler: Box<dyn Fn(tokio::net::windows::named_pipe::NamedPipeServer)>,
114+
mut shutdown_rx: oneshot::Receiver<()>,
115+
) -> io::Result<()> {
116+
let mut server = ServerOptions::new()
117+
.first_pipe_instance(true)
118+
.max_instances(254) // Windows allows up to 255 instances
119+
.create(&pipe_name)?;
120+
121+
info!("Named pipe server created at: {}", pipe_name);
122+
123+
loop {
124+
tokio::select! {
125+
_ = &mut shutdown_rx => {
126+
info!("Shutdown signal received in Windows pipe listener");
127+
break;
128+
}
129+
result = server.connect() => {
130+
match result {
131+
Ok(_) => {
132+
info!("Accepted new worker connection on named pipe");
133+
handler(server);
134+
135+
server = ServerOptions::new()
136+
.create(&pipe_name)?;
137+
}
138+
Err(e) => {
139+
error!("Failed to accept worker connection: {}", e);
140+
match ServerOptions::new().create(&pipe_name) {
141+
Ok(new_server) => server = new_server,
142+
Err(e2) => {
143+
error!("Failed to recover named pipe: {}", e2);
144+
break;
145+
}
146+
}
147+
}
148+
}
149+
}
150+
}
151+
}
152+
Ok(())
153+
}
154+
155+
/// Entry point for Windows named pipe listener
156+
fn run_listener_windows(pipe_name: String, shutdown_rx: oneshot::Receiver<()>) -> io::Result<()> {
157+
info!("Listener thread running, creating Windows named pipe server");
158+
159+
let acquire_listener = move || {
160+
let cancel = || {};
161+
let pipe_name_clone = pipe_name.clone();
162+
Ok((
163+
move |handler| accept_pipe_loop_windows(pipe_name_clone, handler, shutdown_rx),
164+
cancel,
165+
))
166+
};
167+
168+
let loop_config = MainLoopConfig {
169+
enable_ctrl_c_handler: false,
170+
enable_crashtracker: false,
171+
external_shutdown_rx: None,
172+
};
173+
174+
crate::entry::enter_listener_loop_with_config(acquire_listener, loop_config)
175+
.map_err(|e| io::Error::other(format!("Windows thread listener failed: {}", e)))?;
176+
177+
info!("Listener thread exiting");
178+
Ok(())
179+
}
180+
181+
/// Connect to the master listener as a worker using Windows Named Pipes.
182+
///
183+
/// Establishes a connection to the master listener thread for the given PID.
184+
pub fn connect_to_master(pid: i32) -> io::Result<Box<SidecarTransport>> {
185+
info!("Connecting to master listener via named pipe (PID {})", pid);
186+
187+
let pipe_name = format!(r"\\.\pipe\ddtrace_sidecar_{}", pid);
188+
189+
let client = ClientOptions::new().open(&pipe_name)?;
190+
191+
info!("Connected to named pipe: {}", pipe_name);
192+
193+
let raw_handle = client.as_raw_handle();
194+
let owned_handle = unsafe { OwnedHandle::from_raw_handle(raw_handle) };
195+
196+
std::mem::forget(client);
197+
198+
let process_handle =
199+
ProcessHandle::Getter(Box::new(move || Ok(ProcessHandle::Pid(pid as u32))));
200+
let channel = Channel::from_client_handle_and_pid(owned_handle, process_handle);
201+
202+
let transport = BlockingTransport::from(channel);
203+
204+
let sidecar_transport = Box::new(SidecarTransport {
205+
inner: Mutex::new(transport),
206+
reconnect_fn: None,
207+
});
208+
209+
info!("Successfully connected to master listener");
210+
Ok(sidecar_transport)
211+
}

0 commit comments

Comments
 (0)