|
| 1 | +//! Example that runs and iroh node with local node discovery and no relay server |
| 2 | +//! |
| 3 | +//! Run the follow command to run the "accept" side, that hosts the content: |
| 4 | +//! $ cargo run --example discovery_local_network --features="discovery-local-network" -- accept [FILE_PATH] |
| 5 | +//! Wait for output that looks like the following: |
| 6 | +//! $ cargo run --example discovery_local_network --features="discovery-local-network" -- connect [NODE_ID] [HASH] -o [FILE_PATH] |
| 7 | +//! Run that command on another machine in the same local network, replacing [FILE_PATH] to the path on which you want to save the transferred content. |
| 8 | +use std::path::PathBuf; |
| 9 | + |
| 10 | +use anyhow::ensure; |
| 11 | +use clap::{Parser, Subcommand}; |
| 12 | +use iroh::{ |
| 13 | + discovery::mdns::MdnsDiscovery, protocol::Router, Endpoint, NodeAddr, PublicKey, RelayMode, |
| 14 | + SecretKey, |
| 15 | +}; |
| 16 | +use iroh_blobs::{net_protocol::Blobs, rpc::client::blobs::WrapOption, Hash}; |
| 17 | +use tracing_subscriber::{prelude::*, EnvFilter}; |
| 18 | + |
| 19 | +use self::progress::show_download_progress; |
| 20 | + |
| 21 | +// set the RUST_LOG env var to one of {debug,info,warn} to see logging info |
| 22 | +pub fn setup_logging() { |
| 23 | + tracing_subscriber::registry() |
| 24 | + .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) |
| 25 | + .with(EnvFilter::from_default_env()) |
| 26 | + .try_init() |
| 27 | + .ok(); |
| 28 | +} |
| 29 | + |
| 30 | +#[derive(Debug, Parser)] |
| 31 | +#[command(version, about)] |
| 32 | +pub struct Cli { |
| 33 | + #[clap(subcommand)] |
| 34 | + command: Commands, |
| 35 | +} |
| 36 | + |
| 37 | +#[derive(Subcommand, Clone, Debug)] |
| 38 | +pub enum Commands { |
| 39 | + /// Launch an iroh node and provide the content at the given path |
| 40 | + Accept { |
| 41 | + /// path to the file you want to provide |
| 42 | + path: PathBuf, |
| 43 | + }, |
| 44 | + /// Get the node_id and hash string from a node running accept in the local network |
| 45 | + /// Download the content from that node. |
| 46 | + Connect { |
| 47 | + /// Node ID of a node on the local network |
| 48 | + node_id: PublicKey, |
| 49 | + /// Hash of content you want to download from the node |
| 50 | + hash: Hash, |
| 51 | + /// save the content to a file |
| 52 | + #[clap(long, short)] |
| 53 | + out: Option<PathBuf>, |
| 54 | + }, |
| 55 | +} |
| 56 | + |
| 57 | +#[tokio::main] |
| 58 | +async fn main() -> anyhow::Result<()> { |
| 59 | + setup_logging(); |
| 60 | + let cli = Cli::parse(); |
| 61 | + |
| 62 | + let key = SecretKey::generate(rand::rngs::OsRng); |
| 63 | + let discovery = MdnsDiscovery::new(key.public())?; |
| 64 | + |
| 65 | + println!("Starting iroh node with mdns discovery..."); |
| 66 | + // create a new node |
| 67 | + let endpoint = Endpoint::builder() |
| 68 | + .secret_key(key) |
| 69 | + .discovery(Box::new(discovery)) |
| 70 | + .relay_mode(RelayMode::Disabled) |
| 71 | + .bind() |
| 72 | + .await?; |
| 73 | + let builder = Router::builder(endpoint); |
| 74 | + let blobs = Blobs::memory().build(builder.endpoint()); |
| 75 | + let builder = builder.accept(iroh_blobs::ALPN, blobs.clone()); |
| 76 | + let node = builder.spawn().await?; |
| 77 | + let blobs_client = blobs.client(); |
| 78 | + |
| 79 | + match &cli.command { |
| 80 | + Commands::Accept { path } => { |
| 81 | + if !path.is_file() { |
| 82 | + println!("Content must be a file."); |
| 83 | + node.shutdown().await?; |
| 84 | + return Ok(()); |
| 85 | + } |
| 86 | + let absolute = path.canonicalize()?; |
| 87 | + println!("Adding {} as {}...", path.display(), absolute.display()); |
| 88 | + let stream = blobs_client |
| 89 | + .add_from_path( |
| 90 | + absolute, |
| 91 | + true, |
| 92 | + iroh_blobs::util::SetTagOption::Auto, |
| 93 | + WrapOption::NoWrap, |
| 94 | + ) |
| 95 | + .await?; |
| 96 | + let outcome = stream.finish().await?; |
| 97 | + println!("To fetch the blob:\n\tcargo run --example discovery_local_network --features=\"discovery-local-network\" -- connect {} {} -o [FILE_PATH]", node.endpoint().node_id(), outcome.hash); |
| 98 | + tokio::signal::ctrl_c().await?; |
| 99 | + node.shutdown().await?; |
| 100 | + std::process::exit(0); |
| 101 | + } |
| 102 | + Commands::Connect { node_id, hash, out } => { |
| 103 | + println!("NodeID: {}", node.endpoint().node_id()); |
| 104 | + let mut stream = blobs_client |
| 105 | + .download(*hash, NodeAddr::new(*node_id)) |
| 106 | + .await?; |
| 107 | + show_download_progress(*hash, &mut stream).await?; |
| 108 | + if let Some(path) = out { |
| 109 | + let absolute = std::env::current_dir()?.join(path); |
| 110 | + ensure!(!absolute.is_dir(), "output must not be a directory"); |
| 111 | + tracing::info!( |
| 112 | + "exporting {hash} to {} -> {}", |
| 113 | + path.display(), |
| 114 | + absolute.display() |
| 115 | + ); |
| 116 | + let stream = blobs_client |
| 117 | + .export( |
| 118 | + *hash, |
| 119 | + absolute, |
| 120 | + iroh_blobs::store::ExportFormat::Blob, |
| 121 | + iroh_blobs::store::ExportMode::Copy, |
| 122 | + ) |
| 123 | + .await?; |
| 124 | + stream.await?; |
| 125 | + } |
| 126 | + } |
| 127 | + } |
| 128 | + Ok(()) |
| 129 | +} |
| 130 | + |
| 131 | +mod progress { |
| 132 | + use anyhow::{bail, Result}; |
| 133 | + use console::style; |
| 134 | + use futures_lite::{Stream, StreamExt}; |
| 135 | + use indicatif::{ |
| 136 | + HumanBytes, HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressState, |
| 137 | + ProgressStyle, |
| 138 | + }; |
| 139 | + use iroh_blobs::{ |
| 140 | + get::{db::DownloadProgress, progress::BlobProgress, Stats}, |
| 141 | + Hash, |
| 142 | + }; |
| 143 | + |
| 144 | + pub async fn show_download_progress( |
| 145 | + hash: Hash, |
| 146 | + mut stream: impl Stream<Item = Result<DownloadProgress>> + Unpin, |
| 147 | + ) -> Result<()> { |
| 148 | + eprintln!("Fetching: {}", hash); |
| 149 | + let mp = MultiProgress::new(); |
| 150 | + mp.set_draw_target(ProgressDrawTarget::stderr()); |
| 151 | + let op = mp.add(make_overall_progress()); |
| 152 | + let ip = mp.add(make_individual_progress()); |
| 153 | + op.set_message(format!("{} Connecting ...\n", style("[1/3]").bold().dim())); |
| 154 | + let mut seq = false; |
| 155 | + while let Some(x) = stream.next().await { |
| 156 | + match x? { |
| 157 | + DownloadProgress::InitialState(state) => { |
| 158 | + if state.connected { |
| 159 | + op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim())); |
| 160 | + } |
| 161 | + if let Some(count) = state.root.child_count { |
| 162 | + op.set_message(format!( |
| 163 | + "{} Downloading {} blob(s)\n", |
| 164 | + style("[3/3]").bold().dim(), |
| 165 | + count + 1, |
| 166 | + )); |
| 167 | + op.set_length(count + 1); |
| 168 | + op.reset(); |
| 169 | + op.set_position(state.current.map(u64::from).unwrap_or(0)); |
| 170 | + seq = true; |
| 171 | + } |
| 172 | + if let Some(blob) = state.get_current() { |
| 173 | + if let Some(size) = blob.size { |
| 174 | + ip.set_length(size.value()); |
| 175 | + ip.reset(); |
| 176 | + match blob.progress { |
| 177 | + BlobProgress::Pending => {} |
| 178 | + BlobProgress::Progressing(offset) => ip.set_position(offset), |
| 179 | + BlobProgress::Done => ip.finish_and_clear(), |
| 180 | + } |
| 181 | + if !seq { |
| 182 | + op.finish_and_clear(); |
| 183 | + } |
| 184 | + } |
| 185 | + } |
| 186 | + } |
| 187 | + DownloadProgress::FoundLocal { .. } => {} |
| 188 | + DownloadProgress::Connected => { |
| 189 | + op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim())); |
| 190 | + } |
| 191 | + DownloadProgress::FoundHashSeq { children, .. } => { |
| 192 | + op.set_message(format!( |
| 193 | + "{} Downloading {} blob(s)\n", |
| 194 | + style("[3/3]").bold().dim(), |
| 195 | + children + 1, |
| 196 | + )); |
| 197 | + op.set_length(children + 1); |
| 198 | + op.reset(); |
| 199 | + seq = true; |
| 200 | + } |
| 201 | + DownloadProgress::Found { size, child, .. } => { |
| 202 | + if seq { |
| 203 | + op.set_position(child.into()); |
| 204 | + } else { |
| 205 | + op.finish_and_clear(); |
| 206 | + } |
| 207 | + ip.set_length(size); |
| 208 | + ip.reset(); |
| 209 | + } |
| 210 | + DownloadProgress::Progress { offset, .. } => { |
| 211 | + ip.set_position(offset); |
| 212 | + } |
| 213 | + DownloadProgress::Done { .. } => { |
| 214 | + ip.finish_and_clear(); |
| 215 | + } |
| 216 | + DownloadProgress::AllDone(Stats { |
| 217 | + bytes_read, |
| 218 | + elapsed, |
| 219 | + .. |
| 220 | + }) => { |
| 221 | + op.finish_and_clear(); |
| 222 | + eprintln!( |
| 223 | + "Transferred {} in {}, {}/s", |
| 224 | + HumanBytes(bytes_read), |
| 225 | + HumanDuration(elapsed), |
| 226 | + HumanBytes((bytes_read as f64 / elapsed.as_secs_f64()) as u64) |
| 227 | + ); |
| 228 | + break; |
| 229 | + } |
| 230 | + DownloadProgress::Abort(e) => { |
| 231 | + bail!("download aborted: {}", e); |
| 232 | + } |
| 233 | + } |
| 234 | + } |
| 235 | + Ok(()) |
| 236 | + } |
| 237 | + fn make_overall_progress() -> ProgressBar { |
| 238 | + let pb = ProgressBar::hidden(); |
| 239 | + pb.enable_steady_tick(std::time::Duration::from_millis(100)); |
| 240 | + pb.set_style( |
| 241 | + ProgressStyle::with_template( |
| 242 | + "{msg}{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len}", |
| 243 | + ) |
| 244 | + .unwrap() |
| 245 | + .progress_chars("#>-"), |
| 246 | + ); |
| 247 | + pb |
| 248 | + } |
| 249 | + |
| 250 | + fn make_individual_progress() -> ProgressBar { |
| 251 | + let pb = ProgressBar::hidden(); |
| 252 | + pb.enable_steady_tick(std::time::Duration::from_millis(100)); |
| 253 | + pb.set_style( |
| 254 | + ProgressStyle::with_template("{msg}{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})") |
| 255 | + .unwrap() |
| 256 | + .with_key( |
| 257 | + "eta", |
| 258 | + |state: &ProgressState, w: &mut dyn std::fmt::Write| { |
| 259 | + write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap() |
| 260 | + }, |
| 261 | + ) |
| 262 | + .progress_chars("#>-"), |
| 263 | + ); |
| 264 | + pb |
| 265 | + } |
| 266 | +} |
0 commit comments