Skip to content

Commit 71f0766

Browse files
committed
Use single threaded version of tokio::sync::watch
Since we are guaranteed to be single threaded within the handler fns for a single hash, we can use a much more lightweight watcher impl. Also, tokio::sync::watch is optimized for the case where there are a lot of parallel watchers, using a giant BigNotify internally. We want to optimize for the case where there are just 0 or a few watchers, so we are not that much concerned with the thundering herd problem.
1 parent ba0ab3f commit 71f0766

File tree

4 files changed

+89
-53
lines changed

4 files changed

+89
-53
lines changed

src/store/fs/bao_file.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use bao_tree::{
2020
use bytes::{Bytes, BytesMut};
2121
use derive_more::Debug;
2222
use irpc::channel::mpsc;
23-
use tokio::sync::watch;
23+
use super::util::watch;
2424
use tracing::{debug, error, info, trace};
2525

2626
use super::{

src/store/fs/util.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::future::Future;
22

33
use tokio::{select, sync::mpsc};
44
pub(crate) mod entity_manager;
5-
pub(crate) mod watcher;
5+
pub(crate) mod watch;
66

77
/// A wrapper for a tokio mpsc receiver that allows peeking at the next message.
88
#[derive(Debug)]

src/store/fs/util/watch.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
use std::{ops::Deref, sync::Arc};
2+
3+
use atomic_refcell::{AtomicRef, AtomicRefCell};
4+
5+
struct State<T> {
6+
value: T,
7+
dropped: bool,
8+
}
9+
10+
struct Shared<T> {
11+
value: AtomicRefCell<State<T>>,
12+
notify: tokio::sync::Notify,
13+
}
14+
15+
pub struct Sender<T>(Arc<Shared<T>>);
16+
17+
pub struct Receiver<T>(Arc<Shared<T>>);
18+
19+
impl<T> Sender<T> {
20+
pub fn new(value: T) -> Self {
21+
Self(Arc::new(Shared {
22+
value: AtomicRefCell::new(State {
23+
value,
24+
dropped: false,
25+
}),
26+
notify: tokio::sync::Notify::new(),
27+
}))
28+
}
29+
30+
pub fn send_if_modified<F>(&self, modify: F) -> bool
31+
where
32+
F: FnOnce(&mut T) -> bool,
33+
{
34+
let mut state = self.0.value.borrow_mut();
35+
let modified = modify(&mut state.value);
36+
if modified {
37+
self.0.notify.notify_waiters();
38+
}
39+
modified
40+
}
41+
42+
pub fn borrow(&self) -> impl Deref<Target = T> + '_ {
43+
AtomicRef::map(self.0.value.borrow(), |state| &state.value)
44+
}
45+
46+
pub fn subscribe(&self) -> Receiver<T> {
47+
Receiver(self.0.clone())
48+
}
49+
}
50+
51+
impl<T> Drop for Sender<T> {
52+
fn drop(&mut self) {
53+
self.0.value.borrow_mut().dropped = true;
54+
self.0.notify.notify_waiters();
55+
}
56+
}
57+
58+
impl<T> Receiver<T> {
59+
pub async fn changed(&self) -> Result<(), error::RecvError> {
60+
self.0.notify.notified().await;
61+
if self.0.value.borrow().dropped {
62+
Err(error::RecvError(()))
63+
} else {
64+
Ok(())
65+
}
66+
}
67+
68+
pub fn borrow(&self) -> impl Deref<Target = T> + '_ {
69+
AtomicRef::map(self.0.value.borrow(), |state| &state.value)
70+
}
71+
}
72+
73+
pub mod error {
74+
use std::{error::Error, fmt};
75+
76+
/// Error produced when receiving a change notification.
77+
#[derive(Debug, Clone)]
78+
pub struct RecvError(pub(super) ());
79+
80+
impl fmt::Display for RecvError {
81+
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
82+
write!(fmt, "channel closed")
83+
}
84+
}
85+
86+
impl Error for RecvError {}
87+
}

src/store/fs/util/watcher.rs

Lines changed: 0 additions & 51 deletions
This file was deleted.

0 commit comments

Comments
 (0)