Skip to content

Commit 63ad673

Browse files
authored
Merge pull request #64 from ava-labs/timer
vm/utils: add timer
2 parents 1706e27 + 5bbaf03 commit 63ad673

File tree

3 files changed

+153
-0
lines changed

3 files changed

+153
-0
lines changed

mini-kvvm/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@ pub mod api;
22
pub mod block;
33
pub mod chain;
44
pub mod genesis;
5+
pub mod utils;
56
pub mod vm;

mini-kvvm/src/utils/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod timer;

mini-kvvm/src/utils/timer.rs

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
use std::{
2+
sync::{
3+
atomic::{AtomicBool, Ordering},
4+
Arc,
5+
},
6+
time::Duration,
7+
};
8+
9+
use tokio::{
10+
sync::{broadcast, mpsc, RwLock},
11+
time::{sleep, Instant},
12+
};
13+
14+
#[derive(Clone)]
15+
/// ref. https://pkg.go.dev/github.com/ava-labs/avalanchego/utils/timer#Timer
16+
pub struct Timer {
17+
/// Optional handler function that will fire when should_execute is true.
18+
handler: Option<fn()>,
19+
20+
// Timeout broadcast channel is used to reset ticker threads.
21+
timeout_ch: broadcast::Sender<()>,
22+
23+
/// New timer creation stops when true.
24+
finished: Arc<AtomicBool>,
25+
26+
/// Notifies the timer to invoke the handler fn.
27+
should_execute: Arc<AtomicBool>,
28+
29+
/// Duration for timer tick event.
30+
duration: Arc<RwLock<Option<Duration>>>,
31+
}
32+
33+
impl Timer {
34+
pub fn new(handler: Option<fn()>) -> Self {
35+
let (timeout_ch, _): (broadcast::Sender<()>, broadcast::Receiver<()>) =
36+
broadcast::channel(1);
37+
Self {
38+
finished: Arc::new(AtomicBool::new(false)),
39+
should_execute: Arc::new(AtomicBool::new(false)),
40+
handler,
41+
timeout_ch,
42+
duration: Arc::new(RwLock::new(None)),
43+
}
44+
}
45+
46+
/// Defines the duration until the handler function will be executed.
47+
pub async fn set_handler_duration(&self, duration: Duration) {
48+
let mut timer = self.duration.write().await;
49+
*timer = Some(duration);
50+
self.should_execute.store(true, Ordering::Relaxed);
51+
self.reset().await;
52+
}
53+
54+
/// Cancel the currently scheduled event.
55+
pub async fn cancel(&self) {
56+
self.should_execute.store(false, Ordering::Relaxed);
57+
self.reset().await;
58+
}
59+
60+
/// Stops execution of this timer.
61+
pub async fn stop(&self) {
62+
self.finished.store(true, Ordering::Relaxed);
63+
self.reset().await;
64+
}
65+
66+
/// Manages a Timer lifecycle.
67+
pub async fn dispatch(&mut self) {
68+
let (ticker_tx, mut ticker_rx): (mpsc::Sender<()>, mpsc::Receiver<()>) = mpsc::channel(1);
69+
let cleared = Arc::new(AtomicBool::new(false));
70+
let reset = Arc::new(AtomicBool::new(false));
71+
72+
// default duration is 0 so that we block until duration is set.
73+
let mut duration = Some(Duration::from_secs(0));
74+
while !self.finished.load(Ordering::Relaxed) {
75+
if cleared.load(Ordering::Relaxed) && self.should_execute.load(Ordering::Relaxed) {
76+
if let Some(handler) = &self.handler {
77+
cleared.store(false, Ordering::Relaxed);
78+
(handler)();
79+
}
80+
}
81+
82+
// start a new ticker thread which sends a single tick signal.
83+
if reset.load(Ordering::Relaxed) {
84+
let ticker = ticker_tx.clone();
85+
tokio::spawn(async move {
86+
let time = Instant::now();
87+
if let Some(duration) = duration {
88+
sleep(duration).await;
89+
};
90+
let _ = ticker.send(()).await;
91+
log::debug!("Tick duration: {:?}", time.elapsed());
92+
});
93+
}
94+
95+
reset.store(false, Ordering::Relaxed);
96+
cleared.store(false, Ordering::Relaxed);
97+
98+
let mut timeout_ch = self.timeout_ch.subscribe();
99+
loop {
100+
// select will block until one of the channels is received
101+
tokio::select! {
102+
Some(_) = ticker_rx.recv() => {
103+
cleared.store(true, Ordering::Relaxed);
104+
log::debug!("tick\n");
105+
break;
106+
},
107+
resp = timeout_ch.recv() => match resp {
108+
Ok(_) => {
109+
// reset timer duration
110+
if self.should_execute.load(Ordering::Relaxed) {
111+
let guard = self.duration.read().await;
112+
duration = guard.to_owned();
113+
drop(guard);
114+
}
115+
reset.store(true, Ordering::Relaxed);
116+
log::debug!("timeout\n");
117+
break;
118+
},
119+
Err(e) => {
120+
log::error!("timeout channel failed: {}", e.to_string());
121+
break
122+
},
123+
}
124+
}
125+
}
126+
}
127+
}
128+
129+
/// Calls the timeout channel which will result in a new timer event.
130+
pub async fn reset(&self) {
131+
let _ = self.timeout_ch.send(());
132+
}
133+
}
134+
135+
#[tokio::test]
136+
async fn timer_test() {
137+
fn echo() {
138+
println!("echo!!")
139+
}
140+
let timer = Timer::new(Some(echo));
141+
let mut timer_clone = timer.clone();
142+
tokio::spawn(async move {
143+
// echo will fire after 10ms
144+
timer.set_handler_duration(Duration::from_millis(10)).await;
145+
sleep(Duration::from_millis(15)).await;
146+
// unblock dispatch
147+
timer.stop().await;
148+
});
149+
150+
timer_clone.dispatch().await;
151+
}

0 commit comments

Comments
 (0)