Skip to content

Commit ea3595a

Browse files
committed
Unify BitfieldEvent and BitfieldSubscriptionEvent
1 parent b55267b commit ea3595a

File tree

3 files changed

+29
-58
lines changed

3 files changed

+29
-58
lines changed

examples/multiprovider.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use console::Term;
66
use iroh::{NodeId, SecretKey};
77
use iroh_blobs::{
88
downloader2::{
9-
print_bitmap, DownloadRequest, Downloader, ObserveEvent, ObserveRequest,
9+
print_bitmap, BitfieldEvent, DownloadRequest, Downloader, ObserveRequest,
1010
StaticContentDiscovery,
1111
},
1212
store::Store,
@@ -115,12 +115,12 @@ impl BlobDownloadProgress {
115115
}
116116
}
117117

118-
fn update(&mut self, ev: ObserveEvent) {
118+
fn update(&mut self, ev: BitfieldEvent) {
119119
match ev {
120-
ObserveEvent::Bitfield { ranges } => {
120+
BitfieldEvent::State { ranges } => {
121121
self.current = ranges;
122122
}
123-
ObserveEvent::BitfieldUpdate { added, removed } => {
123+
BitfieldEvent::Update { added, removed } => {
124124
self.current |= added;
125125
self.current -= removed;
126126
}

src/downloader2.rs

Lines changed: 14 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -82,43 +82,22 @@ struct BitfieldSubscriptionId(u64);
8282
/// A pluggable bitfield subscription mechanism
8383
pub trait BitfieldSubscription: std::fmt::Debug + Send + 'static {
8484
/// Subscribe to a bitfield
85-
fn subscribe(
86-
&mut self,
87-
peer: BitfieldPeer,
88-
hash: Hash,
89-
) -> BoxStream<'static, BitfieldSubscriptionEvent>;
85+
fn subscribe(&mut self, peer: BitfieldPeer, hash: Hash) -> BoxStream<'static, BitfieldEvent>;
9086
}
9187

9288
/// A boxed bitfield subscription
9389
pub type BoxedBitfieldSubscription = Box<dyn BitfieldSubscription>;
9490

95-
/// An event from a bitfield subscription
96-
#[derive(Debug)]
97-
pub enum BitfieldSubscriptionEvent {
98-
/// Set the bitfield to the given ranges
99-
Bitfield {
100-
/// The entire bitfield
101-
ranges: ChunkRanges,
102-
},
103-
/// Update the bitfield with the given ranges
104-
BitfieldUpdate {
105-
/// The ranges that were added
106-
added: ChunkRanges,
107-
/// The ranges that were removed
108-
removed: ChunkRanges,
109-
},
110-
}
111-
11291
/// Events from observing a local bitfield
11392
#[derive(Debug)]
114-
pub enum ObserveEvent {
115-
/// Set the bitfield to the given ranges
116-
Bitfield {
93+
pub enum BitfieldEvent {
94+
/// The full state of the bitfield
95+
State {
11796
/// The entire bitfield
11897
ranges: ChunkRanges,
11998
},
120-
/// Update the bitfield with the given ranges
121-
BitfieldUpdate {
99+
/// An update to the bitfield
100+
Update {
122101
/// The ranges that were added
123102
added: ChunkRanges,
124103
/// The ranges that were removed
@@ -281,7 +260,7 @@ impl Downloader {
281260
pub async fn observe(
282261
&self,
283262
request: ObserveRequest,
284-
) -> anyhow::Result<tokio::sync::mpsc::Receiver<ObserveEvent>> {
263+
) -> anyhow::Result<tokio::sync::mpsc::Receiver<BitfieldEvent>> {
285264
let (send, recv) = tokio::sync::mpsc::channel(request.buffer);
286265
self.send
287266
.send(UserCommand::Observe { request, send })
@@ -332,19 +311,15 @@ impl Downloader {
332311
struct TestBitfieldSubscription;
333312

334313
impl BitfieldSubscription for TestBitfieldSubscription {
335-
fn subscribe(
336-
&mut self,
337-
peer: BitfieldPeer,
338-
_hash: Hash,
339-
) -> BoxStream<'static, BitfieldSubscriptionEvent> {
314+
fn subscribe(&mut self, peer: BitfieldPeer, _hash: Hash) -> BoxStream<'static, BitfieldEvent> {
340315
let ranges = match peer {
341316
BitfieldPeer::Local => ChunkRanges::empty(),
342317
BitfieldPeer::Remote(_) => {
343318
ChunkRanges::from(ChunkNum(0)..ChunkNum(1024 * 1024 * 1024 * 1024))
344319
}
345320
};
346321
Box::pin(
347-
futures_lite::stream::once(BitfieldSubscriptionEvent::Bitfield { ranges })
322+
futures_lite::stream::once(BitfieldEvent::State { ranges })
348323
.chain(futures_lite::stream::pending()),
349324
)
350325
}
@@ -389,11 +364,7 @@ async fn get_valid_ranges_remote(
389364
}
390365

391366
impl<S: Store> BitfieldSubscription for SimpleBitfieldSubscription<S> {
392-
fn subscribe(
393-
&mut self,
394-
peer: BitfieldPeer,
395-
hash: Hash,
396-
) -> BoxStream<'static, BitfieldSubscriptionEvent> {
367+
fn subscribe(&mut self, peer: BitfieldPeer, hash: Hash) -> BoxStream<'static, BitfieldEvent> {
397368
let (send, recv) = tokio::sync::oneshot::channel();
398369
match peer {
399370
BitfieldPeer::Local => {
@@ -429,7 +400,7 @@ impl<S: Store> BitfieldSubscription for SimpleBitfieldSubscription<S> {
429400
Ok(ev) => ev,
430401
Err(_) => ChunkRanges::empty(),
431402
};
432-
BitfieldSubscriptionEvent::Bitfield { ranges }
403+
BitfieldEvent::State { ranges }
433404
}
434405
.into_stream(),
435406
)
@@ -549,7 +520,9 @@ mod tests {
549520
#[tokio::test]
550521
async fn test_valid_ranges() -> TestResult<()> {
551522
let store = crate::store::mem::Store::new();
552-
let tt = store.import_bytes(vec![0u8;1025].into(), crate::BlobFormat::Raw).await?;
523+
let tt = store
524+
.import_bytes(vec![0u8; 1025].into(), crate::BlobFormat::Raw)
525+
.await?;
553526
let entry = store.get_mut(tt.hash()).await?.unwrap();
554527
let valid = crate::get::db::valid_ranges::<crate::store::mem::Store>(&entry).await?;
555528
assert!(valid == ChunkRanges::from(ChunkNum(0)..ChunkNum(2)));

src/downloader2/actor.rs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ pub(super) enum UserCommand {
1010
},
1111
Observe {
1212
request: ObserveRequest,
13-
send: tokio::sync::mpsc::Sender<ObserveEvent>,
13+
send: tokio::sync::mpsc::Sender<BitfieldEvent>,
1414
},
1515
}
1616

@@ -38,7 +38,7 @@ pub(super) struct DownloaderActor<S> {
3838
/// Id generator for observe ids
3939
observe_id_gen: IdGenerator<ObserveId>,
4040
/// Observers
41-
observers: BTreeMap<ObserveId, tokio::sync::mpsc::Sender<ObserveEvent>>,
41+
observers: BTreeMap<ObserveId, tokio::sync::mpsc::Sender<BitfieldEvent>>,
4242
/// The time when the actor was started, serves as the epoch for time messages to the state machine
4343
start: Instant,
4444
}
@@ -144,17 +144,15 @@ impl<S: Store> DownloaderActor<S> {
144144
let task = spawn(async move {
145145
while let Some(ev) = stream.next().await {
146146
let cmd = match ev {
147-
BitfieldSubscriptionEvent::Bitfield { ranges } => {
147+
BitfieldEvent::State { ranges } => {
148148
Command::Bitfield { peer, hash, ranges }
149149
}
150-
BitfieldSubscriptionEvent::BitfieldUpdate { added, removed } => {
151-
Command::BitfieldUpdate {
152-
peer,
153-
hash,
154-
added,
155-
removed,
156-
}
157-
}
150+
BitfieldEvent::Update { added, removed } => Command::BitfieldUpdate {
151+
peer,
152+
hash,
153+
added,
154+
removed,
155+
},
158156
};
159157
send.send(cmd).await.ok();
160158
}
@@ -210,7 +208,7 @@ impl<S: Store> DownloaderActor<S> {
210208
let Some(sender) = self.observers.get(&id) else {
211209
return;
212210
};
213-
if sender.try_send(ObserveEvent::Bitfield { ranges }).is_err() {
211+
if sender.try_send(BitfieldEvent::State { ranges }).is_err() {
214212
// the observer has been dropped
215213
self.observers.remove(&id);
216214
}
@@ -220,7 +218,7 @@ impl<S: Store> DownloaderActor<S> {
220218
return;
221219
};
222220
if sender
223-
.try_send(ObserveEvent::BitfieldUpdate { added, removed })
221+
.try_send(BitfieldEvent::Update { added, removed })
224222
.is_err()
225223
{
226224
// the observer has been dropped

0 commit comments

Comments
 (0)