Skip to content

Commit 0bcc2b8

Browse files
kchibisovvberger
authored andcommitted
Use timerfd on linux to drive Timer
Previously Timer was using thread for all platforms, however platforms tend to provide efficient API to create timers. This commit makes `Timer` use timerfd system interface on Linux.
1 parent e082cc1 commit 0bcc2b8

File tree

4 files changed

+301
-112
lines changed

4 files changed

+301
-112
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
associated `Error` type on the `EventSource` trait.
1515
- **Breaking:** Many API functions now use Calloop's own error type (`calloop::Error`) instead of
1616
`std::io::Error` as the error variants of their returned results.
17+
- On Linux `Timer<T>` is now driven by `timerfd`.
1718

1819
## 0.9.2 -- 2021-12-27
1920

src/sources/timer.rs renamed to src/sources/timer/mod.rs

Lines changed: 43 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,39 @@
99
1010
use std::cell::RefCell;
1111
use std::collections::BinaryHeap;
12-
use std::sync::{
13-
atomic::{AtomicBool, Ordering},
14-
Arc, Mutex,
15-
};
12+
use std::sync::{Arc, Mutex};
1613
use std::time::{Duration, Instant};
1714

18-
use super::ping::{make_ping, PingError, PingSource};
1915
use crate::{EventSource, Poll, PostAction, Readiness, Token, TokenFactory};
2016

17+
#[cfg(target_os = "linux")]
18+
mod timerfd;
19+
#[cfg(target_os = "linux")]
20+
use timerfd::{TimerScheduler, TimerSource};
21+
22+
#[cfg(any(
23+
target_os = "dragonfly",
24+
target_os = "freebsd",
25+
target_os = "netbsd",
26+
target_os = "openbsd",
27+
target_os = "macos"
28+
))]
29+
mod threaded;
30+
31+
#[cfg(any(
32+
target_os = "dragonfly",
33+
target_os = "freebsd",
34+
target_os = "netbsd",
35+
target_os = "openbsd",
36+
target_os = "macos"
37+
))]
38+
use threaded::{TimerScheduler, TimerSource};
39+
40+
/// An error arising from processing events for a timer.
41+
#[derive(thiserror::Error, Debug)]
42+
#[error(transparent)]
43+
pub struct TimerError(Box<dyn std::error::Error + Sync + Send>);
44+
2145
/// A Timer event source
2246
///
2347
/// It generates events of type `(T, TimerHandle<T>)`, providing you
@@ -122,23 +146,21 @@ impl<T> EventSource for Timer<T> {
122146
inner: self.inner.clone(),
123147
};
124148
let inner = &self.inner;
125-
self.source
126-
.process_events(readiness, token, |(), &mut ()| {
127-
loop {
128-
let next_expired: Option<T> = {
129-
let mut guard = inner.lock().unwrap();
130-
guard.next_expired()
131-
};
132-
if let Some(val) = next_expired {
133-
callback(val, &mut handle);
134-
} else {
135-
break;
136-
}
149+
self.source.process_events(readiness, token, |(), &mut ()| {
150+
loop {
151+
let next_expired: Option<T> = {
152+
let mut guard = inner.lock().unwrap();
153+
guard.next_expired()
154+
};
155+
if let Some(val) = next_expired {
156+
callback(val, &mut handle);
157+
} else {
158+
break;
137159
}
138-
// now compute the next timeout and signal if necessary
139-
inner.lock().unwrap().reschedule();
140-
})
141-
.map_err(TimerError)
160+
}
161+
// now compute the next timeout and signal if necessary
162+
inner.lock().unwrap().reschedule();
163+
})
142164
}
143165

144166
fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
@@ -272,97 +294,6 @@ impl<T> std::cmp::PartialEq for TimeoutData<T> {
272294

273295
impl<T> std::cmp::Eq for TimeoutData<T> {}
274296

275-
/*
276-
* Scheduling
277-
*/
278-
279-
#[derive(Debug)]
280-
struct TimerScheduler {
281-
current_deadline: Arc<Mutex<Option<Instant>>>,
282-
kill_switch: Arc<AtomicBool>,
283-
thread: Option<std::thread::JoinHandle<()>>,
284-
}
285-
286-
type TimerSource = PingSource;
287-
288-
impl TimerScheduler {
289-
fn new() -> crate::Result<(TimerScheduler, TimerSource)> {
290-
let current_deadline = Arc::new(Mutex::new(None::<Instant>));
291-
let thread_deadline = current_deadline.clone();
292-
293-
let kill_switch = Arc::new(AtomicBool::new(false));
294-
let thread_kill = kill_switch.clone();
295-
296-
let (ping, ping_source) = make_ping()?;
297-
298-
let thread = std::thread::Builder::new()
299-
.name("calloop timer".into())
300-
.spawn(move || loop {
301-
// stop if requested
302-
if thread_kill.load(Ordering::Acquire) {
303-
return;
304-
}
305-
// otherwise check the timeout
306-
let opt_deadline: Option<Instant> = {
307-
// subscope to ensure the mutex does not remain locked while the thread is parked
308-
let guard = thread_deadline.lock().unwrap();
309-
*guard
310-
};
311-
if let Some(deadline) = opt_deadline {
312-
if let Some(remaining) = deadline.checked_duration_since(Instant::now()) {
313-
// it is not yet expired, go to sleep until it
314-
std::thread::park_timeout(remaining);
315-
} else {
316-
// it is expired, wake the event loop and go to sleep
317-
ping.ping();
318-
std::thread::park();
319-
}
320-
} else {
321-
// there is none, got to sleep
322-
std::thread::park();
323-
}
324-
})?;
325-
326-
let scheduler = TimerScheduler {
327-
current_deadline,
328-
kill_switch,
329-
thread: Some(thread),
330-
};
331-
Ok((scheduler, ping_source))
332-
}
333-
334-
fn reschedule(&mut self, new_deadline: Instant) {
335-
let mut deadline_guard = self.current_deadline.lock().unwrap();
336-
if let Some(current_deadline) = *deadline_guard {
337-
if new_deadline < current_deadline || current_deadline <= Instant::now() {
338-
*deadline_guard = Some(new_deadline);
339-
self.thread.as_ref().unwrap().thread().unpark();
340-
}
341-
} else {
342-
*deadline_guard = Some(new_deadline);
343-
self.thread.as_ref().unwrap().thread().unpark();
344-
}
345-
}
346-
347-
fn deschedule(&mut self) {
348-
*(self.current_deadline.lock().unwrap()) = None;
349-
}
350-
}
351-
352-
impl Drop for TimerScheduler {
353-
fn drop(&mut self) {
354-
self.kill_switch.store(true, Ordering::Release);
355-
let thread = self.thread.take().unwrap();
356-
thread.thread().unpark();
357-
let _ = thread.join();
358-
}
359-
}
360-
361-
/// An error arising from processing events for a timer.
362-
#[derive(thiserror::Error, Debug)]
363-
#[error(transparent)]
364-
pub struct TimerError(PingError);
365-
366297
/*
367298
* Tests
368299
*/

src/sources/timer/threaded.rs

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
//! Timer scheduler which is using thread to schedule timers.
2+
3+
use std::sync::atomic::{AtomicBool, Ordering};
4+
use std::sync::{Arc, Mutex};
5+
use std::time::Instant;
6+
7+
use crate::ping::{make_ping, Ping, PingSource};
8+
use crate::{EventSource, Poll, PostAction, Readiness, Token, TokenFactory};
9+
10+
use super::TimerError;
11+
12+
#[derive(Debug)]
13+
pub struct TimerSource {
14+
source: PingSource,
15+
}
16+
17+
impl TimerSource {
18+
fn new() -> std::io::Result<(Ping, Self)> {
19+
let (ping, source) = make_ping()?;
20+
Ok((ping, Self { source }))
21+
}
22+
}
23+
24+
impl EventSource for TimerSource {
25+
type Event = ();
26+
type Metadata = ();
27+
type Ret = ();
28+
type Error = TimerError;
29+
30+
fn process_events<C>(
31+
&mut self,
32+
readiness: Readiness,
33+
token: Token,
34+
mut callback: C,
35+
) -> Result<PostAction, Self::Error>
36+
where
37+
C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
38+
{
39+
self.source
40+
.process_events(readiness, token, |_, &mut _| {
41+
callback((), &mut ());
42+
})
43+
.map_err(|err| TimerError(err.into()))
44+
}
45+
46+
fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
47+
self.source.register(poll, token_factory)
48+
}
49+
50+
fn reregister(
51+
&mut self,
52+
poll: &mut Poll,
53+
token_factory: &mut TokenFactory,
54+
) -> crate::Result<()> {
55+
self.source.reregister(poll, token_factory)
56+
}
57+
58+
fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
59+
self.source.unregister(poll)
60+
}
61+
}
62+
63+
#[derive(Debug)]
64+
pub struct TimerScheduler {
65+
current_deadline: Arc<Mutex<Option<Instant>>>,
66+
kill_switch: Arc<AtomicBool>,
67+
thread: Option<std::thread::JoinHandle<()>>,
68+
}
69+
70+
impl TimerScheduler {
71+
pub fn new() -> crate::Result<(TimerScheduler, TimerSource)> {
72+
let current_deadline = Arc::new(Mutex::new(None::<Instant>));
73+
let thread_deadline = current_deadline.clone();
74+
75+
let kill_switch = Arc::new(AtomicBool::new(false));
76+
let thread_kill = kill_switch.clone();
77+
78+
let (ping, source) = TimerSource::new()?;
79+
80+
let thread = std::thread::Builder::new()
81+
.name("calloop timer".into())
82+
.spawn(move || loop {
83+
// stop if requested
84+
if thread_kill.load(Ordering::Acquire) {
85+
return;
86+
}
87+
// otherwise check the timeout
88+
let opt_deadline: Option<Instant> = {
89+
// subscope to ensure the mutex does not remain locked while the thread is parked
90+
let guard = thread_deadline.lock().unwrap();
91+
*guard
92+
};
93+
if let Some(deadline) = opt_deadline {
94+
if let Some(remaining) = deadline.checked_duration_since(Instant::now()) {
95+
// it is not yet expired, go to sleep until it
96+
std::thread::park_timeout(remaining);
97+
} else {
98+
// it is expired, wake the event loop and go to sleep
99+
ping.ping();
100+
std::thread::park();
101+
}
102+
} else {
103+
// there is none, got to sleep
104+
std::thread::park();
105+
}
106+
})?;
107+
108+
let scheduler = TimerScheduler {
109+
current_deadline,
110+
kill_switch,
111+
thread: Some(thread),
112+
};
113+
Ok((scheduler, source))
114+
}
115+
116+
pub fn reschedule(&mut self, new_deadline: Instant) {
117+
let mut deadline_guard = self.current_deadline.lock().unwrap();
118+
if let Some(current_deadline) = *deadline_guard {
119+
if new_deadline < current_deadline || current_deadline <= Instant::now() {
120+
*deadline_guard = Some(new_deadline);
121+
self.thread.as_ref().unwrap().thread().unpark();
122+
}
123+
} else {
124+
*deadline_guard = Some(new_deadline);
125+
self.thread.as_ref().unwrap().thread().unpark();
126+
}
127+
}
128+
129+
pub fn deschedule(&mut self) {
130+
*(self.current_deadline.lock().unwrap()) = None;
131+
}
132+
}
133+
134+
impl Drop for TimerScheduler {
135+
fn drop(&mut self) {
136+
self.kill_switch.store(true, Ordering::Release);
137+
let thread = self.thread.take().unwrap();
138+
thread.thread().unpark();
139+
let _ = thread.join();
140+
}
141+
}

0 commit comments

Comments
 (0)