Skip to content

Commit 22e6fd9

Browse files
committed
fix: make the CPU timer compatible with multi-threaded use
1 parent 453d010 commit 22e6fd9

File tree

2 files changed

+49
-41
lines changed

2 files changed

+49
-41
lines changed

crates/base/src/runtime/mod.rs

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2076,15 +2076,43 @@ fn get_cpu_metrics_guard<'l>(
20762076
return scopeguard::guard((), Box::new(|_| {}));
20772077
};
20782078

2079+
#[derive(Clone)]
2080+
struct CurrentCPUTimer {
2081+
thread_id: std::thread::ThreadId,
2082+
timer: CPUTimer,
2083+
}
2084+
20792085
let current_thread_id = std::thread::current().id();
20802086
let send_cpu_metrics_fn = move |metric: CPUUsageMetrics| {
20812087
let _ = cpu_usage_metrics_tx.send(metric);
20822088
};
20832089

2084-
send_cpu_metrics_fn(CPUUsageMetrics::Enter(
2085-
current_thread_id,
2086-
CPUTimer::get_or_init().unwrap(),
2087-
));
2090+
let mut state = op_state.borrow_mut();
2091+
let cpu_timer = if state.has::<CurrentCPUTimer>() {
2092+
let current_cpu_timer = state.borrow::<CurrentCPUTimer>();
2093+
if current_cpu_timer.thread_id != current_thread_id {
2094+
state.take::<CurrentCPUTimer>();
2095+
None
2096+
} else {
2097+
Some(current_cpu_timer.timer.clone())
2098+
}
2099+
} else {
2100+
None
2101+
};
2102+
let cpu_timer = if let Some(timer) = cpu_timer {
2103+
timer
2104+
} else {
2105+
let cpu_timer = CurrentCPUTimer {
2106+
thread_id: current_thread_id,
2107+
timer: CPUTimer::new().unwrap(),
2108+
};
2109+
2110+
state.put(cpu_timer.clone());
2111+
cpu_timer.timer
2112+
};
2113+
2114+
drop(state);
2115+
send_cpu_metrics_fn(CPUUsageMetrics::Enter(current_thread_id, cpu_timer));
20882116

20892117
let current_cpu_time_ns = get_current_cpu_time_ns().unwrap();
20902118

crates/cpu_timer/src/lib.rs

Lines changed: 17 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -80,26 +80,17 @@ pub struct CPUTimer {}
8080

8181
impl CPUTimer {
8282
#[cfg(not(target_os = "linux"))]
83-
pub fn get_or_init(_: u64) -> Result<Self, Error> {
83+
pub fn new(_: u64) -> Result<Self, Error> {
8484
log::error!("CPU timer: not enabled (need Linux)");
8585
Ok(Self {})
8686
}
8787

8888
#[cfg(target_os = "linux")]
89-
pub fn get_or_init() -> Result<Self, Error> {
90-
use std::cell::RefCell;
89+
pub fn new() -> Result<Self, Error> {
9190
use std::sync::atomic::Ordering;
9291

9392
use linux::*;
9493

95-
thread_local! {
96-
static TIMER: RefCell<Option<CPUTimer>> = RefCell::new(Option::default());
97-
}
98-
99-
if let Some(timer) = TIMER.with_borrow(|v| v.clone()) {
100-
return Ok(timer);
101-
}
102-
10394
let id = TIMER_COUNTER.fetch_add(1, Ordering::SeqCst);
10495
let mut tid = TimerId(std::ptr::null_mut());
10596
let mut sigev: libc::sigevent = unsafe { std::mem::zeroed() };
@@ -135,10 +126,6 @@ impl CPUTimer {
135126
.send(SignalMsg::Add((id, this.clone())))
136127
.unwrap();
137128

138-
TIMER.with_borrow_mut(|v| {
139-
assert!(v.replace(this.clone()).is_none());
140-
});
141-
142129
Ok(this)
143130
}
144131

@@ -159,13 +146,9 @@ impl CPUTimer {
159146
pub fn reset(&self, initial_expiry: u64, interval: u64) -> Result<(), Error> {
160147
use anyhow::Context;
161148
use linux::*;
162-
use log::error;
163149

164-
error!("try get lock: {:?}", std::thread::current());
165150
let timer = self.timer.try_lock().context("failed to get the lock")?;
166151

167-
error!("get lock: {:?}", std::thread::current());
168-
169152
let initial_expiry_secs = initial_expiry / 1000;
170153
let initial_expiry_msecs = initial_expiry % 1000;
171154
let interval_secs = interval / 1000;
@@ -182,12 +165,9 @@ impl CPUTimer {
182165
libc::timer_settime(timer.tid.0, 0, &tmspec, std::ptr::null_mut())
183166
} < 0
184167
{
185-
error!("leave lock: {:?}", std::thread::current());
186168
bail!(std::io::Error::last_os_error())
187169
}
188170

189-
error!("leave lock: {:?}", std::thread::current());
190-
191171
Ok(())
192172
}
193173

@@ -264,22 +244,22 @@ fn register_sigalrm() {
264244
Some(msg) = sig_msg_rx.recv() => {
265245
match msg {
266246
SignalMsg::Alarm(ref timer_id) => {
267-
if let Some(cpu_timer) = registry.get(timer_id) {
268-
if let Some(tx) = (*cpu_timer.cpu_alarm_val.cpu_alarms_tx.lock().await).clone() {
269-
if tx.send(()).is_err() {
270-
debug!("failed to send cpu alarm to the provided channel");
271-
}
272-
}
273-
} else {
274-
// NOTE: Unix signals are being delivered asynchronously,
275-
// and there are no guarantees to cancel the signal after
276-
// a timer has been deleted, and after a signal is
277-
// received, there may no longer be a target to accept it.
278-
error!(
279-
"can't find the cpu alarm signal matched with the received timer id: {}",
280-
*timer_id
281-
);
247+
if let Some(cpu_timer) = registry.get(timer_id) {
248+
if let Some(tx) = (*cpu_timer.cpu_alarm_val.cpu_alarms_tx.lock().await).clone() {
249+
if tx.send(()).is_err() {
250+
debug!("failed to send cpu alarm to the provided channel");
251+
}
282252
}
253+
} else {
254+
// NOTE: Unix signals are delivered asynchronously, and
255+
// there is no guarantee that a pending signal is cancelled
256+
// once its timer has been deleted, so by the time a signal
257+
// is received there may no longer be a target to accept it.
258+
error!(
259+
"can't find the cpu alarm signal matched with the received timer id: {}",
260+
*timer_id
261+
);
262+
}
283263
}
284264

285265
SignalMsg::Add((timer_id, cpu_timer)) => {

0 commit comments

Comments
 (0)