Skip to content

Commit dced099

Browse files
committed
Add tests that use the rpc client with quinn.
Some errors don't happen if you use the mem transport. Also fix bug where add_bytes fails if the bytes are larger than the max frame size.
1 parent 3467681 commit dced099

File tree

4 files changed

+30
-12
lines changed

4 files changed

+30
-12
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,4 +184,4 @@ incremental = false
184184
[patch.crates-io]
185185
iroh-base = { git = "https://github.com/n0-computer/iroh" }
186186
iroh = { git = "https://github.com/n0-computer/iroh" }
187-
quic-rpc = { git = "https://github.com/n0-computer/quic-rpc" }
187+
quic-rpc = { path = "../quic-rpc" }

src/rpc/client/blobs.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -268,10 +268,10 @@ where
268268
}
269269
});
270270
tokio::spawn(async move {
271-
// TODO: Is it important to catch this error? It should also result in an error on the
272-
// response stream. If we deem it important, we could one-shot send it into the
273-
// BlobAddProgress and return from there. Not sure.
274271
if let Err(err) = sink.send_all(&mut input).await {
272+
// if we get an error in send_all due to the connection being closed, this will just fail again.
273+
// if we get an error due to something else (serialization or size limit), tell the remote to abort.
274+
sink.send(AddStreamUpdate::Abort).await.ok();
275275
warn!("Failed to send input stream to remote: {err:?}");
276276
}
277277
});
@@ -281,7 +281,7 @@ where
281281

282282
/// Write a blob by passing bytes.
283283
pub async fn add_bytes(&self, bytes: impl Into<Bytes>) -> anyhow::Result<AddOutcome> {
284-
let input = futures_lite::stream::once(Ok(bytes.into()));
284+
let input = chunked_bytes_stream(bytes.into(), 1024 * 64).map(Ok);
285285
self.add_stream(input, SetTagOption::Auto).await?.await
286286
}
287287

@@ -291,7 +291,7 @@ where
291291
bytes: impl Into<Bytes>,
292292
name: impl Into<Tag>,
293293
) -> anyhow::Result<AddOutcome> {
294-
let input = futures_lite::stream::once(Ok(bytes.into()));
294+
let input = chunked_bytes_stream(bytes.into(), 1024 * 64).map(Ok);
295295
self.add_stream(input, SetTagOption::Named(name.into()))
296296
.await?
297297
.await
@@ -987,6 +987,12 @@ pub struct DownloadOptions {
987987
pub mode: DownloadMode,
988988
}
989989

990+
fn chunked_bytes_stream(mut b: Bytes, c: usize) -> impl Stream<Item = Bytes> {
991+
futures_lite::stream::iter(std::iter::from_fn(move || {
992+
Some(b.split_to(b.len().min(c))).filter(|x| !x.is_empty())
993+
}))
994+
}
995+
990996
#[cfg(test)]
991997
mod tests {
992998
use std::{path::Path, time::Duration};

tests/rpc.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,23 @@ async fn node_and_client() -> TestResult<(Node, BlobsClient, TempDir)> {
131131
#[tokio::test]
132132
async fn quinn_rpc_smoke() -> TestResult<()> {
133133
let _ = tracing_subscriber::fmt::try_init();
134-
let (node, client, _testdir) = node_and_client().await?;
135-
println!("Made a client");
136-
let hash = client.add_bytes(b"hello".to_vec()).await?;
137-
println!("Hash: {:?}", hash);
138-
drop(node);
134+
let (_node, client, _testdir) = node_and_client().await?;
135+
let data = b"hello";
136+
let hash = client.add_bytes(data.to_vec()).await?.hash;
137+
assert_eq!(hash, iroh_blobs::Hash::new(data));
138+
let data2 = client.read_to_bytes(hash).await?;
139+
assert_eq!(data, &data2[..]);
140+
Ok(())
141+
}
142+
143+
#[tokio::test]
144+
async fn quinn_rpc_large() -> TestResult<()> {
145+
let _ = tracing_subscriber::fmt::try_init();
146+
let (_node, client, _testdir) = node_and_client().await?;
147+
let data = vec![0; 1024 * 1024 * 16];
148+
let hash = client.add_bytes(data.clone()).await?.hash;
149+
assert_eq!(hash, iroh_blobs::Hash::new(&data));
150+
let data2 = client.read_to_bytes(hash).await?;
151+
assert_eq!(data, &data2[..]);
139152
Ok(())
140153
}

0 commit comments

Comments
 (0)