From 7ba883d238bb2b0be8c64672369c27074519962c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Thu, 28 Nov 2024 19:05:39 +0100 Subject: [PATCH 1/3] docs: Add simple file transfer example --- Cargo.toml | 5 ++- examples/transfer.rs | 100 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 104 insertions(+), 1 deletion(-) create mode 100644 examples/transfer.rs diff --git a/Cargo.toml b/Cargo.toml index 64b37a02c..879501fcd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -95,7 +95,7 @@ futures-util = "0.3.30" testdir = "0.9.1" [features] -default = ["fs-store", "rpc", "net_protocol", "example-iroh"] +default = ["fs-store", "rpc", "net_protocol"] downloader = ["dep:parking_lot", "tokio-util/time", "dep:hashlink"] net_protocol = ["downloader"] fs-store = ["dep:reflink-copy", "redb", "dep:redb_v1", "dep:tempfile"] @@ -134,6 +134,9 @@ name = "fetch-fsm" [[example]] name = "fetch-stream" +[[example]] +name = "transfer" + [[example]] name = "hello-world-fetch" required-features = ["example-iroh"] diff --git a/examples/transfer.rs b/examples/transfer.rs new file mode 100644 index 000000000..775501ab0 --- /dev/null +++ b/examples/transfer.rs @@ -0,0 +1,100 @@ +use std::{path::PathBuf, str::FromStr}; + +use anyhow::Result; +use iroh::{protocol::Router, Endpoint}; +use iroh_base::ticket::BlobTicket; +use iroh_blobs::{ + net_protocol::Blobs, + rpc::client::blobs::{ReadAtLen, WrapOption}, + store::mem, + util::{local_pool::LocalPool, SetTagOption}, +}; + +#[tokio::main] +async fn main() -> Result<()> { + // Create an endpoint, it allows creating and accepting + // connections in the iroh p2p world + let endpoint = Endpoint::builder().discovery_n0().bind().await?; + + // We initialize the Blobs protocol in-memory + let local_pool = LocalPool::default(); + let blobs = Blobs::memory().build(&local_pool, &endpoint); + + // Now we build a router that accepts blobs connections & routes them + // to the blobs protocol. + let node = Router::builder(endpoint) + .accept(iroh_blobs::ALPN, blobs) + .spawn() + .await?; + + let args = std::env::args().collect::>(); + match &args.iter().map(String::as_str).collect::>()[..] { + [_cmd, "send", path] => { + let abs_path = PathBuf::from_str(path)?.canonicalize()?; + + let blobs = node + .get_protocol::>(iroh_blobs::ALPN) + .unwrap() + .client(); + + println!("Analyzing file."); + + let blob = blobs + .add_from_path(abs_path, true, SetTagOption::Auto, WrapOption::NoWrap) + .await? + .finish() + .await?; + + let node_id = node.endpoint().node_id(); + let ticket = BlobTicket::new(node_id.into(), blob.hash, blob.format)?; + + println!("File analyzed. Fetch this file by running:"); + println!("cargo run --example transfer -- receive {ticket} {path}"); + + tokio::signal::ctrl_c().await?; + } + [_cmd, "receive", ticket, path] => { + let path_buf = PathBuf::from_str(path)?; + let ticket = BlobTicket::from_str(ticket)?; + + let blobs = node + .get_protocol::>(iroh_blobs::ALPN) + .unwrap() + .client(); + + println!("Starting download."); + + blobs + .download(ticket.hash(), ticket.node_addr().clone()) + .await? + .finish() + .await?; + + println!("Finished download."); + println!("Copying to destination."); + + let mut file = tokio::fs::File::create(path_buf).await?; + let mut reader = blobs.read_at(ticket.hash(), 0, ReadAtLen::All).await?; + tokio::io::copy(&mut reader, &mut file).await?; + + println!("Finished copying."); + } + _ => { + println!("Couldn't parse command line arguments."); + println!("Usage:"); + println!(" # to send:"); + println!(" cargo run --example transfer -- send [FILE]"); + println!(" # this will print a ticket."); + println!(); + println!(" # to receive:"); + println!(" cargo run --example transfer -- receive [TICKET] [FILE]"); + } + } + + // Gracefully shut down the node + println!("Shutting down."); + node.shutdown().await?; + local_pool.shutdown().await; + + Ok(()) +} From 7b404f12bca87b32af85ef0b099cc8174940219f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Fri, 29 Nov 2024 10:47:50 +0100 Subject: [PATCH 2/3] refactor: Avoid `get_protocol`, just keep `Arc` around --- examples/transfer.rs | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/examples/transfer.rs b/examples/transfer.rs index 775501ab0..4e73909ea 100644 --- a/examples/transfer.rs +++ b/examples/transfer.rs @@ -6,7 +6,6 @@ use iroh_base::ticket::BlobTicket; use iroh_blobs::{ net_protocol::Blobs, rpc::client::blobs::{ReadAtLen, WrapOption}, - store::mem, util::{local_pool::LocalPool, SetTagOption}, }; @@ -23,20 +22,17 @@ async fn main() -> Result<()> { // Now we build a router that accepts blobs connections & routes them // to the blobs protocol. let node = Router::builder(endpoint) - .accept(iroh_blobs::ALPN, blobs) + .accept(iroh_blobs::ALPN, blobs.clone()) .spawn() .await?; + let blobs = blobs.client(); + let args = std::env::args().collect::>(); match &args.iter().map(String::as_str).collect::>()[..] { [_cmd, "send", path] => { let abs_path = PathBuf::from_str(path)?.canonicalize()?; - let blobs = node - .get_protocol::>(iroh_blobs::ALPN) - .unwrap() - .client(); - println!("Analyzing file."); let blob = blobs @@ -57,11 +53,6 @@ async fn main() -> Result<()> { let path_buf = PathBuf::from_str(path)?; let ticket = BlobTicket::from_str(ticket)?; - let blobs = node - .get_protocol::>(iroh_blobs::ALPN) - .unwrap() - .client(); - println!("Starting download."); blobs From 68d0b94d107b620ca214eb37c6a13905d1cc7c38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Fri, 29 Nov 2024 12:08:11 +0100 Subject: [PATCH 3/3] Mark `downloader::test::cancellation` as flaky --- src/downloader/test.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/downloader/test.rs b/src/downloader/test.rs index 12659efb0..a444dca4b 100644 --- a/src/downloader/test.rs +++ b/src/downloader/test.rs @@ -121,6 +121,7 @@ async fn deduplication() { } /// Tests that the request is cancelled only when all intents are cancelled. +#[ignore = "flaky"] #[tokio::test] async fn cancellation() { let _guard = iroh_test::logging::setup();