Commit 5edec02

Move a few things from net_protocol that don't belong there.
Also document the remaining things in net_protocol
1 parent 58a999b commit 5edec02

File tree

7 files changed, +117 -106 lines changed


src/cli.rs

Lines changed: 0 additions & 1 deletion
@@ -23,7 +23,6 @@ use crate::{
         progress::{BlobProgress, DownloadProgress},
         Stats,
     },
-    net_protocol::DownloadMode,
     provider::AddProgress,
     rpc::client::blobs::{
         self, BlobInfo, BlobStatus, CollectionInfo, DownloadOptions, IncompleteBlobInfo, WrapOption,

src/net_protocol.rs

Lines changed: 68 additions & 92 deletions
@@ -1,26 +1,22 @@
 //! Adaptation of `iroh-blobs` as an `iroh` protocol.
-
-// TODO: reduce API surface and add documentation
-#![allow(missing_docs)]
-
+//!
+//! A blobs protocol handler wraps a store, so you must first create a store.
+//!
+//! The entry point to create a blobs protocol handler is [`Blobs::builder`].
 use std::{collections::BTreeSet, fmt::Debug, ops::DerefMut, sync::Arc};

 use anyhow::{bail, Result};
 use futures_lite::future::Boxed as BoxedFuture;
 use futures_util::future::BoxFuture;
-use iroh::{endpoint::Connecting, protocol::ProtocolHandler, Endpoint, NodeAddr};
-use serde::{Deserialize, Serialize};
+use iroh::{endpoint::Connecting, protocol::ProtocolHandler, Endpoint};
 use tracing::debug;

 use crate::{
     downloader::Downloader,
     provider::EventSender,
     store::GcConfig,
-    util::{
-        local_pool::{self, LocalPoolHandle},
-        SetTagOption,
-    },
-    BlobFormat, Hash,
+    util::local_pool::{self, LocalPoolHandle},
+    Hash,
 };

 /// A callback that blobs can ask about a set of hashes that should not be garbage collected.
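
The new module docs name [`Blobs::builder`] as the entry point and note that a handler wraps a store. For orientation, a minimal sketch of how a downstream crate might wire this up is shown here; the mem-store constructor, the exact `Blobs::builder(..)`/`.build(&endpoint)` signatures, and the `Router` registration call are assumptions about the surrounding iroh / iroh-blobs APIs, not something this diff shows.

// Sketch only: wiring a Blobs protocol handler into an iroh node.
// Builder and router call signatures are assumptions, see note above.
use iroh::{protocol::Router, Endpoint};
use iroh_blobs::net_protocol::Blobs;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // A blobs protocol handler wraps a store, so create the store first.
    let store = iroh_blobs::store::mem::Store::new();

    // The endpoint lets the handler dial and accept peers.
    let endpoint = Endpoint::builder().bind().await?;
    let blobs = Blobs::builder(store).build(&endpoint);

    // Register the handler under the blobs ALPN so incoming connections reach it.
    let router = Router::builder(endpoint)
        .accept(iroh_blobs::protocol::ALPN, blobs.clone())
        .spawn()
        .await?;

    // ... add, export, or download blobs through `blobs` while the router runs ...

    router.shutdown().await?;
    Ok(())
}
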
@@ -50,67 +46,77 @@ pub(crate) struct BlobsInner<S> {
     pub(crate) endpoint: Endpoint,
     gc_state: std::sync::Mutex<GcState>,
     #[cfg(feature = "rpc")]
-    pub(crate) batches: tokio::sync::Mutex<BlobBatches>,
+    pub(crate) batches: tokio::sync::Mutex<batches::BlobBatches>,
 }

+/// Blobs protocol handler.
 #[derive(Debug, Clone)]
 pub struct Blobs<S> {
     pub(crate) inner: Arc<BlobsInner<S>>,
     #[cfg(feature = "rpc")]
     pub(crate) rpc_handler: Arc<std::sync::OnceLock<crate::rpc::RpcHandler>>,
 }

-/// Keeps track of all the currently active batch operations of the blobs api.
-#[cfg(feature = "rpc")]
-#[derive(Debug, Default)]
-pub(crate) struct BlobBatches {
-    /// Currently active batches
-    batches: std::collections::BTreeMap<BatchId, BlobBatch>,
-    /// Used to generate new batch ids.
-    max: u64,
-}
+pub(crate) mod batches {
+    use anyhow::Result;
+    use serde::{Deserialize, Serialize};

-/// A single batch of blob operations
-#[cfg(feature = "rpc")]
-#[derive(Debug, Default)]
-struct BlobBatch {
-    /// The tags in this batch.
-    tags: std::collections::BTreeMap<crate::HashAndFormat, Vec<crate::TempTag>>,
-}
+    /// Newtype for a batch id
+    #[derive(Debug, PartialEq, Eq, PartialOrd, Serialize, Deserialize, Ord, Clone, Copy, Hash)]
+    pub struct BatchId(pub u64);

-#[cfg(feature = "rpc")]
-impl BlobBatches {
-    /// Create a new unique batch id.
-    pub fn create(&mut self) -> BatchId {
-        let id = self.max;
-        self.max += 1;
-        BatchId(id)
+    /// Keeps track of all the currently active batch operations of the blobs api.
+    #[cfg(feature = "rpc")]
+    #[derive(Debug, Default)]
+    pub(crate) struct BlobBatches {
+        /// Currently active batches
+        batches: std::collections::BTreeMap<BatchId, BlobBatch>,
+        /// Used to generate new batch ids.
+        max: u64,
     }

-    /// Store a temp tag in a batch identified by a batch id.
-    pub fn store(&mut self, batch: BatchId, tt: crate::TempTag) {
-        let entry = self.batches.entry(batch).or_default();
-        entry.tags.entry(tt.hash_and_format()).or_default().push(tt);
+    /// A single batch of blob operations
+    #[cfg(feature = "rpc")]
+    #[derive(Debug, Default)]
+    struct BlobBatch {
+        /// The tags in this batch.
+        tags: std::collections::BTreeMap<crate::HashAndFormat, Vec<crate::TempTag>>,
     }

-    /// Remove a tag from a batch.
-    pub fn remove_one(&mut self, batch: BatchId, content: &crate::HashAndFormat) -> Result<()> {
-        if let Some(batch) = self.batches.get_mut(&batch) {
-            if let Some(tags) = batch.tags.get_mut(content) {
-                tags.pop();
-                if tags.is_empty() {
-                    batch.tags.remove(content);
+    #[cfg(feature = "rpc")]
+    impl BlobBatches {
+        /// Create a new unique batch id.
+        pub fn create(&mut self) -> BatchId {
+            let id = self.max;
+            self.max += 1;
+            BatchId(id)
+        }
+
+        /// Store a temp tag in a batch identified by a batch id.
+        pub fn store(&mut self, batch: BatchId, tt: crate::TempTag) {
+            let entry = self.batches.entry(batch).or_default();
+            entry.tags.entry(tt.hash_and_format()).or_default().push(tt);
+        }
+
+        /// Remove a tag from a batch.
+        pub fn remove_one(&mut self, batch: BatchId, content: &crate::HashAndFormat) -> Result<()> {
+            if let Some(batch) = self.batches.get_mut(&batch) {
+                if let Some(tags) = batch.tags.get_mut(content) {
+                    tags.pop();
+                    if tags.is_empty() {
+                        batch.tags.remove(content);
+                    }
+                    return Ok(());
                 }
-                return Ok(());
             }
+            // this can happen if we try to upgrade a tag from an expired batch
+            anyhow::bail!("tag not found in batch");
         }
-        // this can happen if we try to upgrade a tag from an expired batch
-        anyhow::bail!("tag not found in batch");
-    }

-    /// Remove an entire batch.
-    pub fn remove(&mut self, batch: BatchId) {
-        self.batches.remove(&batch);
+        /// Remove an entire batch.
+        pub fn remove(&mut self, batch: BatchId) {
+            self.batches.remove(&batch);
+        }
     }
 }

@@ -169,6 +175,10 @@ impl Blobs<crate::store::fs::Store> {
 }

 impl<S: crate::store::Store> Blobs<S> {
+    /// Create a new Blobs protocol handler.
+    ///
+    /// This is the low-level constructor that allows you to customize
+    /// everything. If you don't need that, consider using [`Blobs::builder`].
     pub fn new(
         store: S,
         rt: LocalPoolHandle,
@@ -192,22 +202,27 @@ impl<S: crate::store::Store> Blobs<S> {
         }
     }

+    /// Get the store.
     pub fn store(&self) -> &S {
         &self.inner.store
     }

+    /// Get the event sender.
     pub fn events(&self) -> &EventSender {
         &self.inner.events
     }

+    /// Get the local pool handle.
     pub fn rt(&self) -> &LocalPoolHandle {
         &self.inner.rt
     }

+    /// Get the downloader.
     pub fn downloader(&self) -> &Downloader {
         &self.inner.downloader
     }

+    /// Get the endpoint.
     pub fn endpoint(&self) -> &Endpoint {
         &self.inner.endpoint
     }
@@ -274,42 +289,3 @@ impl<S: crate::store::Store> ProtocolHandler for Blobs<S> {
         })
     }
 }
-
-/// A request to the node to download and share the data specified by the hash.
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct BlobDownloadRequest {
-    /// This mandatory field contains the hash of the data to download and share.
-    pub hash: Hash,
-    /// If the format is [`BlobFormat::HashSeq`], all children are downloaded and shared as
-    /// well.
-    pub format: BlobFormat,
-    /// This mandatory field specifies the nodes to download the data from.
-    ///
-    /// If set to more than a single node, they will all be tried. If `mode` is set to
-    /// [`DownloadMode::Direct`], they will be tried sequentially until a download succeeds.
-    /// If `mode` is set to [`DownloadMode::Queued`], the nodes may be dialed in parallel,
-    /// if the concurrency limits permit.
-    pub nodes: Vec<NodeAddr>,
-    /// Optional tag to tag the data with.
-    pub tag: SetTagOption,
-    /// Whether to directly start the download or add it to the download queue.
-    pub mode: DownloadMode,
-}
-
-/// Set the mode for whether to directly start the download or add it to the download queue.
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub enum DownloadMode {
-    /// Start the download right away.
-    ///
-    /// No concurrency limits or queuing will be applied. It is up to the user to manage download
-    /// concurrency.
-    Direct,
-    /// Queue the download.
-    ///
-    /// The download queue will be processed in-order, while respecting the downloader concurrency limits.
-    Queued,
-}
-
-/// Newtype for a batch id
-#[derive(Debug, PartialEq, Eq, PartialOrd, Serialize, Deserialize, Ord, Clone, Copy, Hash)]
-pub struct BatchId(pub u64);
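
The batch bookkeeping that moved into the new `batches` module above is crate-internal, but its lifecycle is simple: a map of batches keyed by `BatchId`, each holding the temp tags created under that batch, so removing a batch releases everything it still protects. A self-contained mirror of that flow, with plain `String`s standing in for the crate's `HashAndFormat`/`TempTag` types (stand-ins for illustration only):

// Standalone mirror of the BlobBatches bookkeeping; `String` replaces the
// crate's HashAndFormat / TempTag types purely for illustration.
use std::collections::BTreeMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct BatchId(u64);

#[derive(Debug, Default)]
struct Batches {
    // batch id -> (content key -> temp tags held for that content)
    batches: BTreeMap<BatchId, BTreeMap<String, Vec<String>>>,
    // used to generate new batch ids
    max: u64,
}

impl Batches {
    /// Create a new unique batch id.
    fn create(&mut self) -> BatchId {
        let id = self.max;
        self.max += 1;
        BatchId(id)
    }

    /// Store a temp tag in the batch identified by `batch`.
    fn store(&mut self, batch: BatchId, content: &str, tag: &str) {
        let entry = self.batches.entry(batch).or_default();
        entry.entry(content.to_owned()).or_default().push(tag.to_owned());
    }

    /// Drop one temp tag for `content`; fails if the batch has expired.
    fn remove_one(&mut self, batch: BatchId, content: &str) -> Result<(), &'static str> {
        if let Some(batch) = self.batches.get_mut(&batch) {
            if let Some(tags) = batch.get_mut(content) {
                tags.pop();
                if tags.is_empty() {
                    batch.remove(content);
                }
                return Ok(());
            }
        }
        // mirrors the "tag not found in batch" case in the real code
        Err("tag not found in batch")
    }

    /// Remove an entire batch, releasing everything it still holds.
    fn remove(&mut self, batch: BatchId) {
        self.batches.remove(&batch);
    }
}

fn main() {
    let mut batches = Batches::default();
    let id = batches.create();
    batches.store(id, "hash-a", "tag-1");
    batches.store(id, "hash-a", "tag-2");
    assert!(batches.remove_one(id, "hash-a").is_ok()); // one tag left
    batches.remove(id); // releases the rest
    assert!(batches.remove_one(id, "hash-a").is_err()); // batch is gone
}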

src/rpc.rs

Lines changed: 6 additions & 6 deletions
@@ -23,10 +23,10 @@ use proto::{
         AddPathRequest, AddPathResponse, AddStreamRequest, AddStreamResponse, AddStreamUpdate,
         BatchAddPathRequest, BatchAddPathResponse, BatchAddStreamRequest, BatchAddStreamResponse,
         BatchAddStreamUpdate, BatchCreateRequest, BatchCreateResponse, BatchCreateTempTagRequest,
-        BatchUpdate, BlobStatusRequest, BlobStatusResponse, ConsistencyCheckRequest,
-        CreateCollectionRequest, CreateCollectionResponse, DeleteRequest, DownloadResponse,
-        ExportRequest, ExportResponse, ListIncompleteRequest, ListRequest, ReadAtRequest,
-        ReadAtResponse, ValidateRequest,
+        BatchUpdate, BlobDownloadRequest, BlobStatusRequest, BlobStatusResponse,
+        ConsistencyCheckRequest, CreateCollectionRequest, CreateCollectionResponse, DeleteRequest,
+        DownloadResponse, ExportRequest, ExportResponse, ListIncompleteRequest, ListRequest,
+        ReadAtRequest, ReadAtResponse, ValidateRequest,
     },
     tags::{
         CreateRequest as TagsCreateRequest, DeleteRequest as TagDeleteRequest,
@@ -45,7 +45,7 @@ use crate::{
     downloader::{DownloadRequest, Downloader},
     fetch::{progress::DownloadProgress, Stats},
     format::collection::Collection,
-    net_protocol::{BlobDownloadRequest, Blobs, BlobsInner},
+    net_protocol::{Blobs, BlobsInner},
     provider::{AddProgress, BatchAddPathProgress},
     store::{
         ConsistencyCheckProgress, ExportProgress, FetchState, ImportProgress, MapEntry,
@@ -123,7 +123,7 @@ impl<D: crate::store::Store> Handler<D> {
     #[cfg(feature = "rpc")]
     pub(crate) async fn batches(
         &self,
-    ) -> tokio::sync::MutexGuard<'_, crate::net_protocol::BlobBatches> {
+    ) -> tokio::sync::MutexGuard<'_, crate::net_protocol::batches::BlobBatches> {
         self.0.batches.lock().await
     }

src/rpc/client/blobs.rs

Lines changed: 15 additions & 3 deletions
@@ -81,12 +81,10 @@ use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
 use tokio_util::io::{ReaderStream, StreamReader};
 use tracing::warn;

-pub use crate::net_protocol::DownloadMode;
 use crate::{
     fetch::progress::DownloadProgress as BytesDownloadProgress,
     format::collection::{Collection, SimpleStore},
-    net_protocol::BlobDownloadRequest,
-    rpc::proto::RpcService,
+    rpc::proto::{blobs::BlobDownloadRequest, RpcService},
     store::{
         BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode,
         ExportProgress as BytesExportProgress, ValidateProgress,
@@ -861,6 +859,20 @@ impl Future for ExportProgress {
     }
 }

+/// Set the mode for whether to directly start the download or add it to the download queue.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum DownloadMode {
+    /// Start the download right away.
+    ///
+    /// No concurrency limits or queuing will be applied. It is up to the user to manage download
+    /// concurrency.
+    Direct,
+    /// Queue the download.
+    ///
+    /// The download queue will be processed in-order, while respecting the downloader concurrency limits.
+    Queued,
+}
+
 /// Data reader for a single blob.
 ///
 /// Implements [`AsyncRead`].

src/rpc/client/blobs/batch.rs

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@ use tracing::{debug, warn};
 use super::WrapOption;
 use crate::{
     format::collection::Collection,
-    net_protocol::BatchId,
+    net_protocol::batches::BatchId,
     provider::BatchAddPathProgress,
     rpc::proto::{
         blobs::{

src/rpc/proto/blobs.rs

Lines changed: 26 additions & 2 deletions
@@ -2,16 +2,19 @@
 use std::path::PathBuf;

 use bytes::Bytes;
+use iroh::NodeAddr;
 use nested_enum_utils::enum_conversions;
 use quic_rpc_derive::rpc_requests;
 use serde::{Deserialize, Serialize};

 use super::{RpcError, RpcResult, RpcService};
 use crate::{
     format::collection::Collection,
-    net_protocol::{BatchId, BlobDownloadRequest},
+    net_protocol::batches::BatchId,
     provider::{AddProgress, BatchAddPathProgress},
-    rpc::client::blobs::{BlobInfo, BlobStatus, IncompleteBlobInfo, ReadAtLen, WrapOption},
+    rpc::client::blobs::{
+        BlobInfo, BlobStatus, DownloadMode, IncompleteBlobInfo, ReadAtLen, WrapOption,
+    },
     store::{
         BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ExportProgress,
         ImportMode, ValidateProgress,
@@ -314,3 +317,24 @@ pub struct BatchAddPathRequest {
 /// Response to a batch add path request
 #[derive(Serialize, Deserialize, Debug)]
 pub struct BatchAddPathResponse(pub BatchAddPathProgress);
+
+/// A request to the node to download and share the data specified by the hash.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct BlobDownloadRequest {
+    /// This mandatory field contains the hash of the data to download and share.
+    pub hash: Hash,
+    /// If the format is [`BlobFormat::HashSeq`], all children are downloaded and shared as
+    /// well.
+    pub format: BlobFormat,
+    /// This mandatory field specifies the nodes to download the data from.
+    ///
+    /// If set to more than a single node, they will all be tried. If `mode` is set to
+    /// [`DownloadMode::Direct`], they will be tried sequentially until a download succeeds.
+    /// If `mode` is set to [`DownloadMode::Queued`], the nodes may be dialed in parallel,
+    /// if the concurrency limits permit.
+    pub nodes: Vec<NodeAddr>,
+    /// Optional tag to tag the data with.
+    pub tag: SetTagOption,
+    /// Whether to directly start the download or add it to the download queue.
+    pub mode: DownloadMode,
+}
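
With the request type now defined next to the other RPC messages, constructing one uses the fields exactly as declared above. A small sketch follows; the import paths assume the crate's `rpc` feature is enabled and that these modules stay publicly reachable as `iroh_blobs::rpc::...`.

// Sketch: building the relocated request by hand. Field names and types come
// from the struct above; the import paths are assumptions (rpc feature enabled).
use iroh::NodeAddr;
use iroh_blobs::{
    rpc::{client::blobs::DownloadMode, proto::blobs::BlobDownloadRequest},
    util::SetTagOption,
    BlobFormat, Hash,
};

fn queued_download(hash: Hash, provider: NodeAddr) -> BlobDownloadRequest {
    BlobDownloadRequest {
        hash,
        format: BlobFormat::Raw,
        nodes: vec![provider],
        tag: SetTagOption::Auto,
        // Queued goes through the downloader queue and its concurrency limits;
        // DownloadMode::Direct would start the transfer right away instead.
        mode: DownloadMode::Queued,
    }
}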

src/rpc/proto/tags.rs

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,7 @@ use quic_rpc_derive::rpc_requests;
 use serde::{Deserialize, Serialize};

 use super::{RpcResult, RpcService};
-use crate::{net_protocol::BatchId, rpc::client::tags::TagInfo, HashAndFormat, Tag};
+use crate::{net_protocol::batches::BatchId, rpc::client::tags::TagInfo, HashAndFormat, Tag};

 #[allow(missing_docs)]
 #[derive(strum::Display, Debug, Serialize, Deserialize)]
