Skip to content

Commit 704c7eb

Browse files
committed
Unify BitfieldEvents and start adding size (not used yet)
1 parent ea3595a commit 704c7eb

File tree

4 files changed

+119
-103
lines changed

4 files changed

+119
-103
lines changed

examples/multiprovider.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ impl BlobDownloadProgress {
117117

118118
fn update(&mut self, ev: BitfieldEvent) {
119119
match ev {
120-
BitfieldEvent::State { ranges } => {
120+
BitfieldEvent::State { ranges, .. } => {
121121
self.current = ranges;
122122
}
123123
BitfieldEvent::Update { added, removed } => {

src/downloader2.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use std::{
1717
io,
1818
marker::PhantomData,
1919
sync::Arc,
20-
time::Instant,
20+
time::Instant, u64,
2121
};
2222

2323
use crate::{
@@ -89,12 +89,14 @@ pub trait BitfieldSubscription: std::fmt::Debug + Send + 'static {
8989
pub type BoxedBitfieldSubscription = Box<dyn BitfieldSubscription>;
9090

9191
/// Events from observing a local bitfield
92-
#[derive(Debug)]
92+
#[derive(Debug, PartialEq, Eq)]
9393
pub enum BitfieldEvent {
9494
/// The full state of the bitfield
9595
State {
9696
/// The entire bitfield
9797
ranges: ChunkRanges,
98+
/// The most precise known size of the blob
99+
size: u64,
98100
},
99101
/// An update to the bitfield
100102
Update {
@@ -319,7 +321,7 @@ impl BitfieldSubscription for TestBitfieldSubscription {
319321
}
320322
};
321323
Box::pin(
322-
futures_lite::stream::once(BitfieldEvent::State { ranges })
324+
futures_lite::stream::once(BitfieldEvent::State { ranges, size: 1024 * 1024 * 1024 * 1024 * 1024 })
323325
.chain(futures_lite::stream::pending()),
324326
)
325327
}
@@ -400,7 +402,7 @@ impl<S: Store> BitfieldSubscription for SimpleBitfieldSubscription<S> {
400402
Ok(ev) => ev,
401403
Err(_) => ChunkRanges::empty(),
402404
};
403-
BitfieldEvent::State { ranges }
405+
BitfieldEvent::State { ranges, size: u64::MAX }
404406
}
405407
.into_stream(),
406408
)

src/downloader2/actor.rs

Lines changed: 9 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -142,18 +142,8 @@ impl<S: Store> DownloaderActor<S> {
142142
let send = self.command_tx.clone();
143143
let mut stream = self.subscribe_bitfield.subscribe(peer, hash);
144144
let task = spawn(async move {
145-
while let Some(ev) = stream.next().await {
146-
let cmd = match ev {
147-
BitfieldEvent::State { ranges } => {
148-
Command::Bitfield { peer, hash, ranges }
149-
}
150-
BitfieldEvent::Update { added, removed } => Command::BitfieldUpdate {
151-
peer,
152-
hash,
153-
added,
154-
removed,
155-
},
156-
};
145+
while let Some(event) = stream.next().await {
146+
let cmd = Command::BitfieldInfo { peer, hash, event };
157147
send.send(cmd).await.ok();
158148
}
159149
});
@@ -204,23 +194,11 @@ impl<S: Store> DownloaderActor<S> {
204194
done.send(()).ok();
205195
}
206196
}
207-
Event::LocalBitfield { id, ranges } => {
208-
let Some(sender) = self.observers.get(&id) else {
209-
return;
210-
};
211-
if sender.try_send(BitfieldEvent::State { ranges }).is_err() {
212-
// the observer has been dropped
213-
self.observers.remove(&id);
214-
}
215-
}
216-
Event::LocalBitfieldUpdate { id, added, removed } => {
197+
Event::LocalBitfieldInfo { id, event } => {
217198
let Some(sender) = self.observers.get(&id) else {
218199
return;
219200
};
220-
if sender
221-
.try_send(BitfieldEvent::Update { added, removed })
222-
.is_err()
223-
{
201+
if sender.try_send(event).is_err() {
224202
// the observer has been dropped
225203
self.observers.remove(&id);
226204
}
@@ -304,11 +282,13 @@ async fn peer_download<S: Store>(
304282
batch.push(leaf.into());
305283
writer.write_batch(size, std::mem::take(&mut batch)).await?;
306284
sender
307-
.send(Command::BitfieldUpdate {
285+
.send(Command::BitfieldInfo {
308286
peer: BitfieldPeer::Local,
309287
hash,
310-
added,
311-
removed: ChunkRanges::empty(),
288+
event: BitfieldEvent::Update {
289+
added,
290+
removed: ChunkRanges::empty(),
291+
},
312292
})
313293
.await
314294
.ok();

0 commit comments

Comments
 (0)