Skip to content

Commit ba0ab3f

Browse files
committed
Add version of tokio::sync::Watcher that only works locally and copy the relevant API
1 parent fd9faab commit ba0ab3f

File tree

4 files changed

+65
-6
lines changed

4 files changed

+65
-6
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ iroh-base = "0.90"
4444
reflink-copy = "0.1.24"
4545
irpc = { version = "0.5.0", features = ["rpc", "quinn_endpoint_setup", "message_spans", "stream", "derive"], default-features = false }
4646
iroh-metrics = { version = "0.35" }
47+
atomic_refcell = "0.1.13"
4748

4849
[dev-dependencies]
4950
clap = { version = "4.5.31", features = ["derive"] }
@@ -58,7 +59,6 @@ testresult = "0.4.1"
5859
tracing-subscriber = { version = "0.3.19", features = ["fmt"] }
5960
tracing-test = "0.2.5"
6061
walkdir = "2.5.0"
61-
atomic_refcell = "0.1.13"
6262

6363
[features]
6464
hide-proto-docs = []

src/store/fs.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ use crate::{
106106
ApiClient,
107107
},
108108
store::{
109-
fs::util::entity_manager::{self, ActiveEntityState},
109+
fs::util::entity_manager::{self, ActiveEntityState, ShutdownCause},
110110
util::{BaoTreeSender, FixedSize, MemOrFile, ValueOrPoisioned},
111111
Hash,
112112
},
@@ -217,10 +217,17 @@ impl entity_manager::Params for EmParams {
217217

218218
type EntityState = Slot;
219219

220-
async fn on_shutdown(
221-
_state: entity_manager::ActiveEntityState<Self>,
222-
_cause: entity_manager::ShutdownCause,
223-
) {
220+
async fn on_shutdown(state: HashContext, cause: ShutdownCause) {
221+
// this isn't strictly necessary. Drop will run anyway as soon as the
222+
// state is reset to it's default value. Doing it here means that we
223+
// have exact control over where it happens.
224+
if let Some(handle) = state.state.0.lock().await.take() {
225+
trace!(
226+
"shutting down entity manager for hash: {}, cause: {cause:?}",
227+
state.id
228+
);
229+
drop(handle);
230+
}
224231
}
225232
}
226233

src/store/fs/util.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::future::Future;
22

33
use tokio::{select, sync::mpsc};
44
pub(crate) mod entity_manager;
5+
pub(crate) mod watcher;
56

67
/// A wrapper for a tokio mpsc receiver that allows peeking at the next message.
78
#[derive(Debug)]

src/store/fs/util/watcher.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
use std::{ops::Deref, sync::Arc};
2+
3+
use atomic_refcell::AtomicRefCell;
4+
5+
struct Shared<T> {
6+
value: AtomicRefCell<T>,
7+
notify: tokio::sync::Notify,
8+
}
9+
10+
pub struct Sender<T>(Arc<Shared<T>>);
11+
12+
pub struct Receiver<T>(Arc<Shared<T>>);
13+
14+
impl<T> Sender<T> {
15+
pub fn new(value: T) -> Self {
16+
Self(Arc::new(Shared {
17+
value: AtomicRefCell::new(value),
18+
notify: tokio::sync::Notify::new(),
19+
}))
20+
}
21+
22+
pub fn send_if_modified<F>(&self, modify: F) -> bool
23+
where
24+
F: FnOnce(&mut T) -> bool,
25+
{
26+
let mut value = self.0.value.borrow_mut();
27+
let modified = modify(&mut value);
28+
if modified {
29+
self.0.notify.notify_waiters();
30+
}
31+
modified
32+
}
33+
34+
pub fn borrow(&self) -> impl Deref<Target = T> + '_ {
35+
self.0.value.borrow()
36+
}
37+
38+
pub fn subscribe(&self) -> Receiver<T> {
39+
Receiver(self.0.clone())
40+
}
41+
}
42+
43+
impl<T> Receiver<T> {
44+
pub async fn changed(&self) {
45+
self.0.notify.notified().await;
46+
}
47+
48+
pub fn borrow(&self) -> impl Deref<Target = T> + '_ {
49+
self.0.value.borrow()
50+
}
51+
}

0 commit comments

Comments
 (0)