Skip to content

Commit 1ca13d0

Browse files
committed
fmt
1 parent b3d0c58 commit 1ca13d0

File tree

6 files changed

+101
-87
lines changed

6 files changed

+101
-87
lines changed

src/rpc.rs

Lines changed: 69 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,18 @@ use iroh_base::hash::{BlobFormat, HashAndFormat};
1818
use iroh_io::AsyncSliceReader;
1919
use proto::{
2020
blobs::{
21-
AddPathRequest, AddPathResponse, AddStreamRequest, AddStreamResponse, AddStreamUpdate, BatchAddPathRequest, BatchAddPathResponse, BatchAddStreamRequest, BatchAddStreamResponse, BatchAddStreamUpdate, BatchCreateRequest, BatchCreateResponse, BatchCreateTempTagRequest, BatchUpdate, BlobStatusRequest, BlobStatusResponse, ConsistencyCheckRequest, CreateCollectionRequest, CreateCollectionResponse, DeleteRequest, DownloadResponse, ExportRequest, ExportResponse, ListIncompleteRequest, ListRequest, ReadAtRequest, ReadAtResponse, ValidateRequest
21+
AddPathRequest, AddPathResponse, AddStreamRequest, AddStreamResponse, AddStreamUpdate,
22+
BatchAddPathRequest, BatchAddPathResponse, BatchAddStreamRequest, BatchAddStreamResponse,
23+
BatchAddStreamUpdate, BatchCreateRequest, BatchCreateResponse, BatchCreateTempTagRequest,
24+
BatchUpdate, BlobStatusRequest, BlobStatusResponse, ConsistencyCheckRequest,
25+
CreateCollectionRequest, CreateCollectionResponse, DeleteRequest, DownloadResponse,
26+
ExportRequest, ExportResponse, ListIncompleteRequest, ListRequest, ReadAtRequest,
27+
ReadAtResponse, ValidateRequest,
28+
},
29+
tags::{
30+
CreateRequest as TagsCreateRequest, DeleteRequest as TagDeleteRequest,
31+
ListRequest as TagListRequest, SetRequest as TagsSetRequest, SyncMode,
2232
},
23-
tags::SyncMode,
2433
RpcError, RpcResult,
2534
};
2635
use quic_rpc::server::{RpcChannel, RpcServerError};
@@ -38,10 +47,6 @@ use crate::{
3847
},
3948
Tag,
4049
};
41-
use proto::tags::{
42-
CreateRequest as TagsCreateRequest, DeleteRequest as TagDeleteRequest,
43-
ListRequest as TagListRequest, SetRequest as TagsSetRequest,
44-
};
4550
pub mod client;
4651
pub mod proto;
4752

@@ -51,15 +56,15 @@ const RPC_BLOB_GET_CHUNK_SIZE: usize = 1024 * 64;
5156
const RPC_BLOB_GET_CHANNEL_CAP: usize = 2;
5257

5358
impl<D: crate::store::Store> Blobs<D> {
54-
5559
/// Handle an RPC request
56-
pub async fn handle_rpc_request<S, C>(self: Arc<Self>,
60+
pub async fn handle_rpc_request<S, C>(
61+
self: Arc<Self>,
5762
msg: crate::rpc::proto::Request,
58-
chan: RpcChannel<crate::rpc::proto::RpcService, C, S>
63+
chan: RpcChannel<crate::rpc::proto::RpcService, C, S>,
5964
) -> std::result::Result<(), RpcServerError<C>>
60-
where
61-
S: quic_rpc::Service,
62-
C: quic_rpc::ServiceEndpoint<S>,
65+
where
66+
S: quic_rpc::Service,
67+
C: quic_rpc::ServiceEndpoint<S>,
6368
{
6469
use crate::rpc::proto::Request::*;
6570
match msg {
@@ -72,11 +77,11 @@ impl<D: crate::store::Store> Blobs<D> {
7277
pub async fn handle_tags_request<S, C>(
7378
self: Arc<Self>,
7479
msg: proto::tags::Request,
75-
chan: RpcChannel<proto::RpcService, C, S>
80+
chan: RpcChannel<proto::RpcService, C, S>,
7681
) -> std::result::Result<(), RpcServerError<C>>
77-
where
78-
S: quic_rpc::Service,
79-
C: quic_rpc::ServiceEndpoint<S>,
82+
where
83+
S: quic_rpc::Service,
84+
C: quic_rpc::ServiceEndpoint<S>,
8085
{
8186
use proto::tags::Request::*;
8287
match msg {
@@ -91,46 +96,46 @@ impl<D: crate::store::Store> Blobs<D> {
9196
pub async fn handle_blobs_request<Sv, C>(
9297
self: Arc<Self>,
9398
msg: proto::blobs::Request,
94-
chan: RpcChannel<proto::RpcService, C, Sv>
99+
chan: RpcChannel<proto::RpcService, C, Sv>,
95100
) -> std::result::Result<(), RpcServerError<C>>
96-
where
97-
Sv: quic_rpc::Service,
98-
C: quic_rpc::ServiceEndpoint<Sv>,
101+
where
102+
Sv: quic_rpc::Service,
103+
C: quic_rpc::ServiceEndpoint<Sv>,
99104
{
100105
use proto::blobs::Request::*;
101-
match msg {
102-
List(msg) => chan.server_streaming(msg, self, Self::blob_list).await,
103-
ListIncomplete(msg) => {
104-
chan.server_streaming(msg, self, Self::blob_list_incomplete)
105-
.await
106-
}
107-
CreateCollection(msg) => chan.rpc(msg, self, Self::create_collection).await,
108-
Delete(msg) => chan.rpc(msg, self, Self::blob_delete_blob).await,
109-
AddPath(msg) => {
110-
chan.server_streaming(msg, self, Self::blob_add_from_path)
111-
.await
112-
}
113-
Download(msg) => chan.server_streaming(msg, self, Self::blob_download).await,
114-
Export(msg) => chan.server_streaming(msg, self, Self::blob_export).await,
115-
Validate(msg) => chan.server_streaming(msg, self, Self::blob_validate).await,
116-
Fsck(msg) => {
117-
chan.server_streaming(msg, self, Self::blob_consistency_check)
118-
.await
119-
}
120-
ReadAt(msg) => chan.server_streaming(msg, self, Self::blob_read_at).await,
121-
AddStream(msg) => chan.bidi_streaming(msg, self, Self::blob_add_stream).await,
122-
AddStreamUpdate(_msg) => Err(RpcServerError::UnexpectedUpdateMessage),
123-
BlobStatus(msg) => chan.rpc(msg, self, Self::blob_status).await,
124-
BatchCreate(msg) => chan.bidi_streaming(msg, self, Self::batch_create).await,
125-
BatchUpdate(_) => Err(RpcServerError::UnexpectedStartMessage),
126-
BatchAddStream(msg) => chan.bidi_streaming(msg, self, Self::batch_add_stream).await,
127-
BatchAddStreamUpdate(_) => Err(RpcServerError::UnexpectedStartMessage),
128-
BatchAddPath(msg) => {
129-
chan.server_streaming(msg, self, Self::batch_add_from_path)
130-
.await
106+
match msg {
107+
List(msg) => chan.server_streaming(msg, self, Self::blob_list).await,
108+
ListIncomplete(msg) => {
109+
chan.server_streaming(msg, self, Self::blob_list_incomplete)
110+
.await
111+
}
112+
CreateCollection(msg) => chan.rpc(msg, self, Self::create_collection).await,
113+
Delete(msg) => chan.rpc(msg, self, Self::blob_delete_blob).await,
114+
AddPath(msg) => {
115+
chan.server_streaming(msg, self, Self::blob_add_from_path)
116+
.await
117+
}
118+
Download(msg) => chan.server_streaming(msg, self, Self::blob_download).await,
119+
Export(msg) => chan.server_streaming(msg, self, Self::blob_export).await,
120+
Validate(msg) => chan.server_streaming(msg, self, Self::blob_validate).await,
121+
Fsck(msg) => {
122+
chan.server_streaming(msg, self, Self::blob_consistency_check)
123+
.await
124+
}
125+
ReadAt(msg) => chan.server_streaming(msg, self, Self::blob_read_at).await,
126+
AddStream(msg) => chan.bidi_streaming(msg, self, Self::blob_add_stream).await,
127+
AddStreamUpdate(_msg) => Err(RpcServerError::UnexpectedUpdateMessage),
128+
BlobStatus(msg) => chan.rpc(msg, self, Self::blob_status).await,
129+
BatchCreate(msg) => chan.bidi_streaming(msg, self, Self::batch_create).await,
130+
BatchUpdate(_) => Err(RpcServerError::UnexpectedStartMessage),
131+
BatchAddStream(msg) => chan.bidi_streaming(msg, self, Self::batch_add_stream).await,
132+
BatchAddStreamUpdate(_) => Err(RpcServerError::UnexpectedStartMessage),
133+
BatchAddPath(msg) => {
134+
chan.server_streaming(msg, self, Self::batch_add_from_path)
135+
.await
136+
}
137+
BatchCreateTempTag(msg) => chan.rpc(msg, self, Self::batch_create_temp_tag).await,
131138
}
132-
BatchCreateTempTag(msg) => chan.rpc(msg, self, Self::batch_create_temp_tag).await,
133-
}
134139
}
135140

136141
async fn blob_status(self: Arc<Self>, msg: BlobStatusRequest) -> RpcResult<BlobStatusResponse> {
@@ -236,7 +241,10 @@ impl<D: crate::store::Store> Blobs<D> {
236241
Ok(())
237242
}
238243

239-
fn blob_list_tags(self: Arc<Self>, msg: TagListRequest) -> impl Stream<Item = TagInfo> + Send + 'static {
244+
fn blob_list_tags(
245+
self: Arc<Self>,
246+
msg: TagListRequest,
247+
) -> impl Stream<Item = TagInfo> + Send + 'static {
240248
tracing::info!("blob_list_tags");
241249
let blobs = self;
242250
Gen::new(|co| async move {
@@ -353,7 +361,10 @@ impl<D: crate::store::Store> Blobs<D> {
353361
Ok(tag)
354362
}
355363

356-
fn blob_download(self: Arc<Self>, msg: BlobDownloadRequest) -> impl Stream<Item = DownloadResponse> {
364+
fn blob_download(
365+
self: Arc<Self>,
366+
msg: BlobDownloadRequest,
367+
) -> impl Stream<Item = DownloadResponse> {
357368
let (sender, receiver) = async_channel::bounded(1024);
358369
let endpoint = self.endpoint().clone();
359370
let progress = AsyncChannelProgressSender::new(sender);
@@ -518,7 +529,10 @@ impl<D: crate::store::Store> Blobs<D> {
518529
Ok(())
519530
}
520531

521-
async fn batch_create_temp_tag(self: Arc<Self>, msg: BatchCreateTempTagRequest) -> RpcResult<()> {
532+
async fn batch_create_temp_tag(
533+
self: Arc<Self>,
534+
msg: BatchCreateTempTagRequest,
535+
) -> RpcResult<()> {
522536
let blobs = self;
523537
let tag = blobs.store().temp_tag(msg.content);
524538
blobs.batches().await.store(msg.batch, tag);
@@ -813,7 +827,6 @@ impl<D: crate::store::Store> Blobs<D> {
813827
.into_stream()
814828
}
815829

816-
817830
async fn create_collection(
818831
self: Arc<Self>,
819832
req: CreateCollectionRequest,

src/rpc/client/blobs.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -68,16 +68,6 @@ use std::{
6868
task::{Context, Poll},
6969
};
7070

71-
pub use crate::net_protocol::DownloadMode;
72-
use crate::{
73-
export::ExportProgress as BytesExportProgress,
74-
format::collection::{Collection, SimpleStore},
75-
get::db::DownloadProgress as BytesDownloadProgress,
76-
net_protocol::BlobDownloadRequest,
77-
store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
78-
util::SetTagOption,
79-
BlobFormat, Hash, Tag,
80-
};
8171
use anyhow::{anyhow, Context as _, Result};
8272
use bytes::Bytes;
8373
use futures_lite::{Stream, StreamExt};
@@ -91,6 +81,17 @@ use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
9181
use tokio_util::io::{ReaderStream, StreamReader};
9282
use tracing::warn;
9383

84+
pub use crate::net_protocol::DownloadMode;
85+
use crate::{
86+
export::ExportProgress as BytesExportProgress,
87+
format::collection::{Collection, SimpleStore},
88+
get::db::DownloadProgress as BytesDownloadProgress,
89+
net_protocol::BlobDownloadRequest,
90+
store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
91+
util::SetTagOption,
92+
BlobFormat, Hash, Tag,
93+
};
94+
9495
mod batch;
9596
pub use batch::{AddDirOpts, AddFileOpts, AddReaderOpts, Batch};
9697

@@ -982,13 +983,13 @@ pub struct DownloadOptions {
982983

983984
#[cfg(test)]
984985
mod tests {
985-
use crate::hashseq::HashSeq;
986986
use iroh_net::NodeId;
987987
use rand::RngCore;
988988
use testresult::TestResult;
989989
use tokio::{io::AsyncWriteExt, sync::mpsc};
990990

991991
use super::*;
992+
use crate::hashseq::HashSeq;
992993

993994
#[tokio::test]
994995
async fn test_blob_create_collection() -> Result<()> {

src/rpc/client/blobs/batch.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,6 @@ use std::{
44
sync::{Arc, Mutex},
55
};
66

7-
use crate::{
8-
format::collection::Collection,
9-
net_protocol::BatchId,
10-
provider::BatchAddPathProgress,
11-
store::ImportMode,
12-
util::{SetTagOption, TagDrop},
13-
BlobFormat, HashAndFormat, Tag, TempTag,
14-
};
157
use anyhow::{anyhow, Context, Result};
168
use bytes::Bytes;
179
use futures_buffered::BufferedStreamExt;
@@ -23,12 +15,20 @@ use tokio_util::io::ReaderStream;
2315
use tracing::{debug, warn};
2416

2517
use super::WrapOption;
26-
use crate::rpc::proto::{
27-
blobs::{
28-
BatchAddPathRequest, BatchAddStreamRequest, BatchAddStreamResponse, BatchAddStreamUpdate,
29-
BatchCreateTempTagRequest, BatchUpdate,
18+
use crate::{
19+
format::collection::Collection,
20+
net_protocol::BatchId,
21+
provider::BatchAddPathProgress,
22+
rpc::proto::{
23+
blobs::{
24+
BatchAddPathRequest, BatchAddStreamRequest, BatchAddStreamResponse,
25+
BatchAddStreamUpdate, BatchCreateTempTagRequest, BatchUpdate,
26+
},
27+
tags::{self, SyncMode},
3028
},
31-
tags::{self, SyncMode},
29+
store::ImportMode,
30+
util::{SetTagOption, TagDrop},
31+
BlobFormat, HashAndFormat, Tag, TempTag,
3232
};
3333

3434
/// A scope in which blobs can be added.

src/rpc/client/tags.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@
1212
//! [`Client::list_hash_seq`] can be used to list all tags with a hash_seq format.
1313
//!
1414
//! [`Client::delete`] can be used to delete a tag.
15-
use crate::{BlobFormat, Hash, Tag};
1615
use anyhow::Result;
1716
use futures_lite::{Stream, StreamExt};
1817
use quic_rpc::RpcClient;
1918
use serde::{Deserialize, Serialize};
2019

21-
use crate::rpc::proto::tags::{DeleteRequest, ListRequest};
20+
use crate::{
21+
rpc::proto::tags::{DeleteRequest, ListRequest},
22+
BlobFormat, Hash, Tag,
23+
};
2224

2325
/// Iroh tags client.
2426
#[derive(Debug, Clone)]

src/rpc/proto/blobs.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@ use nested_enum_utils::enum_conversions;
77
use quic_rpc_derive::rpc_requests;
88
use serde::{Deserialize, Serialize};
99

10+
use super::{RpcError, RpcResult, RpcService};
1011
use crate::{
1112
export::ExportProgress,
1213
format::collection::Collection,
1314
get::db::DownloadProgress,
1415
net_protocol::{BatchId, BlobDownloadRequest},
1516
provider::{AddProgress, BatchAddPathProgress},
17+
rpc::client::blobs::{BlobInfo, BlobStatus, IncompleteBlobInfo, ReadAtLen, WrapOption},
1618
store::{
1719
BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ImportMode,
1820
ValidateProgress,
@@ -21,9 +23,6 @@ use crate::{
2123
BlobFormat, HashAndFormat, Tag,
2224
};
2325

24-
use super::{RpcError, RpcResult, RpcService};
25-
use crate::rpc::client::blobs::{BlobInfo, BlobStatus, IncompleteBlobInfo, ReadAtLen, WrapOption};
26-
2726
#[allow(missing_docs)]
2827
#[derive(strum::Display, Debug, Serialize, Deserialize)]
2928
#[enum_conversions(super::Request)]

src/rpc/proto/tags.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@ use quic_rpc_derive::rpc_requests;
44
use serde::{Deserialize, Serialize};
55

66
use super::{RpcResult, RpcService};
7-
use crate::rpc::client::tags::TagInfo;
8-
use crate::{net_protocol::BatchId, HashAndFormat, Tag};
7+
use crate::{net_protocol::BatchId, rpc::client::tags::TagInfo, HashAndFormat, Tag};
98

109
#[allow(missing_docs)]
1110
#[derive(strum::Display, Debug, Serialize, Deserialize)]

0 commit comments

Comments
 (0)