From 8b552a37ae4346eb69dcfff87ae1783c6e227d55 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 10 Dec 2024 11:14:31 +0200 Subject: [PATCH 1/4] WIP --- tests/rpc.rs | 69 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 tests/rpc.rs diff --git a/tests/rpc.rs b/tests/rpc.rs new file mode 100644 index 000000000..c247f6b6a --- /dev/null +++ b/tests/rpc.rs @@ -0,0 +1,69 @@ +#![cfg(feature = "test")] +use std::{net::SocketAddr, path::PathBuf, sync::Arc}; + +use iroh_blobs::{net_protocol::Blobs, util::local_pool::{self, LocalPool}}; + +use quinn::{ + crypto::rustls::{QuicClientConfig, QuicServerConfig}, + rustls, ClientConfig, Endpoint, ServerConfig, +}; + + +/// Returns default server configuration along with its certificate. +#[allow(clippy::field_reassign_with_default)] // https://github.com/rust-lang/rust-clippy/issues/6527 +fn configure_server() -> anyhow::Result<(ServerConfig, Vec)> { + let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()])?; + let cert_der = cert.cert.der(); + let priv_key = rustls::pki_types::PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der()); + let cert_chain = vec![cert_der.clone()]; + + let crypto_server_config = rustls::ServerConfig::builder_with_provider(Arc::new( + rustls::crypto::ring::default_provider(), + )) + .with_protocol_versions(&[&rustls::version::TLS13]) + .expect("valid versions") + .with_no_client_auth() + .with_single_cert(cert_chain, priv_key.into())?; + let quic_server_config = QuicServerConfig::try_from(crypto_server_config)?; + let mut server_config = ServerConfig::with_crypto(Arc::new(quic_server_config)); + + Arc::get_mut(&mut server_config.transport) + .unwrap() + .max_concurrent_uni_streams(0_u8.into()); + + Ok((server_config, cert_der.to_vec())) +} + +pub fn make_server_endpoint(bind_addr: SocketAddr) -> anyhow::Result<(Endpoint, Vec)> { + let (server_config, server_cert) = configure_server()?; + let endpoint = Endpoint::server(server_config, bind_addr)?; + Ok((endpoint, server_cert)) +} + +/// An iroh node that just has the blobs transport +#[derive(Debug)] +pub struct Node { + pub router: iroh::protocol::Router, + pub blobs: Blobs, + pub _local_pool: LocalPool, +} + +impl Node { + pub async fn new(path: PathBuf) -> anyhow::Result { + let store = iroh_blobs::store::fs::Store::load(path).await?; + let local_pool = LocalPool::default(); + let endpoint = iroh::Endpoint::builder().bind().await?; + let blobs = Blobs::builder(store).build(local_pool.handle(), &endpoint); + let router = iroh::protocol::Router::builder(endpoint) + .accept(iroh_blobs::ALPN, blobs.clone()) + .spawn() + .await?; + let endpoint = quinn::Endpoint::server(config, "0.0.0.0:12345".parse().unwrap())?; + let rpc_server = quic_rpc::transport::quinn::QuinnListener::new(endpoint) + Ok(Self { + router, + blobs, + _local_pool: local_pool, + }) + } +} \ No newline at end of file From 3467681677f2b421fa6d68cd8e002eacd9edfa01 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 10 Dec 2024 12:31:53 +0200 Subject: [PATCH 2/4] Add a very basic test for quinn rpc --- .gitignore | 1 + Cargo.lock | 102 ++++++++++++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 2 + tests/rpc.rs | 97 +++++++++++++++++++++++++++++++++++++++++------- 4 files changed, 187 insertions(+), 15 deletions(-) diff --git a/.gitignore b/.gitignore index fe8147e83..e31b44fbe 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target iroh.config.toml +.vscode/* diff --git a/Cargo.lock b/Cargo.lock index abe4aaa92..cac76ca32 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -218,6 +218,15 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "atomic-polyfill" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cf2bce30dfe09ef0bfaef228b9d414faaf7e563035494d7fe092dba54b300f4" +dependencies = [ + "critical-section", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -363,6 +372,15 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "597bb81c80a54b6a4381b23faba8d7774b144c94cbd1d6fe3f1329bd776554ab" +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bit-set" version = "0.5.3" @@ -667,6 +685,12 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "critical-section" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b" + [[package]] name = "crossbeam-utils" version = "0.8.20" @@ -943,6 +967,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "educe" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4bd92664bf78c4d3dba9b7cdafce6fa15b13ed3ed16175218196942e99168a8" +dependencies = [ + "enum-ordinalize", + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "elliptic-curve" version = "0.13.8" @@ -992,6 +1028,26 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "enum-ordinalize" +version = "4.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea0dcfa4e54eeb516fe454635a95753ddd39acda650ce703031c6973e315dd5" +dependencies = [ + "enum-ordinalize-derive", +] + +[[package]] +name = "enum-ordinalize-derive" +version = "4.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d28318a75d4aead5c4db25382e8ef717932d0346600cacae6357eb5941bc5ff" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "enumflags2" version = "0.7.10" @@ -1426,6 +1482,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "hash32" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0c35f58762feb77d74ebe43bdbc3210f09be9fe6742234d573bacc26ed92b67" +dependencies = [ + "byteorder", +] + [[package]] name = "hashbrown" version = "0.14.5" @@ -1450,6 +1515,20 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "heapless" +version = "0.7.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdc6457c0eb62c71aac4bc17216026d8410337c4126773b9c5daba343f17964f" +dependencies = [ + "atomic-polyfill", + "hash32", + "rustc_version", + "serde", + "spin", + "stable_deref_trait", +] + [[package]] name = "heck" version = "0.5.0" @@ -3270,6 +3349,7 @@ dependencies = [ "cobs", "embedded-io 0.4.0", "embedded-io 0.6.1", + "heapless", "postcard-derive", "serde", ] @@ -3454,21 +3534,24 @@ dependencies = [ [[package]] name = "quic-rpc" version = "0.17.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64ae09230350898e9a243a7a4a5fdde934edfb6b010e6a9cfb4136f79b54dbbb" +source = "git+https://github.com/n0-computer/quic-rpc#1778e1d9523ade44a670f11d4263f8b9e1115b0c" dependencies = [ "anyhow", + "bytes", "derive_more", "flume", "futures-lite 2.5.0", "futures-sink", "futures-util", + "iroh-quinn", "pin-project", + "postcard", "serde", "slab", "smallvec", "time", "tokio", + "tokio-serde", "tokio-util", "tracing", ] @@ -4822,6 +4905,21 @@ dependencies = [ "x509-parser", ] +[[package]] +name = "tokio-serde" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "caf600e7036b17782571dd44fa0a5cea3c82f60db5137f774a325a76a0d6852b" +dependencies = [ + "bincode", + "bytes", + "educe", + "futures-core", + "futures-sink", + "pin-project", + "serde", +] + [[package]] name = "tokio-stream" version = "0.1.16" diff --git a/Cargo.toml b/Cargo.toml index 56660f903..692f11947 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -117,6 +117,7 @@ example-iroh = [ "dep:console", "iroh/discovery-local-network" ] +test = ["quic-rpc/quinn-transport"] [package.metadata.docs.rs] all-features = true @@ -183,3 +184,4 @@ incremental = false [patch.crates-io] iroh-base = { git = "https://github.com/n0-computer/iroh" } iroh = { git = "https://github.com/n0-computer/iroh" } +quic-rpc = { git = "https://github.com/n0-computer/quic-rpc" } diff --git a/tests/rpc.rs b/tests/rpc.rs index c247f6b6a..8daa9dc70 100644 --- a/tests/rpc.rs +++ b/tests/rpc.rs @@ -1,17 +1,47 @@ #![cfg(feature = "test")] use std::{net::SocketAddr, path::PathBuf, sync::Arc}; -use iroh_blobs::{net_protocol::Blobs, util::local_pool::{self, LocalPool}}; - +use iroh_blobs::{net_protocol::Blobs, util::local_pool::LocalPool}; +use quic_rpc::transport::quinn::QuinnConnector; use quinn::{ crypto::rustls::{QuicClientConfig, QuicServerConfig}, rustls, ClientConfig, Endpoint, ServerConfig, }; +use rcgen::CertifiedKey; +use tempfile::TempDir; +use testresult::TestResult; +use tokio_util::task::AbortOnDropHandle; + +type QC = QuinnConnector; +type BlobsClient = iroh_blobs::rpc::client::blobs::Client; +/// Builds default quinn client config and trusts given certificates. +/// +/// ## Args +/// +/// - server_certs: a list of trusted certificates in DER format. +fn configure_client(server_certs: &[CertifiedKey]) -> anyhow::Result { + let mut certs = rustls::RootCertStore::empty(); + for cert in server_certs { + let cert = cert.cert.der().clone(); + certs.add(cert)?; + } + + let crypto_client_config = rustls::ClientConfig::builder_with_provider(Arc::new( + rustls::crypto::ring::default_provider(), + )) + .with_protocol_versions(&[&rustls::version::TLS13]) + .expect("valid versions") + .with_root_certificates(certs) + .with_no_client_auth(); + let quic_client_config = QuicClientConfig::try_from(crypto_client_config)?; + + Ok(ClientConfig::new(Arc::new(quic_client_config))) +} /// Returns default server configuration along with its certificate. #[allow(clippy::field_reassign_with_default)] // https://github.com/rust-lang/rust-clippy/issues/6527 -fn configure_server() -> anyhow::Result<(ServerConfig, Vec)> { +fn configure_server() -> anyhow::Result<(ServerConfig, CertifiedKey)> { let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()])?; let cert_der = cert.cert.der(); let priv_key = rustls::pki_types::PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der()); @@ -31,25 +61,36 @@ fn configure_server() -> anyhow::Result<(ServerConfig, Vec)> { .unwrap() .max_concurrent_uni_streams(0_u8.into()); - Ok((server_config, cert_der.to_vec())) + Ok((server_config, cert)) } -pub fn make_server_endpoint(bind_addr: SocketAddr) -> anyhow::Result<(Endpoint, Vec)> { +pub fn make_server_endpoint(bind_addr: SocketAddr) -> anyhow::Result<(Endpoint, CertifiedKey)> { let (server_config, server_cert) = configure_server()?; let endpoint = Endpoint::server(server_config, bind_addr)?; Ok((endpoint, server_cert)) } +pub fn make_client_endpoint( + bind_addr: SocketAddr, + server_certs: &[CertifiedKey], +) -> anyhow::Result { + let client_cfg = configure_client(server_certs)?; + let mut endpoint = Endpoint::client(bind_addr)?; + endpoint.set_default_client_config(client_cfg); + Ok(endpoint) +} + /// An iroh node that just has the blobs transport #[derive(Debug)] pub struct Node { pub router: iroh::protocol::Router, pub blobs: Blobs, - pub _local_pool: LocalPool, + pub local_pool: LocalPool, + pub rpc_task: AbortOnDropHandle<()>, } impl Node { - pub async fn new(path: PathBuf) -> anyhow::Result { + pub async fn new(path: PathBuf) -> anyhow::Result<(Self, SocketAddr, CertifiedKey)> { let store = iroh_blobs::store::fs::Store::load(path).await?; let local_pool = LocalPool::default(); let endpoint = iroh::Endpoint::builder().bind().await?; @@ -58,12 +99,42 @@ impl Node { .accept(iroh_blobs::ALPN, blobs.clone()) .spawn() .await?; - let endpoint = quinn::Endpoint::server(config, "0.0.0.0:12345".parse().unwrap())?; - let rpc_server = quic_rpc::transport::quinn::QuinnListener::new(endpoint) - Ok(Self { + let (config, key) = configure_server()?; + let endpoint = quinn::Endpoint::server(config, "127.0.0.1:0".parse().unwrap())?; + let local_addr = endpoint.local_addr()?; + let rpc_server = quic_rpc::transport::quinn::QuinnListener::new(endpoint)?; + let rpc_server = + quic_rpc::RpcServer::::new(rpc_server); + let blobs2 = blobs.clone(); + let rpc_task = rpc_server + .spawn_accept_loop(move |msg, chan| blobs2.clone().handle_rpc_request(msg, chan)); + let node = Self { router, blobs, - _local_pool: local_pool, - }) + local_pool, + rpc_task, + }; + Ok((node, local_addr, key)) } -} \ No newline at end of file +} + +async fn node_and_client() -> TestResult<(Node, BlobsClient, TempDir)> { + let testdir = tempfile::tempdir()?; + let (node, addr, key) = Node::new(testdir.path().join("blobs")).await?; + let client = make_client_endpoint("127.0.0.1:0".parse().unwrap(), &[key])?; + let client = QuinnConnector::new(client, addr, "localhost".to_string()); + let client = quic_rpc::RpcClient::::new(client); + let client = iroh_blobs::rpc::client::blobs::Client::new(client); + Ok((node, client, testdir)) +} + +#[tokio::test] +async fn quinn_rpc_smoke() -> TestResult<()> { + let _ = tracing_subscriber::fmt::try_init(); + let (node, client, _testdir) = node_and_client().await?; + println!("Made a client"); + let hash = client.add_bytes(b"hello".to_vec()).await?; + println!("Hash: {:?}", hash); + drop(node); + Ok(()) +} From dced099cca79326558a6caee3dcf144d6c9e4729 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 10 Dec 2024 13:53:38 +0200 Subject: [PATCH 3/4] Add tests that use the rpc client with quinn. Some errors don't happen if you use the mem transport. Also fix bug where add_bytes fails if the bytes are larger than the max frame size. --- Cargo.lock | 1 - Cargo.toml | 2 +- src/rpc/client/blobs.rs | 16 +++++++++++----- tests/rpc.rs | 23 ++++++++++++++++++----- 4 files changed, 30 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cac76ca32..c2b09240a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3534,7 +3534,6 @@ dependencies = [ [[package]] name = "quic-rpc" version = "0.17.0" -source = "git+https://github.com/n0-computer/quic-rpc#1778e1d9523ade44a670f11d4263f8b9e1115b0c" dependencies = [ "anyhow", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 692f11947..43c0bde97 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -184,4 +184,4 @@ incremental = false [patch.crates-io] iroh-base = { git = "https://github.com/n0-computer/iroh" } iroh = { git = "https://github.com/n0-computer/iroh" } -quic-rpc = { git = "https://github.com/n0-computer/quic-rpc" } +quic-rpc = { path = "../quic-rpc" } diff --git a/src/rpc/client/blobs.rs b/src/rpc/client/blobs.rs index 4183ffb15..334caf939 100644 --- a/src/rpc/client/blobs.rs +++ b/src/rpc/client/blobs.rs @@ -268,10 +268,10 @@ where } }); tokio::spawn(async move { - // TODO: Is it important to catch this error? It should also result in an error on the - // response stream. If we deem it important, we could one-shot send it into the - // BlobAddProgress and return from there. Not sure. if let Err(err) = sink.send_all(&mut input).await { + // if we get an error in send_all due to the connection being closed, this will just fail again. + // if we get an error due to something else (serialization or size limit), tell the remote to abort. + sink.send(AddStreamUpdate::Abort).await.ok(); warn!("Failed to send input stream to remote: {err:?}"); } }); @@ -281,7 +281,7 @@ where /// Write a blob by passing bytes. pub async fn add_bytes(&self, bytes: impl Into) -> anyhow::Result { - let input = futures_lite::stream::once(Ok(bytes.into())); + let input = chunked_bytes_stream(bytes.into(), 1024 * 64).map(Ok); self.add_stream(input, SetTagOption::Auto).await?.await } @@ -291,7 +291,7 @@ where bytes: impl Into, name: impl Into, ) -> anyhow::Result { - let input = futures_lite::stream::once(Ok(bytes.into())); + let input = chunked_bytes_stream(bytes.into(), 1024 * 64).map(Ok); self.add_stream(input, SetTagOption::Named(name.into())) .await? .await @@ -987,6 +987,12 @@ pub struct DownloadOptions { pub mode: DownloadMode, } +fn chunked_bytes_stream(mut b: Bytes, c: usize) -> impl Stream { + futures_lite::stream::iter(std::iter::from_fn(move || { + Some(b.split_to(b.len().min(c))).filter(|x| !x.is_empty()) + })) +} + #[cfg(test)] mod tests { use std::{path::Path, time::Duration}; diff --git a/tests/rpc.rs b/tests/rpc.rs index 8daa9dc70..5eab61273 100644 --- a/tests/rpc.rs +++ b/tests/rpc.rs @@ -131,10 +131,23 @@ async fn node_and_client() -> TestResult<(Node, BlobsClient, TempDir)> { #[tokio::test] async fn quinn_rpc_smoke() -> TestResult<()> { let _ = tracing_subscriber::fmt::try_init(); - let (node, client, _testdir) = node_and_client().await?; - println!("Made a client"); - let hash = client.add_bytes(b"hello".to_vec()).await?; - println!("Hash: {:?}", hash); - drop(node); + let (_node, client, _testdir) = node_and_client().await?; + let data = b"hello"; + let hash = client.add_bytes(data.to_vec()).await?.hash; + assert_eq!(hash, iroh_blobs::Hash::new(data)); + let data2 = client.read_to_bytes(hash).await?; + assert_eq!(data, &data2[..]); + Ok(()) +} + +#[tokio::test] +async fn quinn_rpc_large() -> TestResult<()> { + let _ = tracing_subscriber::fmt::try_init(); + let (_node, client, _testdir) = node_and_client().await?; + let data = vec![0; 1024 * 1024 * 16]; + let hash = client.add_bytes(data.clone()).await?.hash; + assert_eq!(hash, iroh_blobs::Hash::new(&data)); + let data2 = client.read_to_bytes(hash).await?; + assert_eq!(data, &data2[..]); Ok(()) } From 18b31d5dc492aa8ce1a3fd9a28103f9c18a54e36 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 10 Dec 2024 14:49:38 +0200 Subject: [PATCH 4/4] use published quic-rpc --- Cargo.lock | 4 +++- Cargo.toml | 3 +-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c2b09240a..1c433d78e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3533,7 +3533,9 @@ dependencies = [ [[package]] name = "quic-rpc" -version = "0.17.0" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7a980daf521a275ae2a04fefc311c96fd0cf11ae430324d1b914d072bcc408b" dependencies = [ "anyhow", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 43c0bde97..f9c9d3a75 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,7 +52,7 @@ postcard = { version = "1", default-features = false, features = [ "use-std", "experimental-derive", ] } -quic-rpc = { version = "0.17", optional = true } +quic-rpc = { version = "0.17.1", optional = true } quic-rpc-derive = { version = "0.17", optional = true } quinn = { package = "iroh-quinn", version = "0.12", features = ["ring"] } rand = "0.8" @@ -184,4 +184,3 @@ incremental = false [patch.crates-io] iroh-base = { git = "https://github.com/n0-computer/iroh" } iroh = { git = "https://github.com/n0-computer/iroh" } -quic-rpc = { path = "../quic-rpc" }