diff --git a/Cargo.lock b/Cargo.lock index 65ac03e73..ca1acf205 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -308,11 +308,42 @@ dependencies = [ "serde", ] +[[package]] +name = "camino" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b96ec4966b5813e2c0507c1f86115c8c5abaadc3980879c3424042a02fd1ad3" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo-platform" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24b1f0365a6c6bb4020cd05806fd0d33c44d38046b8bd7f0e40814b9763cabfc" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo_metadata" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" +dependencies = [ + "camino", + "cargo-platform", + "semver", + "serde", + "serde_json", +] + [[package]] name = "cc" -version = "1.1.34" +version = "1.1.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67b9470d453346108f93a59222a9a1a5724db32d0a4727b7ab7ace4b4d822dc9" +checksum = "0f57c4b4da2a9d619dd035f27316d7a426305b75be93d09e92f2b9229c34feaf" dependencies = [ "shlex", ] @@ -746,6 +777,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "educe" +version = "0.4.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f0042ff8246a363dbe77d2ceedb073339e85a804b9a47636c6e016a9a32c05f" +dependencies = [ + "enum-ordinalize", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "elliptic-curve" version = "0.13.8" @@ -789,6 +832,19 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "enum-ordinalize" +version = "3.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bf1fa3f06bbff1ea5b1a9c7b14aa992a39657db60a2759457328d7e058f49ee" +dependencies = [ + "num-bigint", + "num-traits", + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "enumflags2" version = "0.7.10" @@ -1730,7 +1786,7 @@ checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" [[package]] name = "iroh-base" version = "0.28.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#134a93b5a60103b3ce8fa4aacb52cdbcb291d00b" +source = "git+https://github.com/n0-computer/iroh?branch=main#0a7a534128bf1234a326fcfba134d878e796c377" dependencies = [ "aead", "anyhow", @@ -1791,17 +1847,22 @@ dependencies = [ "iroh-quinn", "iroh-router", "iroh-test", + "nested_enum_utils", "num_cpus", "oneshot", "parking_lot", "pin-project", + "portable-atomic", "postcard", "proptest", + "quic-rpc", + "quic-rpc-derive", "rand", "range-collections", "rcgen", "redb 1.5.1", "redb 2.2.0", + "ref-cast", "reflink-copy", "rustls", "self_cell", @@ -1810,7 +1871,9 @@ dependencies = [ "serde_json", "serde_test", "smallvec", + "strum", "tempfile", + "testdir", "testresult", "thiserror", "tokio", @@ -1818,6 +1881,7 @@ dependencies = [ "tracing", "tracing-futures", "tracing-subscriber", + "walkdir", ] [[package]] @@ -1836,7 +1900,7 @@ dependencies = [ [[package]] name = "iroh-metrics" version = "0.28.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#134a93b5a60103b3ce8fa4aacb52cdbcb291d00b" +source = "git+https://github.com/n0-computer/iroh?branch=main#0a7a534128bf1234a326fcfba134d878e796c377" dependencies = [ "anyhow", "erased_set", @@ -1856,7 +1920,7 @@ dependencies = [ [[package]] name = "iroh-net" version = "0.28.1" -source = 
"git+https://github.com/n0-computer/iroh?branch=main#134a93b5a60103b3ce8fa4aacb52cdbcb291d00b" +source = "git+https://github.com/n0-computer/iroh?branch=main#0a7a534128bf1234a326fcfba134d878e796c377" dependencies = [ "anyhow", "backoff", @@ -1983,7 +2047,7 @@ dependencies = [ [[package]] name = "iroh-router" version = "0.28.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#134a93b5a60103b3ce8fa4aacb52cdbcb291d00b" +source = "git+https://github.com/n0-computer/iroh?branch=main#0a7a534128bf1234a326fcfba134d878e796c377" dependencies = [ "anyhow", "futures-buffered", @@ -2226,6 +2290,18 @@ dependencies = [ "getrandom", ] +[[package]] +name = "nested_enum_utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f256ef99e7ac37428ef98c89bef9d84b590172de4bbfbe81b68a4cd3abadb32" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "netdev" version = "0.30.0" @@ -2311,7 +2387,7 @@ dependencies = [ [[package]] name = "netwatch" version = "0.1.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#134a93b5a60103b3ce8fa4aacb52cdbcb291d00b" +source = "git+https://github.com/n0-computer/iroh?branch=main#0a7a534128bf1234a326fcfba134d878e796c377" dependencies = [ "anyhow", "bytes", @@ -2375,6 +2451,15 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" +[[package]] +name = "ntapi" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" +dependencies = [ + "winapi", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -2821,7 +2906,7 @@ checksum = "cc9c68a3f6da06753e9335d63e27f6b9754dd1920d941135b7ea8224f141adb2" [[package]] name = "portmapper" version = "0.1.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#134a93b5a60103b3ce8fa4aacb52cdbcb291d00b" +source = "git+https://github.com/n0-computer/iroh?branch=main#0a7a534128bf1234a326fcfba134d878e796c377" dependencies = [ "anyhow", "base64", @@ -3046,6 +3131,39 @@ dependencies = [ "winapi", ] +[[package]] +name = "quic-rpc" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61e131f594054d27d077162815db3b5e9ddd76a28fbb9091b68095971e75c286" +dependencies = [ + "anyhow", + "derive_more", + "educe", + "flume", + "futures-lite 2.4.0", + "futures-sink", + "futures-util", + "hex", + "pin-project", + "serde", + "slab", + "tokio", + "tracing", +] + +[[package]] +name = "quic-rpc-derive" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbef4c942978f74ef296ae40d43d4375c9d730b65a582688a358108cfd5c0cf7" +dependencies = [ + "proc-macro2", + "quic-rpc", + "quote", + "syn 1.0.109", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -3089,9 +3207,9 @@ dependencies = [ [[package]] name = "quinn-udp" -version = "0.5.6" +version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e346e016eacfff12233c243718197ca12f148c84e1e84268a896699b41c71780" +checksum = "7d5a626c6807713b15cac82a6acaccd6043c9a5408c24baae07611fec3f243da" dependencies = [ "cfg_aliases", "libc", @@ -3448,9 +3566,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.38" +version = "0.38.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"aa260229e6538e52293eeb577aabd09945a09d6d9cc0fc550ed7529056c2e32a" +checksum = "375116bee2be9ed569afe2154ea6a99dfdffd257f533f187498c2a8f5feaf4ee" dependencies = [ "bitflags 2.6.0", "errno", @@ -3652,6 +3770,9 @@ name = "semver" version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" +dependencies = [ + "serde", +] [[package]] name = "serde" @@ -4079,6 +4200,20 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "sysinfo" +version = "0.26.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c18a6156d1f27a9592ee18c1a846ca8dd5c258b7179fc193ae87c74ebb666f5" +dependencies = [ + "cfg-if", + "core-foundation-sys", + "libc", + "ntapi", + "once_cell", + "winapi", +] + [[package]] name = "system-configuration" version = "0.6.1" @@ -4113,6 +4248,20 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "testdir" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee79e927b64d193f5abb60d20a0eb56be0ee5a242fdeb8ce3bf054177006de52" +dependencies = [ + "anyhow", + "backtrace", + "cargo_metadata", + "once_cell", + "sysinfo", + "whoami", +] + [[package]] name = "testresult" version = "0.4.1" @@ -4598,6 +4747,12 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" + [[package]] name = "wasm-bindgen" version = "0.2.95" @@ -4696,6 +4851,17 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "whoami" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "372d5b87f58ec45c384ba03563b03544dc5fadc3983e434b286913f5b4a9bb6d" +dependencies = [ + "redox_syscall", + "wasite", + "web-sys", +] + [[package]] name = "widestring" version = "1.1.0" diff --git a/Cargo.toml b/Cargo.toml index bf30b8c05..0d2753f49 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ chrono = "0.4.31" derive_more = { version = "1.0.0", features = ["debug", "display", "deref", "deref_mut", "from", "try_into", "into"] } futures-buffered = "0.2.4" futures-lite = "2.3" +futures-util = { version = "0.3.30", optional = true } genawaiter = { version = "0.99.1", features = ["futures03"] } hashlink = { version = "0.9.0", optional = true } hex = "0.4.3" @@ -29,27 +30,34 @@ iroh-io = { version = "0.6.0", features = ["stats"] } iroh-metrics = { version = "0.28.0", default-features = false } iroh-net = { version = "0.28.1" } iroh-router = "0.28.0" +nested_enum_utils = { version = "0.1.0", optional = true } num_cpus = "1.15.0" oneshot = "0.1.8" parking_lot = { version = "0.12.1", optional = true } pin-project = "1.1.5" +portable-atomic = { version = "1", optional = true } postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] } +quic-rpc = { version = "0.15.0", optional = true } +quic-rpc-derive = { version = "0.15.0", optional = true } quinn = { package = "iroh-quinn", version = "0.12", features = ["ring"] } rand = "0.8" range-collections = "0.4.0" redb = { version = "2.0.0", optional = true } redb_v1 = { package = "redb", version = "1.5.1", optional = true } +ref-cast = { version = "1.0.23", optional = true } reflink-copy = { 
version = "0.1.8", optional = true } self_cell = "1.0.1" serde = { version = "1", features = ["derive"] } serde-error = "0.1.3" smallvec = { version = "1.10.0", features = ["serde", "const_new"] } +strum = { version = "0.26.3", optional = true } tempfile = { version = "3.10.0", optional = true } thiserror = "1" tokio = { version = "1", features = ["fs"] } tokio-util = { version = "0.7", features = ["io-util", "io"] } tracing = "0.1" tracing-futures = "0.2.5" +walkdir = { version = "2.5.0", optional = true } [dev-dependencies] http-body = "0.4.5" @@ -65,13 +73,15 @@ rcgen = "0.12.0" rustls = { version = "0.23", default-features = false, features = ["ring"] } tempfile = "3.10.0" futures-util = "0.3.30" +testdir = "0.9.1" [features] -default = ["fs-store"] +default = ["fs-store", "rpc"] downloader = ["dep:parking_lot", "tokio-util/time", "dep:hashlink"] fs-store = ["dep:reflink-copy", "redb", "dep:redb_v1", "dep:tempfile"] metrics = ["iroh-metrics/metrics"] redb = ["dep:redb"] +rpc = ["dep:quic-rpc", "dep:quic-rpc-derive", "dep:nested_enum_utils", "dep:strum", "dep:futures-util", "dep:ref-cast", "dep:portable-atomic", "dep:walkdir", "downloader"] [package.metadata.docs.rs] all-features = true diff --git a/deny.toml b/deny.toml index 267ca7a65..f5669dbf3 100644 --- a/deny.toml +++ b/deny.toml @@ -21,6 +21,7 @@ allow = [ "Unicode-DFS-2016", "Zlib", "MPL-2.0", # https://fossa.com/blog/open-source-software-licenses-101-mozilla-public-license-2-0/ + "Unicode-3.0" ] [[licenses.clarify]] diff --git a/src/lib.rs b/src/lib.rs index b6358fabc..8b561c504 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,6 +40,9 @@ pub mod metrics; pub mod net_protocol; pub mod protocol; pub mod provider; +#[cfg(feature = "rpc")] +#[cfg_attr(iroh_docsrs, doc(cfg(feature = "rpc")))] +pub mod rpc; pub mod store; pub mod util; diff --git a/src/net_protocol.rs b/src/net_protocol.rs index d1de1da65..917dc2f07 100644 --- a/src/net_protocol.rs +++ b/src/net_protocol.rs @@ -73,6 +73,7 @@ pub struct Blobs { events: EventSender, downloader: Downloader, batches: tokio::sync::Mutex, + endpoint: Endpoint, } /// Name used for logging when new node addresses are added from gossip. @@ -135,12 +136,14 @@ impl Blobs { rt: LocalPoolHandle, events: EventSender, downloader: Downloader, + endpoint: Endpoint, ) -> Self { Self { rt, store, events, downloader, + endpoint, batches: Default::default(), } } @@ -149,6 +152,14 @@ impl Blobs { &self.store } + pub(crate) fn rt(&self) -> LocalPoolHandle { + self.rt.clone() + } + + pub(crate) fn endpoint(&self) -> &Endpoint { + &self.endpoint + } + pub async fn batches(&self) -> tokio::sync::MutexGuard<'_, BlobBatches> { self.batches.lock().await } diff --git a/src/rpc.rs b/src/rpc.rs new file mode 100644 index 000000000..7264f8ac5 --- /dev/null +++ b/src/rpc.rs @@ -0,0 +1,871 @@ +//! 
Provides a rpc protocol as well as a client for the protocol + +use std::{ + io, + sync::{Arc, Mutex}, +}; + +use anyhow::anyhow; +use client::{ + blobs::{BlobInfo, BlobStatus, IncompleteBlobInfo, WrapOption}, + tags::TagInfo, +}; +use futures_buffered::BufferedStreamExt; +use futures_lite::StreamExt; +use futures_util::{FutureExt, Stream}; +use genawaiter::sync::{Co, Gen}; +use iroh_base::hash::{BlobFormat, HashAndFormat}; +use iroh_io::AsyncSliceReader; +use proto::{ + blobs::{ + AddPathRequest, AddPathResponse, AddStreamRequest, AddStreamResponse, AddStreamUpdate, + BatchAddPathRequest, BatchAddPathResponse, BatchAddStreamRequest, BatchAddStreamResponse, + BatchAddStreamUpdate, BatchCreateRequest, BatchCreateResponse, BatchCreateTempTagRequest, + BatchUpdate, BlobStatusRequest, BlobStatusResponse, ConsistencyCheckRequest, + CreateCollectionRequest, CreateCollectionResponse, DeleteRequest, DownloadResponse, + ExportRequest, ExportResponse, ListIncompleteRequest, ListRequest, ReadAtRequest, + ReadAtResponse, ValidateRequest, + }, + tags::{ + CreateRequest as TagsCreateRequest, DeleteRequest as TagDeleteRequest, + ListRequest as TagListRequest, SetRequest as TagsSetRequest, SyncMode, + }, + Request, RpcError, RpcResult, RpcService, +}; +use quic_rpc::server::{ChannelTypes, RpcChannel, RpcServerError}; + +use crate::{ + export::ExportProgress, + format::collection::Collection, + get::db::DownloadProgress, + net_protocol::{BlobDownloadRequest, Blobs}, + provider::{AddProgress, BatchAddPathProgress}, + store::{ConsistencyCheckProgress, ImportProgress, MapEntry, ValidateProgress}, + util::{ + progress::{AsyncChannelProgressSender, ProgressSender}, + SetTagOption, + }, + Tag, +}; +pub mod client; +pub mod proto; + +/// Chunk size for getting blobs over RPC +const RPC_BLOB_GET_CHUNK_SIZE: usize = 1024 * 64; +/// Channel cap for getting blobs over RPC +const RPC_BLOB_GET_CHANNEL_CAP: usize = 2; + +impl Blobs { + /// Handle an RPC request + pub async fn handle_rpc_request( + self: Arc, + msg: Request, + chan: RpcChannel, + ) -> std::result::Result<(), RpcServerError> + where + C: ChannelTypes, + { + use Request::*; + match msg { + Blobs(msg) => self.handle_blobs_request(msg, chan).await, + Tags(msg) => self.handle_tags_request(msg, chan).await, + } + } + + /// Handle a tags request + pub async fn handle_tags_request( + self: Arc, + msg: proto::tags::Request, + chan: RpcChannel, + ) -> std::result::Result<(), RpcServerError> + where + C: ChannelTypes, + { + use proto::tags::Request::*; + match msg { + Create(msg) => chan.rpc(msg, self, Self::tags_create).await, + Set(msg) => chan.rpc(msg, self, Self::tags_set).await, + DeleteTag(msg) => chan.rpc(msg, self, Self::blob_delete_tag).await, + ListTags(msg) => chan.server_streaming(msg, self, Self::blob_list_tags).await, + } + } + + /// Handle a blobs request + pub async fn handle_blobs_request( + self: Arc, + msg: proto::blobs::Request, + chan: RpcChannel, + ) -> std::result::Result<(), RpcServerError> + where + C: ChannelTypes, + { + use proto::blobs::Request::*; + match msg { + List(msg) => chan.server_streaming(msg, self, Self::blob_list).await, + ListIncomplete(msg) => { + chan.server_streaming(msg, self, Self::blob_list_incomplete) + .await + } + CreateCollection(msg) => chan.rpc(msg, self, Self::create_collection).await, + Delete(msg) => chan.rpc(msg, self, Self::blob_delete_blob).await, + AddPath(msg) => { + chan.server_streaming(msg, self, Self::blob_add_from_path) + .await + } + Download(msg) => chan.server_streaming(msg, self, 
Self::blob_download).await, + Export(msg) => chan.server_streaming(msg, self, Self::blob_export).await, + Validate(msg) => chan.server_streaming(msg, self, Self::blob_validate).await, + Fsck(msg) => { + chan.server_streaming(msg, self, Self::blob_consistency_check) + .await + } + ReadAt(msg) => chan.server_streaming(msg, self, Self::blob_read_at).await, + AddStream(msg) => chan.bidi_streaming(msg, self, Self::blob_add_stream).await, + AddStreamUpdate(_msg) => Err(RpcServerError::UnexpectedUpdateMessage), + BlobStatus(msg) => chan.rpc(msg, self, Self::blob_status).await, + BatchCreate(msg) => chan.bidi_streaming(msg, self, Self::batch_create).await, + BatchUpdate(_) => Err(RpcServerError::UnexpectedStartMessage), + BatchAddStream(msg) => chan.bidi_streaming(msg, self, Self::batch_add_stream).await, + BatchAddStreamUpdate(_) => Err(RpcServerError::UnexpectedStartMessage), + BatchAddPath(msg) => { + chan.server_streaming(msg, self, Self::batch_add_from_path) + .await + } + BatchCreateTempTag(msg) => chan.rpc(msg, self, Self::batch_create_temp_tag).await, + } + } + + async fn blob_status(self: Arc, msg: BlobStatusRequest) -> RpcResult { + let blobs = self; + let entry = blobs + .store() + .get(&msg.hash) + .await + .map_err(|e| RpcError::new(&e))?; + Ok(BlobStatusResponse(match entry { + Some(entry) => { + if entry.is_complete() { + BlobStatus::Complete { + size: entry.size().value(), + } + } else { + BlobStatus::Partial { size: entry.size() } + } + } + None => BlobStatus::NotFound, + })) + } + + async fn blob_list_impl(self: Arc, co: &Co>) -> io::Result<()> { + use bao_tree::io::fsm::Outboard; + + let blobs = self; + let db = blobs.store(); + for blob in db.blobs().await? { + let blob = blob?; + let Some(entry) = db.get(&blob).await? else { + continue; + }; + let hash = entry.hash(); + let size = entry.outboard().await?.tree().size(); + let path = "".to_owned(); + co.yield_(Ok(BlobInfo { hash, size, path })).await; + } + Ok(()) + } + + async fn blob_list_incomplete_impl( + self: Arc, + co: &Co>, + ) -> io::Result<()> { + let blobs = self; + let db = blobs.store(); + for hash in db.partial_blobs().await? 
{ + let hash = hash?; + let Ok(Some(entry)) = db.get_mut(&hash).await else { + continue; + }; + if entry.is_complete() { + continue; + } + let size = 0; + let expected_size = entry.size().value(); + co.yield_(Ok(IncompleteBlobInfo { + hash, + size, + expected_size, + })) + .await; + } + Ok(()) + } + + fn blob_list( + self: Arc, + _msg: ListRequest, + ) -> impl Stream> + Send + 'static { + Gen::new(|co| async move { + if let Err(e) = self.blob_list_impl(&co).await { + co.yield_(Err(RpcError::new(&e))).await; + } + }) + } + + fn blob_list_incomplete( + self: Arc, + _msg: ListIncompleteRequest, + ) -> impl Stream> + Send + 'static { + Gen::new(move |co| async move { + if let Err(e) = self.blob_list_incomplete_impl(&co).await { + co.yield_(Err(RpcError::new(&e))).await; + } + }) + } + + async fn blob_delete_tag(self: Arc, msg: TagDeleteRequest) -> RpcResult<()> { + self.store() + .set_tag(msg.name, None) + .await + .map_err(|e| RpcError::new(&e))?; + Ok(()) + } + + async fn blob_delete_blob(self: Arc, msg: DeleteRequest) -> RpcResult<()> { + self.store() + .delete(vec![msg.hash]) + .await + .map_err(|e| RpcError::new(&e))?; + Ok(()) + } + + fn blob_list_tags( + self: Arc, + msg: TagListRequest, + ) -> impl Stream + Send + 'static { + tracing::info!("blob_list_tags"); + let blobs = self; + Gen::new(|co| async move { + let tags = blobs.store().tags().await.unwrap(); + #[allow(clippy::manual_flatten)] + for item in tags { + if let Ok((name, HashAndFormat { hash, format })) = item { + if (format.is_raw() && msg.raw) || (format.is_hash_seq() && msg.hash_seq) { + co.yield_(TagInfo { name, hash, format }).await; + } + } + } + }) + } + + /// Invoke validate on the database and stream out the result + fn blob_validate( + self: Arc, + msg: ValidateRequest, + ) -> impl Stream + Send + 'static { + let (tx, rx) = async_channel::bounded(1); + let tx2 = tx.clone(); + let blobs = self; + tokio::task::spawn(async move { + if let Err(e) = blobs + .store() + .validate(msg.repair, AsyncChannelProgressSender::new(tx).boxed()) + .await + { + tx2.send(ValidateProgress::Abort(RpcError::new(&e))) + .await + .ok(); + } + }); + rx + } + + /// Invoke validate on the database and stream out the result + fn blob_consistency_check( + self: Arc, + msg: ConsistencyCheckRequest, + ) -> impl Stream + Send + 'static { + let (tx, rx) = async_channel::bounded(1); + let tx2 = tx.clone(); + let blobs = self; + tokio::task::spawn(async move { + if let Err(e) = blobs + .store() + .consistency_check(msg.repair, AsyncChannelProgressSender::new(tx).boxed()) + .await + { + tx2.send(ConsistencyCheckProgress::Abort(RpcError::new(&e))) + .await + .ok(); + } + }); + rx + } + + fn blob_add_from_path( + self: Arc, + msg: AddPathRequest, + ) -> impl Stream { + // provide a little buffer so that we don't slow down the sender + let (tx, rx) = async_channel::bounded(32); + let tx2 = tx.clone(); + self.rt().spawn_detached(|| async move { + if let Err(e) = self.blob_add_from_path0(msg, tx).await { + tx2.send(AddProgress::Abort(RpcError::new(&*e))).await.ok(); + } + }); + rx.map(AddPathResponse) + } + + async fn tags_set(self: Arc, msg: TagsSetRequest) -> RpcResult<()> { + let blobs = self; + blobs + .store() + .set_tag(msg.name, msg.value) + .await + .map_err(|e| RpcError::new(&e))?; + if let SyncMode::Full = msg.sync { + blobs.store().sync().await.map_err(|e| RpcError::new(&e))?; + } + if let Some(batch) = msg.batch { + if let Some(content) = msg.value.as_ref() { + blobs + .batches() + .await + .remove_one(batch, content) + .map_err(|e| 
RpcError::new(&*e))?; + } + } + Ok(()) + } + + async fn tags_create(self: Arc, msg: TagsCreateRequest) -> RpcResult { + let blobs = self; + let tag = blobs + .store() + .create_tag(msg.value) + .await + .map_err(|e| RpcError::new(&e))?; + if let SyncMode::Full = msg.sync { + blobs.store().sync().await.map_err(|e| RpcError::new(&e))?; + } + if let Some(batch) = msg.batch { + blobs + .batches() + .await + .remove_one(batch, &msg.value) + .map_err(|e| RpcError::new(&*e))?; + } + Ok(tag) + } + + fn blob_download( + self: Arc, + msg: BlobDownloadRequest, + ) -> impl Stream { + let (sender, receiver) = async_channel::bounded(1024); + let endpoint = self.endpoint().clone(); + let progress = AsyncChannelProgressSender::new(sender); + + let blobs_protocol = self.clone(); + + self.rt().spawn_detached(move || async move { + if let Err(err) = blobs_protocol + .download(endpoint, msg, progress.clone()) + .await + { + progress + .send(DownloadProgress::Abort(RpcError::new(&*err))) + .await + .ok(); + } + }); + + receiver.map(DownloadResponse) + } + + fn blob_export(self: Arc, msg: ExportRequest) -> impl Stream { + let (tx, rx) = async_channel::bounded(1024); + let progress = AsyncChannelProgressSender::new(tx); + self.rt().spawn_detached(move || async move { + let res = crate::export::export( + self.store(), + msg.hash, + msg.path, + msg.format, + msg.mode, + progress.clone(), + ) + .await; + match res { + Ok(()) => progress.send(ExportProgress::AllDone).await.ok(), + Err(err) => progress + .send(ExportProgress::Abort(RpcError::new(&*err))) + .await + .ok(), + }; + }); + rx.map(ExportResponse) + } + + async fn blob_add_from_path0( + self: Arc, + msg: AddPathRequest, + progress: async_channel::Sender, + ) -> anyhow::Result<()> { + use std::collections::BTreeMap; + + use crate::store::ImportMode; + + let blobs = self.clone(); + let progress = AsyncChannelProgressSender::new(progress); + let names = Arc::new(Mutex::new(BTreeMap::new())); + // convert import progress to provide progress + let import_progress = progress.clone().with_filter_map(move |x| match x { + ImportProgress::Found { id, name } => { + names.lock().unwrap().insert(id, name); + None + } + ImportProgress::Size { id, size } => { + let name = names.lock().unwrap().remove(&id)?; + Some(AddProgress::Found { id, name, size }) + } + ImportProgress::OutboardProgress { id, offset } => { + Some(AddProgress::Progress { id, offset }) + } + ImportProgress::OutboardDone { hash, id } => Some(AddProgress::Done { hash, id }), + _ => None, + }); + let AddPathRequest { + wrap, + path: root, + in_place, + tag, + } = msg; + // Check that the path is absolute and exists. + anyhow::ensure!(root.is_absolute(), "path must be absolute"); + anyhow::ensure!( + root.exists(), + "trying to add missing path: {}", + root.display() + ); + + let import_mode = match in_place { + true => ImportMode::TryReference, + false => ImportMode::Copy, + }; + + let create_collection = match wrap { + WrapOption::Wrap { .. 
} => true, + WrapOption::NoWrap => root.is_dir(), + }; + + let temp_tag = if create_collection { + // import all files below root recursively + let data_sources = crate::util::fs::scan_path(root, wrap)?; + let blobs = self; + + const IO_PARALLELISM: usize = 4; + let result: Vec<_> = futures_lite::stream::iter(data_sources) + .map(|source| { + let import_progress = import_progress.clone(); + let blobs = blobs.clone(); + async move { + let name = source.name().to_string(); + let (tag, size) = blobs + .store() + .import_file( + source.path().to_owned(), + import_mode, + BlobFormat::Raw, + import_progress, + ) + .await?; + let hash = *tag.hash(); + io::Result::Ok((name, hash, size, tag)) + } + }) + .buffered_ordered(IO_PARALLELISM) + .try_collect() + .await?; + + // create a collection + let (collection, _child_tags): (Collection, Vec<_>) = result + .into_iter() + .map(|(name, hash, _, tag)| ((name, hash), tag)) + .unzip(); + + collection.store(blobs.store()).await? + } else { + // import a single file + let (tag, _size) = blobs + .store() + .import_file(root, import_mode, BlobFormat::Raw, import_progress) + .await?; + tag + }; + + let hash_and_format = temp_tag.inner(); + let HashAndFormat { hash, format } = *hash_and_format; + let tag = match tag { + SetTagOption::Named(tag) => { + blobs + .store() + .set_tag(tag.clone(), Some(*hash_and_format)) + .await?; + tag + } + SetTagOption::Auto => blobs.store().create_tag(*hash_and_format).await?, + }; + progress + .send(AddProgress::AllDone { + hash, + format, + tag: tag.clone(), + }) + .await?; + Ok(()) + } + + async fn batch_create_temp_tag( + self: Arc, + msg: BatchCreateTempTagRequest, + ) -> RpcResult<()> { + let blobs = self; + let tag = blobs.store().temp_tag(msg.content); + blobs.batches().await.store(msg.batch, tag); + Ok(()) + } + + fn batch_add_stream( + self: Arc, + msg: BatchAddStreamRequest, + stream: impl Stream + Send + Unpin + 'static, + ) -> impl Stream { + let (tx, rx) = async_channel::bounded(32); + let this = self.clone(); + + self.rt().spawn_detached(|| async move { + if let Err(err) = this.batch_add_stream0(msg, stream, tx.clone()).await { + tx.send(BatchAddStreamResponse::Abort(RpcError::new(&*err))) + .await + .ok(); + } + }); + rx + } + + fn batch_add_from_path( + self: Arc, + msg: BatchAddPathRequest, + ) -> impl Stream { + // provide a little buffer so that we don't slow down the sender + let (tx, rx) = async_channel::bounded(32); + let tx2 = tx.clone(); + let this = self.clone(); + self.rt().spawn_detached(|| async move { + if let Err(e) = this.batch_add_from_path0(msg, tx).await { + tx2.send(BatchAddPathProgress::Abort(RpcError::new(&*e))) + .await + .ok(); + } + }); + rx.map(BatchAddPathResponse) + } + + async fn batch_add_stream0( + self: Arc, + msg: BatchAddStreamRequest, + stream: impl Stream + Send + Unpin + 'static, + progress: async_channel::Sender, + ) -> anyhow::Result<()> { + let blobs = self; + let progress = AsyncChannelProgressSender::new(progress); + + let stream = stream.map(|item| match item { + BatchAddStreamUpdate::Chunk(chunk) => Ok(chunk), + BatchAddStreamUpdate::Abort => { + Err(io::Error::new(io::ErrorKind::Interrupted, "Remote abort")) + } + }); + + let import_progress = progress.clone().with_filter_map(move |x| match x { + ImportProgress::OutboardProgress { offset, .. 
} => { + Some(BatchAddStreamResponse::OutboardProgress { offset }) + } + _ => None, + }); + let (temp_tag, _len) = blobs + .store() + .import_stream(stream, msg.format, import_progress) + .await?; + let hash = temp_tag.inner().hash; + blobs.batches().await.store(msg.batch, temp_tag); + progress + .send(BatchAddStreamResponse::Result { hash }) + .await?; + Ok(()) + } + + async fn batch_add_from_path0( + self: Arc, + msg: BatchAddPathRequest, + progress: async_channel::Sender, + ) -> anyhow::Result<()> { + let progress = AsyncChannelProgressSender::new(progress); + // convert import progress to provide progress + let import_progress = progress.clone().with_filter_map(move |x| match x { + ImportProgress::Size { size, .. } => Some(BatchAddPathProgress::Found { size }), + ImportProgress::OutboardProgress { offset, .. } => { + Some(BatchAddPathProgress::Progress { offset }) + } + ImportProgress::OutboardDone { hash, .. } => Some(BatchAddPathProgress::Done { hash }), + _ => None, + }); + let BatchAddPathRequest { + path: root, + import_mode, + format, + batch, + } = msg; + // Check that the path is absolute and exists. + anyhow::ensure!(root.is_absolute(), "path must be absolute"); + anyhow::ensure!( + root.exists(), + "trying to add missing path: {}", + root.display() + ); + let blobs = self; + let (tag, _) = blobs + .store() + .import_file(root, import_mode, format, import_progress) + .await?; + let hash = *tag.hash(); + blobs.batches().await.store(batch, tag); + + progress.send(BatchAddPathProgress::Done { hash }).await?; + Ok(()) + } + + fn blob_add_stream( + self: Arc, + msg: AddStreamRequest, + stream: impl Stream + Send + Unpin + 'static, + ) -> impl Stream { + let (tx, rx) = async_channel::bounded(32); + let this = self.clone(); + + self.rt().spawn_detached(|| async move { + if let Err(err) = this.blob_add_stream0(msg, stream, tx.clone()).await { + tx.send(AddProgress::Abort(RpcError::new(&*err))).await.ok(); + } + }); + + rx.map(AddStreamResponse) + } + + async fn blob_add_stream0( + self: Arc, + msg: AddStreamRequest, + stream: impl Stream + Send + Unpin + 'static, + progress: async_channel::Sender, + ) -> anyhow::Result<()> { + let progress = AsyncChannelProgressSender::new(progress); + + let stream = stream.map(|item| match item { + AddStreamUpdate::Chunk(chunk) => Ok(chunk), + AddStreamUpdate::Abort => { + Err(io::Error::new(io::ErrorKind::Interrupted, "Remote abort")) + } + }); + + let name_cache = Arc::new(Mutex::new(None)); + let import_progress = progress.clone().with_filter_map(move |x| match x { + ImportProgress::Found { id: _, name } => { + let _ = name_cache.lock().unwrap().insert(name); + None + } + ImportProgress::Size { id, size } => { + let name = name_cache.lock().unwrap().take()?; + Some(AddProgress::Found { id, name, size }) + } + ImportProgress::OutboardProgress { id, offset } => { + Some(AddProgress::Progress { id, offset }) + } + ImportProgress::OutboardDone { hash, id } => Some(AddProgress::Done { hash, id }), + _ => None, + }); + let blobs = self; + let (temp_tag, _len) = blobs + .store() + .import_stream(stream, BlobFormat::Raw, import_progress) + .await?; + let hash_and_format = *temp_tag.inner(); + let HashAndFormat { hash, format } = hash_and_format; + let tag = match msg.tag { + SetTagOption::Named(tag) => { + blobs + .store() + .set_tag(tag.clone(), Some(hash_and_format)) + .await?; + tag + } + SetTagOption::Auto => blobs.store().create_tag(hash_and_format).await?, + }; + progress + .send(AddProgress::AllDone { hash, tag, format }) + .await?; + Ok(()) + } + 
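+    // `blob_read_at` below streams the requested byte range back in chunks of at
+    // most `RPC_BLOB_GET_CHUNK_SIZE` (64 KiB): `read_loop` computes
+    // `num_chunks = ceil(len / max_chunk_size)`, the final chunk carries the
+    // remainder, and chunks are sent over a channel of capacity
+    // `RPC_BLOB_GET_CHANNEL_CAP`.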
+ fn blob_read_at( + self: Arc, + req: ReadAtRequest, + ) -> impl Stream> + Send + 'static { + let (tx, rx) = async_channel::bounded(RPC_BLOB_GET_CHANNEL_CAP); + let db = self.store().clone(); + self.rt().spawn_detached(move || async move { + if let Err(err) = read_loop(req, db, tx.clone(), RPC_BLOB_GET_CHUNK_SIZE).await { + tx.send(RpcResult::Err(RpcError::new(&*err))).await.ok(); + } + }); + + async fn read_loop( + req: ReadAtRequest, + db: D, + tx: async_channel::Sender>, + max_chunk_size: usize, + ) -> anyhow::Result<()> { + let entry = db.get(&req.hash).await?; + let entry = entry.ok_or_else(|| anyhow!("Blob not found"))?; + let size = entry.size(); + + anyhow::ensure!( + req.offset <= size.value(), + "requested offset is out of range: {} > {:?}", + req.offset, + size + ); + + let len: usize = req + .len + .as_result_len(size.value() - req.offset) + .try_into()?; + + anyhow::ensure!( + req.offset + len as u64 <= size.value(), + "requested range is out of bounds: offset: {}, len: {} > {:?}", + req.offset, + len, + size + ); + + tx.send(Ok(ReadAtResponse::Entry { + size, + is_complete: entry.is_complete(), + })) + .await?; + let mut reader = entry.data_reader().await?; + + let (num_chunks, chunk_size) = if len <= max_chunk_size { + (1, len) + } else { + let num_chunks = len / max_chunk_size + (len % max_chunk_size != 0) as usize; + (num_chunks, max_chunk_size) + }; + + let mut read = 0u64; + for i in 0..num_chunks { + let chunk_size = if i == num_chunks - 1 { + // last chunk might be smaller + len - read as usize + } else { + chunk_size + }; + let chunk = reader.read_at(req.offset + read, chunk_size).await?; + let chunk_len = chunk.len(); + if !chunk.is_empty() { + tx.send(Ok(ReadAtResponse::Data { chunk })).await?; + } + if chunk_len < chunk_size { + break; + } else { + read += chunk_len as u64; + } + } + Ok(()) + } + + rx + } + + fn batch_create( + self: Arc, + _: BatchCreateRequest, + mut updates: impl Stream + Send + Unpin + 'static, + ) -> impl Stream { + let blobs = self; + async move { + let batch = blobs.batches().await.create(); + tokio::spawn(async move { + while let Some(item) = updates.next().await { + match item { + BatchUpdate::Drop(content) => { + // this can not fail, since we keep the batch alive. + // therefore it is safe to ignore the result. + let _ = blobs.batches().await.remove_one(batch, &content); + } + BatchUpdate::Ping => {} + } + } + blobs.batches().await.remove(batch); + }); + BatchCreateResponse::Id(batch) + } + .into_stream() + } + + async fn create_collection( + self: Arc, + req: CreateCollectionRequest, + ) -> RpcResult { + let CreateCollectionRequest { + collection, + tag, + tags_to_delete, + } = req; + + let blobs = self; + + let temp_tag = collection + .store(blobs.store()) + .await + .map_err(|e| RpcError::new(&*e))?; + let hash_and_format = temp_tag.inner(); + let HashAndFormat { hash, .. } = *hash_and_format; + let tag = match tag { + SetTagOption::Named(tag) => { + blobs + .store() + .set_tag(tag.clone(), Some(*hash_and_format)) + .await + .map_err(|e| RpcError::new(&e))?; + tag + } + SetTagOption::Auto => blobs + .store() + .create_tag(*hash_and_format) + .await + .map_err(|e| RpcError::new(&e))?, + }; + + for tag in tags_to_delete { + blobs + .store() + .set_tag(tag, None) + .await + .map_err(|e| RpcError::new(&e))?; + } + + Ok(CreateCollectionResponse { hash, tag }) + } +} diff --git a/src/rpc/client.rs b/src/rpc/client.rs new file mode 100644 index 000000000..4b11fdc19 --- /dev/null +++ b/src/rpc/client.rs @@ -0,0 +1,20 @@ +//! 
Iroh blobs and tags client +use anyhow::Result; +use futures_util::{Stream, StreamExt}; + +pub mod blobs; +pub mod tags; + +fn flatten( + s: impl Stream, E2>>, +) -> impl Stream> +where + E1: std::error::Error + Send + Sync + 'static, + E2: std::error::Error + Send + Sync + 'static, +{ + s.map(|res| match res { + Ok(Ok(res)) => Ok(res), + Ok(Err(err)) => Err(err.into()), + Err(err) => Err(err.into()), + }) +} diff --git a/src/rpc/client/blobs.rs b/src/rpc/client/blobs.rs new file mode 100644 index 000000000..1f7ce7646 --- /dev/null +++ b/src/rpc/client/blobs.rs @@ -0,0 +1,1795 @@ +//! API for blobs management. +//! +//! The main entry point is the [`Client`]. +//! +//! ## Interacting with the local blob store +//! +//! ### Importing data +//! +//! There are several ways to import data into the local blob store: +//! +//! - [`add_bytes`](Client::add_bytes) +//! imports in memory data. +//! - [`add_stream`](Client::add_stream) +//! imports data from a stream of bytes. +//! - [`add_reader`](Client::add_reader) +//! imports data from an [async reader](tokio::io::AsyncRead). +//! - [`add_from_path`](Client::add_from_path) +//! imports data from a file. +//! +//! The last method imports data from a file on the local filesystem. +//! This is the most efficient way to import large amounts of data. +//! +//! ### Exporting data +//! +//! There are several ways to export data from the local blob store: +//! +//! - [`read_to_bytes`](Client::read_to_bytes) reads data into memory. +//! - [`read`](Client::read) creates a [reader](Reader) to read data from. +//! - [`export`](Client::export) eports data to a file on the local filesystem. +//! +//! ## Interacting with remote nodes +//! +//! - [`download`](Client::download) downloads data from a remote node. +//! remote node. +//! +//! ## Interacting with the blob store itself +//! +//! These are more advanced operations that are usually not needed in normal +//! operation. +//! +//! - [`consistency_check`](Client::consistency_check) checks the internal +//! consistency of the local blob store. +//! - [`validate`](Client::validate) validates the locally stored data against +//! their BLAKE3 hashes. +//! - [`delete_blob`](Client::delete_blob) deletes a blob from the local store. +//! +//! ### Batch operations +//! +//! For complex update operations, there is a [`batch`](Client::batch) API that +//! allows you to add multiple blobs in a single logical batch. +//! +//! Operations in a batch return [temporary tags](crate::util::TempTag) that +//! protect the added data from garbage collection as long as the batch is +//! alive. +//! +//! To store the data permanently, a temp tag needs to be upgraded to a +//! permanent tag using [`persist`](crate::rpc::client::blobs::Batch::persist) or +//! [`persist_to`](crate::rpc::client::blobs::Batch::persist_to). 
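+//!
+//! ## Example
+//!
+//! A minimal sketch of the typical flow. It assumes `rpc` is an already
+//! connected `quic_rpc::RpcClient` for the blobs `RpcService`; the variable
+//! names and the payload are illustrative only.
+//!
+//! ```ignore
+//! let client = Client::new(rpc);
+//! // Import in-memory data; the outcome carries the hash and the created tag.
+//! let outcome = client.add_bytes(b"hello world".to_vec()).await?;
+//! // Read the blob back into memory.
+//! let bytes = client.read_to_bytes(outcome.hash).await?;
+//! assert_eq!(&bytes[..], b"hello world");
+//! ```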
+use std::{ + future::Future, + io, + path::PathBuf, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +use anyhow::{anyhow, Context as _, Result}; +use bytes::Bytes; +use futures_lite::{Stream, StreamExt}; +use futures_util::SinkExt; +use genawaiter::sync::{Co, Gen}; +use iroh_net::NodeAddr; +use portable_atomic::{AtomicU64, Ordering}; +use quic_rpc::{ + client::{BoxStreamSync, BoxedConnector}, + Connector, RpcClient, +}; +use serde::{Deserialize, Serialize}; +use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf}; +use tokio_util::io::{ReaderStream, StreamReader}; +use tracing::warn; + +pub use crate::net_protocol::DownloadMode; +use crate::{ + export::ExportProgress as BytesExportProgress, + format::collection::{Collection, SimpleStore}, + get::db::DownloadProgress as BytesDownloadProgress, + net_protocol::BlobDownloadRequest, + rpc::proto::RpcService, + store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress}, + util::SetTagOption, + BlobFormat, Hash, Tag, +}; + +mod batch; +pub use batch::{AddDirOpts, AddFileOpts, AddReaderOpts, Batch}; + +use super::{flatten, tags}; +use crate::rpc::proto::blobs::{ + AddPathRequest, AddStreamRequest, AddStreamUpdate, BatchCreateRequest, BatchCreateResponse, + BlobStatusRequest, ConsistencyCheckRequest, CreateCollectionRequest, CreateCollectionResponse, + DeleteRequest, ExportRequest, ListIncompleteRequest, ListRequest, ReadAtRequest, + ReadAtResponse, ValidateRequest, +}; + +/// Iroh blobs client. +#[derive(Debug, Clone)] +pub struct Client> { + pub(super) rpc: RpcClient, +} + +impl Client +where + C: Connector, +{ + /// Create a new client + pub fn new(rpc: RpcClient) -> Self { + Self { rpc } + } + + /// Check if a blob is completely stored on the node. + /// + /// Note that this will return false for blobs that are partially stored on + /// the node. + pub async fn status(&self, hash: Hash) -> Result { + let status = self.rpc.rpc(BlobStatusRequest { hash }).await??; + Ok(status.0) + } + + /// Check if a blob is completely stored on the node. + /// + /// This is just a convenience wrapper around `status` that returns a boolean. + pub async fn has(&self, hash: Hash) -> Result { + match self.status(hash).await { + Ok(BlobStatus::Complete { .. }) => Ok(true), + Ok(_) => Ok(false), + Err(err) => Err(err), + } + } + + /// Create a new batch for adding data. + /// + /// A batch is a context in which temp tags are created and data is added to the node. Temp tags + /// are automatically deleted when the batch is dropped, leading to the data being garbage collected + /// unless a permanent tag is created for it. + pub async fn batch(&self) -> Result> { + let (updates, mut stream) = self.rpc.bidi(BatchCreateRequest).await?; + let BatchCreateResponse::Id(batch) = stream.next().await.context("expected scope id")??; + let rpc = self.rpc.clone(); + Ok(Batch::new(batch, rpc, updates, 1024)) + } + + /// Stream the contents of a a single blob. + /// + /// Returns a [`Reader`], which can report the size of the blob before reading it. + pub async fn read(&self, hash: Hash) -> Result { + Reader::from_rpc_read(&self.rpc, hash).await + } + + /// Read offset + len from a single blob. + /// + /// If `len` is `None` it will read the full blob. + pub async fn read_at(&self, hash: Hash, offset: u64, len: ReadAtLen) -> Result { + Reader::from_rpc_read_at(&self.rpc, hash, offset, len).await + } + + /// Read all bytes of single blob. + /// + /// This allocates a buffer for the full blob. 
Use only if you know that the blob you're + /// reading is small. If not sure, use [`Self::read`] and check the size with + /// [`Reader::size`] before calling [`Reader::read_to_bytes`]. + pub async fn read_to_bytes(&self, hash: Hash) -> Result { + Reader::from_rpc_read(&self.rpc, hash) + .await? + .read_to_bytes() + .await + } + + /// Read all bytes of single blob at `offset` for length `len`. + /// + /// This allocates a buffer for the full length. + pub async fn read_at_to_bytes(&self, hash: Hash, offset: u64, len: ReadAtLen) -> Result { + Reader::from_rpc_read_at(&self.rpc, hash, offset, len) + .await? + .read_to_bytes() + .await + } + + /// Import a blob from a filesystem path. + /// + /// `path` should be an absolute path valid for the file system on which + /// the node runs. + /// If `in_place` is true, Iroh will assume that the data will not change and will share it in + /// place without copying to the Iroh data directory. + pub async fn add_from_path( + &self, + path: PathBuf, + in_place: bool, + tag: SetTagOption, + wrap: WrapOption, + ) -> Result { + let stream = self + .rpc + .server_streaming(AddPathRequest { + path, + in_place, + tag, + wrap, + }) + .await?; + Ok(AddProgress::new(stream)) + } + + /// Create a collection from already existing blobs. + /// + /// For automatically clearing the tags for the passed in blobs you can set + /// `tags_to_delete` to those tags, and they will be deleted once the collection is created. + pub async fn create_collection( + &self, + collection: Collection, + tag: SetTagOption, + tags_to_delete: Vec, + ) -> anyhow::Result<(Hash, Tag)> { + let CreateCollectionResponse { hash, tag } = self + .rpc + .rpc(CreateCollectionRequest { + collection, + tag, + tags_to_delete, + }) + .await??; + Ok((hash, tag)) + } + + /// Write a blob by passing an async reader. + pub async fn add_reader( + &self, + reader: impl AsyncRead + Unpin + Send + 'static, + tag: SetTagOption, + ) -> anyhow::Result { + const CAP: usize = 1024 * 64; // send 64KB per request by default + let input = ReaderStream::with_capacity(reader, CAP); + self.add_stream(input, tag).await + } + + /// Write a blob by passing a stream of bytes. + pub async fn add_stream( + &self, + input: impl Stream> + Send + Unpin + 'static, + tag: SetTagOption, + ) -> anyhow::Result { + let (mut sink, progress) = self.rpc.bidi(AddStreamRequest { tag }).await?; + let mut input = input.map(|chunk| match chunk { + Ok(chunk) => Ok(AddStreamUpdate::Chunk(chunk)), + Err(err) => { + warn!("Abort send, reason: failed to read from source stream: {err:?}"); + Ok(AddStreamUpdate::Abort) + } + }); + tokio::spawn(async move { + // TODO: Is it important to catch this error? It should also result in an error on the + // response stream. If we deem it important, we could one-shot send it into the + // BlobAddProgress and return from there. Not sure. + if let Err(err) = sink.send_all(&mut input).await { + warn!("Failed to send input stream to remote: {err:?}"); + } + }); + + Ok(AddProgress::new(progress)) + } + + /// Write a blob by passing bytes. + pub async fn add_bytes(&self, bytes: impl Into) -> anyhow::Result { + let input = futures_lite::stream::once(Ok(bytes.into())); + self.add_stream(input, SetTagOption::Auto).await?.await + } + + /// Write a blob by passing bytes, setting an explicit tag name. 
+ pub async fn add_bytes_named( + &self, + bytes: impl Into, + name: impl Into, + ) -> anyhow::Result { + let input = futures_lite::stream::once(Ok(bytes.into())); + self.add_stream(input, SetTagOption::Named(name.into())) + .await? + .await + } + + /// Validate hashes on the running node. + /// + /// If `repair` is true, repair the store by removing invalid data. + pub async fn validate( + &self, + repair: bool, + ) -> Result>> { + let stream = self + .rpc + .server_streaming(ValidateRequest { repair }) + .await?; + Ok(stream.map(|res| res.map_err(anyhow::Error::from))) + } + + /// Validate hashes on the running node. + /// + /// If `repair` is true, repair the store by removing invalid data. + pub async fn consistency_check( + &self, + repair: bool, + ) -> Result>> { + let stream = self + .rpc + .server_streaming(ConsistencyCheckRequest { repair }) + .await?; + Ok(stream.map(|r| r.map_err(anyhow::Error::from))) + } + + /// Download a blob from another node and add it to the local database. + pub async fn download(&self, hash: Hash, node: NodeAddr) -> Result { + self.download_with_opts( + hash, + DownloadOptions { + format: BlobFormat::Raw, + nodes: vec![node], + tag: SetTagOption::Auto, + mode: DownloadMode::Queued, + }, + ) + .await + } + + /// Download a hash sequence from another node and add it to the local database. + pub async fn download_hash_seq(&self, hash: Hash, node: NodeAddr) -> Result { + self.download_with_opts( + hash, + DownloadOptions { + format: BlobFormat::HashSeq, + nodes: vec![node], + tag: SetTagOption::Auto, + mode: DownloadMode::Queued, + }, + ) + .await + } + + /// Download a blob, with additional options. + pub async fn download_with_opts( + &self, + hash: Hash, + opts: DownloadOptions, + ) -> Result { + let DownloadOptions { + format, + nodes, + tag, + mode, + } = opts; + let stream = self + .rpc + .server_streaming(BlobDownloadRequest { + hash, + format, + nodes, + tag, + mode, + }) + .await?; + Ok(DownloadProgress::new( + stream.map(|res| res.map_err(anyhow::Error::from)), + )) + } + + /// Export a blob from the internal blob store to a path on the node's filesystem. + /// + /// `destination` should be an writeable, absolute path on the local node's filesystem. + /// + /// If `format` is set to [`ExportFormat::Collection`], and the `hash` refers to a collection, + /// all children of the collection will be exported. See [`ExportFormat`] for details. + /// + /// The `mode` argument defines if the blob should be copied to the target location or moved out of + /// the internal store into the target location. See [`ExportMode`] for details. + pub async fn export( + &self, + hash: Hash, + destination: PathBuf, + format: ExportFormat, + mode: ExportMode, + ) -> Result { + let req = ExportRequest { + hash, + path: destination, + format, + mode, + }; + let stream = self.rpc.server_streaming(req).await?; + Ok(ExportProgress::new( + stream.map(|r| r.map_err(anyhow::Error::from)), + )) + } + + /// List all complete blobs. + pub async fn list(&self) -> Result>> { + let stream = self.rpc.server_streaming(ListRequest).await?; + Ok(flatten(stream)) + } + + /// List all incomplete (partial) blobs. + pub async fn list_incomplete(&self) -> Result>> { + let stream = self.rpc.server_streaming(ListIncompleteRequest).await?; + Ok(flatten(stream)) + } + + /// Read the content of a collection. + pub async fn get_collection(&self, hash: Hash) -> Result { + Collection::load(hash, self).await + } + + /// List all collections. 
+ pub fn list_collections(&self) -> Result>> { + let this = self.clone(); + Ok(Gen::new(|co| async move { + if let Err(cause) = this.list_collections_impl(&co).await { + co.yield_(Err(cause)).await; + } + })) + } + + async fn list_collections_impl(&self, co: &Co>) -> Result<()> { + let tags = self.tags_client(); + let mut tags = tags.list_hash_seq().await?; + while let Some(tag) = tags.next().await { + let tag = tag?; + if let Ok(collection) = self.get_collection(tag.hash).await { + let info = CollectionInfo { + tag: tag.name, + hash: tag.hash, + total_blobs_count: Some(collection.len() as u64 + 1), + total_blobs_size: Some(0), + }; + co.yield_(Ok(info)).await; + } + } + Ok(()) + } + + /// Delete a blob. + /// + /// **Warning**: this operation deletes the blob from the local store even + /// if it is tagged. You should usually not do this manually, but rely on the + /// node to remove data that is not tagged. + pub async fn delete_blob(&self, hash: Hash) -> Result<()> { + self.rpc.rpc(DeleteRequest { hash }).await??; + Ok(()) + } + + fn tags_client(&self) -> tags::Client { + tags::Client::new(self.rpc.clone()) + } +} + +impl SimpleStore for Client +where + C: Connector, +{ + async fn load(&self, hash: Hash) -> anyhow::Result { + self.read_to_bytes(hash).await + } +} + +/// Defines the way to read bytes. +#[derive(Debug, Serialize, Deserialize, Default, Clone, Copy)] +pub enum ReadAtLen { + /// Reads all available bytes. + #[default] + All, + /// Reads exactly this many bytes, erroring out on larger or smaller. + Exact(u64), + /// Reads at most this many bytes. + AtMost(u64), +} + +impl ReadAtLen { + /// todo make private again + pub fn as_result_len(&self, size_remaining: u64) -> u64 { + match self { + ReadAtLen::All => size_remaining, + ReadAtLen::Exact(len) => *len, + ReadAtLen::AtMost(len) => std::cmp::min(*len, size_remaining), + } + } +} + +/// Whether to wrap the added data in a collection. +#[derive(Debug, Serialize, Deserialize, Default, Clone)] +pub enum WrapOption { + /// Do not wrap the file or directory. + #[default] + NoWrap, + /// Wrap the file or directory in a collection. + Wrap { + /// Override the filename in the wrapping collection. + name: Option, + }, +} + +/// Status information about a blob. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum BlobStatus { + /// The blob is not stored at all. + NotFound, + /// The blob is only stored partially. + Partial { + /// The size of the currently stored partial blob. + size: BaoBlobSize, + }, + /// The blob is stored completely. + Complete { + /// The size of the blob. + size: u64, + }, +} + +/// Outcome of a blob add operation. +#[derive(Debug, Clone)] +pub struct AddOutcome { + /// The hash of the blob + pub hash: Hash, + /// The format the blob + pub format: BlobFormat, + /// The size of the blob + pub size: u64, + /// The tag of the blob + pub tag: Tag, +} + +/// Information about a stored collection. +#[derive(Debug, Serialize, Deserialize)] +pub struct CollectionInfo { + /// Tag of the collection + pub tag: Tag, + + /// Hash of the collection + pub hash: Hash, + /// Number of children in the collection + /// + /// This is an optional field, because the data is not always available. + pub total_blobs_count: Option, + /// Total size of the raw data referred to by all links + /// + /// This is an optional field, because the data is not always available. + pub total_blobs_size: Option, +} + +/// Information about a complete blob. 
+#[derive(Debug, Serialize, Deserialize)] +pub struct BlobInfo { + /// Location of the blob + pub path: String, + /// The hash of the blob + pub hash: Hash, + /// The size of the blob + pub size: u64, +} + +/// Information about an incomplete blob. +#[derive(Debug, Serialize, Deserialize)] +pub struct IncompleteBlobInfo { + /// The size we got + pub size: u64, + /// The size we expect + pub expected_size: u64, + /// The hash of the blob + pub hash: Hash, +} + +/// Progress stream for blob add operations. +#[derive(derive_more::Debug)] +pub struct AddProgress { + #[debug(skip)] + stream: + Pin> + Send + Unpin + 'static>>, + current_total_size: Arc, +} + +impl AddProgress { + fn new( + stream: (impl Stream< + Item = Result, impl Into>, + > + Send + + Unpin + + 'static), + ) -> Self { + let current_total_size = Arc::new(AtomicU64::new(0)); + let total_size = current_total_size.clone(); + let stream = stream.map(move |item| match item { + Ok(item) => { + let item = item.into(); + if let crate::provider::AddProgress::Found { size, .. } = &item { + total_size.fetch_add(*size, Ordering::Relaxed); + } + Ok(item) + } + Err(err) => Err(err.into()), + }); + Self { + stream: Box::pin(stream), + current_total_size, + } + } + /// Finish writing the stream, ignoring all intermediate progress events. + /// + /// Returns a [`AddOutcome`] which contains a tag, format, hash and a size. + /// When importing a single blob, this is the hash and size of that blob. + /// When importing a collection, the hash is the hash of the collection and the size + /// is the total size of all imported blobs (but excluding the size of the collection blob + /// itself). + pub async fn finish(self) -> Result { + self.await + } +} + +impl Stream for AddProgress { + type Item = Result; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.stream).poll_next(cx) + } +} + +impl Future for AddProgress { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + match Pin::new(&mut self.stream).poll_next(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => { + return Poll::Ready(Err(anyhow!("Response stream ended prematurely"))) + } + Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)), + Poll::Ready(Some(Ok(msg))) => match msg { + crate::provider::AddProgress::AllDone { hash, format, tag } => { + let outcome = AddOutcome { + hash, + format, + tag, + size: self.current_total_size.load(Ordering::Relaxed), + }; + return Poll::Ready(Ok(outcome)); + } + crate::provider::AddProgress::Abort(err) => { + return Poll::Ready(Err(err.into())); + } + _ => {} + }, + } + } + } +} + +/// Outcome of a blob download operation. +#[derive(Debug, Clone)] +pub struct DownloadOutcome { + /// The size of the data we already had locally + pub local_size: u64, + /// The size of the data we downloaded from the network + pub downloaded_size: u64, + /// Statistics about the download + pub stats: crate::get::Stats, +} + +/// Progress stream for blob download operations. +#[derive(derive_more::Debug)] +pub struct DownloadProgress { + #[debug(skip)] + stream: Pin> + Send + Unpin + 'static>>, + current_local_size: Arc, + current_network_size: Arc, +} + +impl DownloadProgress { + /// Create a [`DownloadProgress`] that can help you easily poll the [`BytesDownloadProgress`] stream from your download until it is finished or errors. 
+ pub fn new( + stream: (impl Stream, impl Into>> + + Send + + Unpin + + 'static), + ) -> Self { + let current_local_size = Arc::new(AtomicU64::new(0)); + let current_network_size = Arc::new(AtomicU64::new(0)); + + let local_size = current_local_size.clone(); + let network_size = current_network_size.clone(); + + let stream = stream.map(move |item| match item { + Ok(item) => { + let item = item.into(); + match &item { + BytesDownloadProgress::FoundLocal { size, .. } => { + local_size.fetch_add(size.value(), Ordering::Relaxed); + } + BytesDownloadProgress::Found { size, .. } => { + network_size.fetch_add(*size, Ordering::Relaxed); + } + _ => {} + } + + Ok(item) + } + Err(err) => Err(err.into()), + }); + Self { + stream: Box::pin(stream), + current_local_size, + current_network_size, + } + } + + /// Finish writing the stream, ignoring all intermediate progress events. + /// + /// Returns a [`DownloadOutcome`] which contains the size of the content we downloaded and the size of the content we already had locally. + /// When importing a single blob, this is the size of that blob. + /// When importing a collection, this is the total size of all imported blobs (but excluding the size of the collection blob itself). + pub async fn finish(self) -> Result { + self.await + } +} + +impl Stream for DownloadProgress { + type Item = Result; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.stream).poll_next(cx) + } +} + +impl Future for DownloadProgress { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + match Pin::new(&mut self.stream).poll_next(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => { + return Poll::Ready(Err(anyhow!("Response stream ended prematurely"))) + } + Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)), + Poll::Ready(Some(Ok(msg))) => match msg { + BytesDownloadProgress::AllDone(stats) => { + let outcome = DownloadOutcome { + local_size: self.current_local_size.load(Ordering::Relaxed), + downloaded_size: self.current_network_size.load(Ordering::Relaxed), + stats, + }; + return Poll::Ready(Ok(outcome)); + } + BytesDownloadProgress::Abort(err) => { + return Poll::Ready(Err(err.into())); + } + _ => {} + }, + } + } + } +} + +/// Outcome of a blob export operation. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ExportOutcome { + /// The total size of the exported data. + total_size: u64, +} + +/// Progress stream for blob export operations. +#[derive(derive_more::Debug)] +pub struct ExportProgress { + #[debug(skip)] + stream: Pin> + Send + Unpin + 'static>>, + current_total_size: Arc, +} + +impl ExportProgress { + /// Create a [`ExportProgress`] that can help you easily poll the [`BytesExportProgress`] stream from your + /// download until it is finished or errors. + pub fn new( + stream: (impl Stream, impl Into>> + + Send + + Unpin + + 'static), + ) -> Self { + let current_total_size = Arc::new(AtomicU64::new(0)); + let total_size = current_total_size.clone(); + let stream = stream.map(move |item| match item { + Ok(item) => { + let item = item.into(); + if let BytesExportProgress::Found { size, .. } = &item { + let size = size.value(); + total_size.fetch_add(size, Ordering::Relaxed); + } + + Ok(item) + } + Err(err) => Err(err.into()), + }); + Self { + stream: Box::pin(stream), + current_total_size, + } + } + + /// Finish writing the stream, ignoring all intermediate progress events. 
+ /// + /// Returns a [`ExportOutcome`] which contains the size of the content we exported. + pub async fn finish(self) -> Result { + self.await + } +} + +impl Stream for ExportProgress { + type Item = Result; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.stream).poll_next(cx) + } +} + +impl Future for ExportProgress { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + match Pin::new(&mut self.stream).poll_next(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => { + return Poll::Ready(Err(anyhow!("Response stream ended prematurely"))) + } + Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)), + Poll::Ready(Some(Ok(msg))) => match msg { + BytesExportProgress::AllDone => { + let outcome = ExportOutcome { + total_size: self.current_total_size.load(Ordering::Relaxed), + }; + return Poll::Ready(Ok(outcome)); + } + BytesExportProgress::Abort(err) => { + return Poll::Ready(Err(err.into())); + } + _ => {} + }, + } + } + } +} + +/// Data reader for a single blob. +/// +/// Implements [`AsyncRead`]. +#[derive(derive_more::Debug)] +pub struct Reader { + size: u64, + response_size: u64, + is_complete: bool, + #[debug("StreamReader")] + stream: tokio_util::io::StreamReader>, Bytes>, +} + +impl Reader { + fn new( + size: u64, + response_size: u64, + is_complete: bool, + stream: BoxStreamSync<'static, io::Result>, + ) -> Self { + Self { + size, + response_size, + is_complete, + stream: StreamReader::new(stream), + } + } + + /// todo make private again + pub async fn from_rpc_read( + rpc: &RpcClient, + hash: Hash, + ) -> anyhow::Result + where + C: Connector, + { + Self::from_rpc_read_at(rpc, hash, 0, ReadAtLen::All).await + } + + async fn from_rpc_read_at( + rpc: &RpcClient, + hash: Hash, + offset: u64, + len: ReadAtLen, + ) -> anyhow::Result + where + C: Connector, + { + let stream = rpc + .server_streaming(ReadAtRequest { hash, offset, len }) + .await?; + let mut stream = flatten(stream); + + let (size, is_complete) = match stream.next().await { + Some(Ok(ReadAtResponse::Entry { size, is_complete })) => (size, is_complete), + Some(Err(err)) => return Err(err), + Some(Ok(_)) => return Err(anyhow!("Expected header frame, but got data frame")), + None => return Err(anyhow!("Expected header frame, but RPC stream was dropped")), + }; + + let stream = stream.map(|item| match item { + Ok(ReadAtResponse::Data { chunk }) => Ok(chunk), + Ok(_) => Err(io::Error::new(io::ErrorKind::Other, "Expected data frame")), + Err(err) => Err(io::Error::new(io::ErrorKind::Other, format!("{err}"))), + }); + let len = len.as_result_len(size.value() - offset); + Ok(Self::new(size.value(), len, is_complete, Box::pin(stream))) + } + + /// Total size of this blob. + pub fn size(&self) -> u64 { + self.size + } + + /// Whether this blob has been downloaded completely. + /// + /// Returns false for partial blobs for which some chunks are missing. + pub fn is_complete(&self) -> bool { + self.is_complete + } + + /// Read all bytes of the blob. 
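+    ///
+    /// A small sketch of typical use, assuming a `blobs` client and an existing `hash`
+    /// (see the tests below for a full setup):
+    ///
+    /// ```ignore
+    /// let mut reader = blobs.read_at(hash, 0, ReadAtLen::All).await?;
+    /// let bytes = reader.read_to_bytes().await?;
+    /// assert_eq!(bytes.len() as u64, reader.size());
+    /// ```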
+ pub async fn read_to_bytes(&mut self) -> anyhow::Result { + let mut buf = Vec::with_capacity(self.response_size as usize); + self.read_to_end(&mut buf).await?; + Ok(buf.into()) + } +} + +impl AsyncRead for Reader { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut self.stream).poll_read(cx, buf) + } +} + +impl Stream for Reader { + type Item = io::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.stream).get_pin_mut().poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option) { + self.stream.get_ref().size_hint() + } +} + +/// Options to configure a download request. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DownloadOptions { + /// The format of the data to download. + pub format: BlobFormat, + /// Source nodes to download from. + /// + /// If set to more than a single node, they will all be tried. If `mode` is set to + /// [`DownloadMode::Direct`], they will be tried sequentially until a download succeeds. + /// If `mode` is set to [`DownloadMode::Queued`], the nodes may be dialed in parallel, + /// if the concurrency limits permit. + pub nodes: Vec, + /// Optional tag to tag the data with. + pub tag: SetTagOption, + /// Whether to directly start the download or add it to the download queue. + pub mode: DownloadMode, +} + +#[cfg(test)] +mod tests { + use iroh_net::NodeId; + use rand::RngCore; + use testresult::TestResult; + use tokio::{io::AsyncWriteExt, sync::mpsc}; + + use super::*; + use crate::hashseq::HashSeq; + + mod node { + //! An iroh node that just has the blobs transport + use std::{path::Path, sync::Arc}; + + use iroh_net::{NodeAddr, NodeId}; + use quic_rpc::transport::{Connector, Listener}; + use tokio_util::task::AbortOnDropHandle; + + use super::RpcService; + use crate::{ + provider::{CustomEventSender, EventSender}, + rpc::client::{blobs, tags}, + util::local_pool::LocalPool, + }; + + type RpcClient = quic_rpc::RpcClient; + + /// An iroh node that just has the blobs transport + #[derive(Debug)] + pub struct Node { + router: iroh_router::Router, + client: RpcClient, + _local_pool: LocalPool, + _rpc_task: AbortOnDropHandle<()>, + } + + /// An iroh node builder + #[derive(Debug)] + pub struct Builder { + store: S, + events: EventSender, + } + + impl Builder { + /// Sets the event sender + pub fn blobs_events(self, events: impl CustomEventSender) -> Self { + Builder { + store: self.store, + events: events.into(), + } + } + + /// Spawns the node + pub async fn spawn(self) -> anyhow::Result { + let (client, router, rpc_task, _local_pool) = + setup_router(self.store, self.events).await?; + Ok(Node { + router, + client, + _rpc_task: AbortOnDropHandle::new(rpc_task), + _local_pool, + }) + } + } + + impl Node { + /// Creates a new node with memory storage + pub fn memory() -> Builder { + Builder { + store: crate::store::mem::Store::new(), + events: Default::default(), + } + } + + /// Creates a new node with persistent storage + pub async fn persistent( + path: impl AsRef, + ) -> anyhow::Result> { + Ok(Builder { + store: crate::store::fs::Store::load(path).await?, + events: Default::default(), + }) + } + + /// Returns the node id + pub fn node_id(&self) -> NodeId { + self.router.endpoint().node_id() + } + + /// Returns the node address + pub async fn node_addr(&self) -> anyhow::Result { + self.router.endpoint().node_addr().await + } + + /// Shuts down the node + pub async fn shutdown(self) -> anyhow::Result<()> { + 
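+                // Shut down by delegating to the router, which closes the endpoint
+                // and all registered protocol handlers.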
self.router.shutdown().await + } + + /// Returns an in-memory blobs client + pub fn blobs(&self) -> blobs::Client { + blobs::Client::new(self.client.clone()) + } + + /// Returns an in-memory tags client + pub fn tags(&self) -> tags::Client { + tags::Client::new(self.client.clone()) + } + } + + async fn setup_router( + store: S, + events: EventSender, + ) -> anyhow::Result<( + RpcClient, + iroh_router::Router, + tokio::task::JoinHandle<()>, + LocalPool, + )> { + let endpoint = iroh_net::Endpoint::builder().discovery_n0().bind().await?; + let local_pool = LocalPool::single(); + let mut router = iroh_router::Router::builder(endpoint.clone()); + + // Setup blobs + let downloader = crate::downloader::Downloader::new( + store.clone(), + endpoint.clone(), + local_pool.handle().clone(), + ); + let blobs = Arc::new(crate::net_protocol::Blobs::new_with_events( + store.clone(), + local_pool.handle().clone(), + events, + downloader, + endpoint.clone(), + )); + router = router.accept(crate::protocol::ALPN.to_vec(), blobs.clone()); + + // Build the router + let router = router.spawn().await?; + + // Setup RPC + let (internal_rpc, controller) = quic_rpc::transport::flume::channel(32); + let controller = controller.boxed(); + let internal_rpc = internal_rpc.boxed(); + let internal_rpc = quic_rpc::RpcServer::new(internal_rpc); + + let rpc_server_task: tokio::task::JoinHandle<()> = tokio::task::spawn(async move { + loop { + let request = internal_rpc.accept().await; + match request { + Ok(accepting) => { + let blobs = blobs.clone(); + tokio::task::spawn(async move { + let (msg, chan) = accepting.read_first().await.unwrap(); + blobs.handle_rpc_request(msg, chan).await.unwrap(); + }); + } + Err(err) => { + tracing::warn!("rpc error: {:?}", err); + } + } + } + }); + + let client = quic_rpc::RpcClient::new(controller); + + Ok((client, router, rpc_server_task, local_pool)) + } + } + + #[tokio::test] + async fn test_blob_create_collection() -> Result<()> { + let _guard = iroh_test::logging::setup(); + + let node = node::Node::memory().spawn().await?; + + // create temp file + let temp_dir = tempfile::tempdir().context("tempdir")?; + + let in_root = temp_dir.path().join("in"); + tokio::fs::create_dir_all(in_root.clone()) + .await + .context("create dir all")?; + + let mut paths = Vec::new(); + for i in 0..5 { + let path = in_root.join(format!("test-{i}")); + let size = 100; + let mut buf = vec![0u8; size]; + rand::thread_rng().fill_bytes(&mut buf); + let mut file = tokio::fs::File::create(path.clone()) + .await + .context("create file")?; + file.write_all(&buf.clone()).await.context("write_all")?; + file.flush().await.context("flush")?; + paths.push(path); + } + + let blobs = node.blobs(); + + let mut collection = Collection::default(); + let mut tags = Vec::new(); + // import files + for path in &paths { + let import_outcome = blobs + .add_from_path( + path.to_path_buf(), + false, + SetTagOption::Auto, + WrapOption::NoWrap, + ) + .await + .context("import file")? + .finish() + .await + .context("import finish")?; + + collection.push( + path.file_name().unwrap().to_str().unwrap().to_string(), + import_outcome.hash, + ); + tags.push(import_outcome.tag); + } + + let (hash, tag) = blobs + .create_collection(collection, SetTagOption::Auto, tags) + .await?; + + let collections: Vec<_> = blobs.list_collections()?.try_collect().await?; + + assert_eq!(collections.len(), 1); + { + let CollectionInfo { + tag, + hash, + total_blobs_count, + .. 
+ } = &collections[0]; + assert_eq!(tag, tag); + assert_eq!(hash, hash); + // 5 blobs + 1 meta + assert_eq!(total_blobs_count, &Some(5 + 1)); + } + + // check that "temp" tags have been deleted + let tags: Vec<_> = node.tags().list().await?.try_collect().await?; + assert_eq!(tags.len(), 1); + assert_eq!(tags[0].hash, hash); + assert_eq!(tags[0].name, tag); + assert_eq!(tags[0].format, BlobFormat::HashSeq); + + Ok(()) + } + + #[tokio::test] + async fn test_blob_read_at() -> Result<()> { + // let _guard = iroh_test::logging::setup(); + + let node = node::Node::memory().spawn().await?; + + // create temp file + let temp_dir = tempfile::tempdir().context("tempdir")?; + + let in_root = temp_dir.path().join("in"); + tokio::fs::create_dir_all(in_root.clone()) + .await + .context("create dir all")?; + + let path = in_root.join("test-blob"); + let size = 1024 * 128; + let buf: Vec = (0..size).map(|i| i as u8).collect(); + let mut file = tokio::fs::File::create(path.clone()) + .await + .context("create file")?; + file.write_all(&buf.clone()).await.context("write_all")?; + file.flush().await.context("flush")?; + + let blobs = node.blobs(); + + let import_outcome = blobs + .add_from_path( + path.to_path_buf(), + false, + SetTagOption::Auto, + WrapOption::NoWrap, + ) + .await + .context("import file")? + .finish() + .await + .context("import finish")?; + + let hash = import_outcome.hash; + + // Read everything + let res = blobs.read_to_bytes(hash).await?; + assert_eq!(&res, &buf[..]); + + // Read at smaller than blob_get_chunk_size + let res = blobs + .read_at_to_bytes(hash, 0, ReadAtLen::Exact(100)) + .await?; + assert_eq!(res.len(), 100); + assert_eq!(&res[..], &buf[0..100]); + + let res = blobs + .read_at_to_bytes(hash, 20, ReadAtLen::Exact(120)) + .await?; + assert_eq!(res.len(), 120); + assert_eq!(&res[..], &buf[20..140]); + + // Read at equal to blob_get_chunk_size + let res = blobs + .read_at_to_bytes(hash, 0, ReadAtLen::Exact(1024 * 64)) + .await?; + assert_eq!(res.len(), 1024 * 64); + assert_eq!(&res[..], &buf[0..1024 * 64]); + + let res = blobs + .read_at_to_bytes(hash, 20, ReadAtLen::Exact(1024 * 64)) + .await?; + assert_eq!(res.len(), 1024 * 64); + assert_eq!(&res[..], &buf[20..(20 + 1024 * 64)]); + + // Read at larger than blob_get_chunk_size + let res = blobs + .read_at_to_bytes(hash, 0, ReadAtLen::Exact(10 + 1024 * 64)) + .await?; + assert_eq!(res.len(), 10 + 1024 * 64); + assert_eq!(&res[..], &buf[0..(10 + 1024 * 64)]); + + let res = blobs + .read_at_to_bytes(hash, 20, ReadAtLen::Exact(10 + 1024 * 64)) + .await?; + assert_eq!(res.len(), 10 + 1024 * 64); + assert_eq!(&res[..], &buf[20..(20 + 10 + 1024 * 64)]); + + // full length + let res = blobs.read_at_to_bytes(hash, 20, ReadAtLen::All).await?; + assert_eq!(res.len(), 1024 * 128 - 20); + assert_eq!(&res[..], &buf[20..]); + + // size should be total + let reader = blobs.read_at(hash, 0, ReadAtLen::Exact(20)).await?; + assert_eq!(reader.size(), 1024 * 128); + assert_eq!(reader.response_size, 20); + + // last chunk - exact + let res = blobs + .read_at_to_bytes(hash, 1024 * 127, ReadAtLen::Exact(1024)) + .await?; + assert_eq!(res.len(), 1024); + assert_eq!(res, &buf[1024 * 127..]); + + // last chunk - open + let res = blobs + .read_at_to_bytes(hash, 1024 * 127, ReadAtLen::All) + .await?; + assert_eq!(res.len(), 1024); + assert_eq!(res, &buf[1024 * 127..]); + + // last chunk - larger + let mut res = blobs + .read_at(hash, 1024 * 127, ReadAtLen::AtMost(2048)) + .await?; + assert_eq!(res.size, 1024 * 128); + assert_eq!(res.response_size, 
1024); + let res = res.read_to_bytes().await?; + assert_eq!(res.len(), 1024); + assert_eq!(res, &buf[1024 * 127..]); + + // out of bounds - too long + let res = blobs + .read_at(hash, 0, ReadAtLen::Exact(1024 * 128 + 1)) + .await; + let err = res.unwrap_err(); + assert!(err.to_string().contains("out of bound")); + + // out of bounds - offset larger than blob + let res = blobs.read_at(hash, 1024 * 128 + 1, ReadAtLen::All).await; + let err = res.unwrap_err(); + assert!(err.to_string().contains("out of range")); + + // out of bounds - offset + length too large + let res = blobs + .read_at(hash, 1024 * 127, ReadAtLen::Exact(1025)) + .await; + let err = res.unwrap_err(); + assert!(err.to_string().contains("out of bound")); + + Ok(()) + } + + #[tokio::test] + async fn test_blob_get_collection() -> Result<()> { + let _guard = iroh_test::logging::setup(); + + let node = node::Node::memory().spawn().await?; + + // create temp file + let temp_dir = tempfile::tempdir().context("tempdir")?; + + let in_root = temp_dir.path().join("in"); + tokio::fs::create_dir_all(in_root.clone()) + .await + .context("create dir all")?; + + let mut paths = Vec::new(); + for i in 0..5 { + let path = in_root.join(format!("test-{i}")); + let size = 100; + let mut buf = vec![0u8; size]; + rand::thread_rng().fill_bytes(&mut buf); + let mut file = tokio::fs::File::create(path.clone()) + .await + .context("create file")?; + file.write_all(&buf.clone()).await.context("write_all")?; + file.flush().await.context("flush")?; + paths.push(path); + } + + let blobs = node.blobs(); + + let mut collection = Collection::default(); + let mut tags = Vec::new(); + // import files + for path in &paths { + let import_outcome = blobs + .add_from_path( + path.to_path_buf(), + false, + SetTagOption::Auto, + WrapOption::NoWrap, + ) + .await + .context("import file")? + .finish() + .await + .context("import finish")?; + + collection.push( + path.file_name().unwrap().to_str().unwrap().to_string(), + import_outcome.hash, + ); + tags.push(import_outcome.tag); + } + + let (hash, _tag) = blobs + .create_collection(collection, SetTagOption::Auto, tags) + .await?; + + let collection = blobs.get_collection(hash).await?; + + // 5 blobs + assert_eq!(collection.len(), 5); + + Ok(()) + } + + #[tokio::test] + async fn test_blob_share() -> Result<()> { + let _guard = iroh_test::logging::setup(); + + let node = node::Node::memory().spawn().await?; + + // create temp file + let temp_dir = tempfile::tempdir().context("tempdir")?; + + let in_root = temp_dir.path().join("in"); + tokio::fs::create_dir_all(in_root.clone()) + .await + .context("create dir all")?; + + let path = in_root.join("test-blob"); + let size = 1024 * 128; + let buf: Vec = (0..size).map(|i| i as u8).collect(); + let mut file = tokio::fs::File::create(path.clone()) + .await + .context("create file")?; + file.write_all(&buf.clone()).await.context("write_all")?; + file.flush().await.context("flush")?; + + let blobs = node.blobs(); + + let import_outcome = blobs + .add_from_path( + path.to_path_buf(), + false, + SetTagOption::Auto, + WrapOption::NoWrap, + ) + .await + .context("import file")? 
+ .finish() + .await + .context("import finish")?; + + // let ticket = blobs + // .share(import_outcome.hash, BlobFormat::Raw, Default::default()) + // .await?; + // assert_eq!(ticket.hash(), import_outcome.hash); + + let status = blobs.status(import_outcome.hash).await?; + assert_eq!(status, BlobStatus::Complete { size }); + + Ok(()) + } + + #[derive(Debug, Clone)] + struct BlobEvents { + sender: mpsc::Sender, + } + + impl BlobEvents { + fn new(cap: usize) -> (Self, mpsc::Receiver) { + let (s, r) = mpsc::channel(cap); + (Self { sender: s }, r) + } + } + + impl crate::provider::CustomEventSender for BlobEvents { + fn send(&self, event: crate::provider::Event) -> futures_lite::future::Boxed<()> { + let sender = self.sender.clone(); + Box::pin(async move { + sender.send(event).await.ok(); + }) + } + + fn try_send(&self, event: crate::provider::Event) { + self.sender.try_send(event).ok(); + } + } + + #[tokio::test] + async fn test_blob_provide_events() -> Result<()> { + let _guard = iroh_test::logging::setup(); + + let (node1_events, mut node1_events_r) = BlobEvents::new(16); + let node1 = node::Node::memory() + .blobs_events(node1_events) + .spawn() + .await?; + + let (node2_events, mut node2_events_r) = BlobEvents::new(16); + let node2 = node::Node::memory() + .blobs_events(node2_events) + .spawn() + .await?; + + let import_outcome = node1.blobs().add_bytes(&b"hello world"[..]).await?; + + // Download in node2 + let node1_addr = node1.node_addr().await?; + let res = node2 + .blobs() + .download(import_outcome.hash, node1_addr) + .await? + .await?; + dbg!(&res); + assert_eq!(res.local_size, 0); + assert_eq!(res.downloaded_size, 11); + + node1.shutdown().await?; + node2.shutdown().await?; + + let mut ev1 = Vec::new(); + while let Some(ev) = node1_events_r.recv().await { + ev1.push(ev); + } + // assert_eq!(ev1.len(), 3); + assert!(matches!( + ev1[0], + crate::provider::Event::ClientConnected { .. } + )); + assert!(matches!( + ev1[1], + crate::provider::Event::GetRequestReceived { .. } + )); + assert!(matches!( + ev1[2], + crate::provider::Event::TransferProgress { .. } + )); + assert!(matches!( + ev1[3], + crate::provider::Event::TransferCompleted { .. } + )); + dbg!(&ev1); + + let mut ev2 = Vec::new(); + while let Some(ev) = node2_events_r.recv().await { + ev2.push(ev); + } + + // Node 2 did not provide anything + assert!(ev2.is_empty()); + Ok(()) + } + /// Download a existing blob from oneself + #[tokio::test] + async fn test_blob_get_self_existing() -> TestResult<()> { + let _guard = iroh_test::logging::setup(); + + let node = node::Node::memory().spawn().await?; + let node_id = node.node_id(); + let blobs = node.blobs(); + + let AddOutcome { hash, size, .. } = blobs.add_bytes("foo").await?; + + // Direct + let res = blobs + .download_with_opts( + hash, + DownloadOptions { + format: BlobFormat::Raw, + nodes: vec![node_id.into()], + tag: SetTagOption::Auto, + mode: DownloadMode::Direct, + }, + ) + .await? + .await?; + + assert_eq!(res.local_size, size); + assert_eq!(res.downloaded_size, 0); + + // Queued + let res = blobs + .download_with_opts( + hash, + DownloadOptions { + format: BlobFormat::Raw, + nodes: vec![node_id.into()], + tag: SetTagOption::Auto, + mode: DownloadMode::Queued, + }, + ) + .await? 
+ .await?; + + assert_eq!(res.local_size, size); + assert_eq!(res.downloaded_size, 0); + + Ok(()) + } + + /// Download a missing blob from oneself + #[tokio::test] + async fn test_blob_get_self_missing() -> TestResult<()> { + let _guard = iroh_test::logging::setup(); + + let node = node::Node::memory().spawn().await?; + let node_id = node.node_id(); + let blobs = node.blobs(); + + let hash = Hash::from_bytes([0u8; 32]); + + // Direct + let res = blobs + .download_with_opts( + hash, + DownloadOptions { + format: BlobFormat::Raw, + nodes: vec![node_id.into()], + tag: SetTagOption::Auto, + mode: DownloadMode::Direct, + }, + ) + .await? + .await; + assert!(res.is_err()); + assert_eq!( + res.err().unwrap().to_string().as_str(), + "No nodes to download from provided" + ); + + // Queued + let res = blobs + .download_with_opts( + hash, + DownloadOptions { + format: BlobFormat::Raw, + nodes: vec![node_id.into()], + tag: SetTagOption::Auto, + mode: DownloadMode::Queued, + }, + ) + .await? + .await; + assert!(res.is_err()); + assert_eq!( + res.err().unwrap().to_string().as_str(), + "No provider nodes found" + ); + + Ok(()) + } + + /// Download a existing collection. Check that things succeed and no download is performed. + #[tokio::test] + async fn test_blob_get_existing_collection() -> TestResult<()> { + let _guard = iroh_test::logging::setup(); + + let node = node::Node::memory().spawn().await?; + // We use a nonexisting node id because we just want to check that this succeeds without + // hitting the network. + let node_id = NodeId::from_bytes(&[0u8; 32])?; + let blobs = node.blobs(); + + let mut collection = Collection::default(); + let mut tags = Vec::new(); + let mut size = 0; + for value in ["iroh", "is", "cool"] { + let import_outcome = blobs.add_bytes(value).await.context("add bytes")?; + collection.push(value.to_string(), import_outcome.hash); + tags.push(import_outcome.tag); + size += import_outcome.size; + } + + let (hash, _tag) = blobs + .create_collection(collection, SetTagOption::Auto, tags) + .await?; + + // load the hashseq and collection header manually to calculate our expected size + let hashseq_bytes = blobs.read_to_bytes(hash).await?; + size += hashseq_bytes.len() as u64; + let hashseq = HashSeq::try_from(hashseq_bytes)?; + let collection_header_bytes = blobs + .read_to_bytes(hashseq.into_iter().next().expect("header to exist")) + .await?; + size += collection_header_bytes.len() as u64; + + // Direct + let res = blobs + .download_with_opts( + hash, + DownloadOptions { + format: BlobFormat::HashSeq, + nodes: vec![node_id.into()], + tag: SetTagOption::Auto, + mode: DownloadMode::Direct, + }, + ) + .await? + .await + .context("direct (download)")?; + + assert_eq!(res.local_size, size); + assert_eq!(res.downloaded_size, 0); + + // Queued + let res = blobs + .download_with_opts( + hash, + DownloadOptions { + format: BlobFormat::HashSeq, + nodes: vec![node_id.into()], + tag: SetTagOption::Auto, + mode: DownloadMode::Queued, + }, + ) + .await? 
+ .await + .context("queued")?; + + assert_eq!(res.local_size, size); + assert_eq!(res.downloaded_size, 0); + + Ok(()) + } + + #[tokio::test] + #[cfg_attr(target_os = "windows", ignore = "flaky")] + async fn test_blob_delete_mem() -> Result<()> { + let _guard = iroh_test::logging::setup(); + + let node = node::Node::memory().spawn().await?; + + let res = node.blobs().add_bytes(&b"hello world"[..]).await?; + + let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?; + assert_eq!(hashes.len(), 1); + assert_eq!(hashes[0].hash, res.hash); + + // delete + node.blobs().delete_blob(res.hash).await?; + + let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?; + assert!(hashes.is_empty()); + + Ok(()) + } + + #[tokio::test] + async fn test_blob_delete_fs() -> Result<()> { + let _guard = iroh_test::logging::setup(); + + let dir = tempfile::tempdir()?; + let node = node::Node::persistent(dir.path()).await?.spawn().await?; + + let res = node.blobs().add_bytes(&b"hello world"[..]).await?; + + let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?; + assert_eq!(hashes.len(), 1); + assert_eq!(hashes[0].hash, res.hash); + + // delete + node.blobs().delete_blob(res.hash).await?; + + let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?; + assert!(hashes.is_empty()); + + Ok(()) + } +} diff --git a/src/rpc/client/blobs/batch.rs b/src/rpc/client/blobs/batch.rs new file mode 100644 index 000000000..b82f17837 --- /dev/null +++ b/src/rpc/client/blobs/batch.rs @@ -0,0 +1,472 @@ +use std::{ + io, + path::PathBuf, + sync::{Arc, Mutex}, +}; + +use anyhow::{anyhow, Context, Result}; +use bytes::Bytes; +use futures_buffered::BufferedStreamExt; +use futures_lite::StreamExt; +use futures_util::{sink::Buffer, FutureExt, SinkExt, Stream}; +use quic_rpc::{client::UpdateSink, Connector, RpcClient}; +use tokio::io::AsyncRead; +use tokio_util::io::ReaderStream; +use tracing::{debug, warn}; + +use super::WrapOption; +use crate::{ + format::collection::Collection, + net_protocol::BatchId, + provider::BatchAddPathProgress, + rpc::proto::{ + blobs::{ + BatchAddPathRequest, BatchAddStreamRequest, BatchAddStreamResponse, + BatchAddStreamUpdate, BatchCreateTempTagRequest, BatchUpdate, + }, + tags::{self, SyncMode}, + RpcService, + }, + store::ImportMode, + util::{SetTagOption, TagDrop}, + BlobFormat, HashAndFormat, Tag, TempTag, +}; + +/// A scope in which blobs can be added. +#[derive(derive_more::Debug)] +struct BatchInner +where + C: Connector, +{ + /// The id of the scope. + batch: BatchId, + /// The rpc client. + rpc: RpcClient, + /// The stream to send drop + #[debug(skip)] + updates: Mutex, BatchUpdate>>, +} + +/// A batch for write operations. +/// +/// This serves mostly as a scope for temporary tags. +/// +/// It is not a transaction, so things in a batch are not atomic. Also, there is +/// no isolation between batches. +#[derive(derive_more::Debug)] +pub struct Batch(Arc>) +where + C: Connector; + +impl TagDrop for BatchInner +where + C: Connector, +{ + fn on_drop(&self, content: &HashAndFormat) { + let mut updates = self.updates.lock().unwrap(); + // make a spirited attempt to notify the server that we are dropping the content + // + // this will occasionally fail, but that's acceptable. The temp tags for the batch + // will be cleaned up as soon as the entire batch is dropped. + // + // E.g. a typical scenario is that you create a large array of temp tags, and then + // store them in a hash sequence and then drop the array. 
You will get many drops + // at the same time, and might get a send failure here. + // + // But that just means that the server will clean up the temp tags when the batch is + // dropped. + updates.feed(BatchUpdate::Drop(*content)).now_or_never(); + updates.flush().now_or_never(); + } +} + +/// Options for adding a file as a blob +#[derive(Debug, Clone, Copy, Default)] +pub struct AddFileOpts { + /// The import mode + pub import_mode: ImportMode, + /// The format of the blob + pub format: BlobFormat, +} + +/// Options for adding a directory as a collection +#[derive(Debug, Clone)] +pub struct AddDirOpts { + /// The import mode + pub import_mode: ImportMode, + /// Whether to preserve the directory name + pub wrap: WrapOption, + /// Io parallelism + pub io_parallelism: usize, +} + +impl Default for AddDirOpts { + fn default() -> Self { + Self { + import_mode: ImportMode::TryReference, + wrap: WrapOption::NoWrap, + io_parallelism: 4, + } + } +} + +/// Options for adding a directory as a collection +#[derive(Debug, Clone)] +pub struct AddReaderOpts { + /// The format of the blob + pub format: BlobFormat, + /// Size of the chunks to send + pub chunk_size: usize, +} + +impl Default for AddReaderOpts { + fn default() -> Self { + Self { + format: BlobFormat::Raw, + chunk_size: 1024 * 64, + } + } +} + +impl Batch +where + C: Connector, +{ + pub(super) fn new( + batch: BatchId, + rpc: RpcClient, + updates: UpdateSink, + buffer_size: usize, + ) -> Self { + let updates = updates.buffer(buffer_size); + Self(Arc::new(BatchInner { + batch, + rpc, + updates: updates.into(), + })) + } + + /// Write a blob by passing bytes. + pub async fn add_bytes(&self, bytes: impl Into) -> Result { + self.add_bytes_with_opts(bytes, Default::default()).await + } + + /// Import a blob from a filesystem path, using the default options. + /// + /// For more control, use [`Self::add_file_with_opts`]. + pub async fn add_file(&self, path: PathBuf) -> Result<(TempTag, u64)> { + self.add_file_with_opts(path, AddFileOpts::default()).await + } + + /// Add a directory as a hashseq in iroh collection format + pub async fn add_dir(&self, root: PathBuf) -> Result { + self.add_dir_with_opts(root, Default::default()).await + } + + /// Write a blob by passing an async reader. + /// + /// This will consume the stream in 64KB chunks, and use a format of [BlobFormat::Raw]. + /// + /// For more options, see [`Self::add_reader_with_opts`]. + pub async fn add_reader( + &self, + reader: impl AsyncRead + Unpin + Send + 'static, + ) -> anyhow::Result { + self.add_reader_with_opts(reader, Default::default()).await + } + + /// Write a blob by passing a stream of bytes. + pub async fn add_stream( + &self, + input: impl Stream> + Send + Unpin + 'static, + ) -> Result { + self.add_stream_with_opts(input, Default::default()).await + } + + /// Creates a temp tag to protect some content (blob or hashseq) from being deleted. + /// + /// This is a lower-level API. The other functions in [`Batch`] already create [`TempTag`]s automatically. + /// + /// [`TempTag`]s allow you to protect some data from deletion while a download is ongoing, + /// even if you don't want to protect it permanently. 
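+    ///
+    /// A sketch, assuming a `batch` from the blobs client and an already known `hash`
+    /// (the surrounding setup is omitted):
+    ///
+    /// ```ignore
+    /// let tt = batch.temp_tag(HashAndFormat { hash, format: BlobFormat::Raw }).await?;
+    /// // ... fetch or assemble the data referenced by `hash` ...
+    /// drop(tt); // once dropped, the content becomes eligible for garbage collection again
+    /// ```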
+ pub async fn temp_tag(&self, content: HashAndFormat) -> Result { + // Notify the server that we want one temp tag for the given content + self.0 + .rpc + .rpc(BatchCreateTempTagRequest { + batch: self.0.batch, + content, + }) + .await??; + // Only after success of the above call, we can create the corresponding local temp tag + Ok(self.local_temp_tag(content, None)) + } + + /// Write a blob by passing an async reader. + /// + /// This consumes the stream in chunks using `opts.chunk_size`. A good default is 64KB. + pub async fn add_reader_with_opts( + &self, + reader: impl AsyncRead + Unpin + Send + 'static, + opts: AddReaderOpts, + ) -> anyhow::Result { + let AddReaderOpts { format, chunk_size } = opts; + let input = ReaderStream::with_capacity(reader, chunk_size); + self.add_stream_with_opts(input, format).await + } + + /// Write a blob by passing bytes. + pub async fn add_bytes_with_opts( + &self, + bytes: impl Into, + format: BlobFormat, + ) -> Result { + let input = futures_lite::stream::once(Ok(bytes.into())); + self.add_stream_with_opts(input, format).await + } + + /// Import a blob from a filesystem path. + /// + /// `path` should be an absolute path valid for the file system on which + /// the node runs, which refers to a file. + /// + /// If you use [`ImportMode::TryReference`], Iroh will assume that the data will not + /// change and will share it in place without copying to the Iroh data directory + /// if appropriate. However, for tiny files, Iroh will copy the data. + /// + /// If you use [`ImportMode::Copy`], Iroh will always copy the data. + /// + /// Will return a temp tag for the added blob, as well as the size of the file. + pub async fn add_file_with_opts( + &self, + path: PathBuf, + opts: AddFileOpts, + ) -> Result<(TempTag, u64)> { + let AddFileOpts { + import_mode, + format, + } = opts; + anyhow::ensure!( + path.is_absolute(), + "Path must be absolute, but got: {:?}", + path + ); + anyhow::ensure!(path.is_file(), "Path does not refer to a file: {:?}", path); + let mut stream = self + .0 + .rpc + .server_streaming(BatchAddPathRequest { + path, + import_mode, + format, + batch: self.0.batch, + }) + .await?; + let mut res_hash = None; + let mut res_size = None; + while let Some(item) = stream.next().await { + match item?.0 { + BatchAddPathProgress::Abort(cause) => { + Err(cause)?; + } + BatchAddPathProgress::Done { hash } => { + res_hash = Some(hash); + } + BatchAddPathProgress::Found { size } => { + res_size = Some(size); + } + _ => {} + } + } + let hash = res_hash.context("Missing hash")?; + let size = res_size.context("Missing size")?; + Ok(( + self.local_temp_tag(HashAndFormat { hash, format }, Some(size)), + size, + )) + } + + /// Add a directory as a hashseq in iroh collection format + /// + /// This can also be used to add a single file as a collection, if + /// wrap is set to [WrapOption::Wrap]. + /// + /// However, if you want to add a single file as a raw blob, use add_file instead. 
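+    ///
+    /// A sketch with explicit options; the directory path below is a placeholder and must
+    /// be absolute, and `batch` is assumed to come from the blobs client:
+    ///
+    /// ```ignore
+    /// let tag = batch
+    ///     .add_dir_with_opts(
+    ///         "/data/photos".into(),
+    ///         AddDirOpts {
+    ///             import_mode: ImportMode::Copy,
+    ///             wrap: WrapOption::Wrap { name: None },
+    ///             io_parallelism: 8,
+    ///         },
+    ///     )
+    ///     .await?;
+    /// ```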
+ pub async fn add_dir_with_opts(&self, root: PathBuf, opts: AddDirOpts) -> Result { + let AddDirOpts { + import_mode, + wrap, + io_parallelism, + } = opts; + anyhow::ensure!(root.is_absolute(), "Path must be absolute"); + + // let (send, recv) = flume::bounded(32); + // let import_progress = FlumeProgressSender::new(send); + + // import all files below root recursively + let data_sources = crate::util::fs::scan_path(root, wrap)?; + let opts = AddFileOpts { + import_mode, + format: BlobFormat::Raw, + }; + let result: Vec<_> = futures_lite::stream::iter(data_sources) + .map(|source| { + // let import_progress = import_progress.clone(); + async move { + let name = source.name().to_string(); + let (tag, size) = self + .add_file_with_opts(source.path().to_owned(), opts) + .await?; + let hash = *tag.hash(); + anyhow::Ok((name, hash, size, tag)) + } + }) + .buffered_ordered(io_parallelism) + .try_collect() + .await?; + + // create a collection + let (collection, child_tags): (Collection, Vec<_>) = result + .into_iter() + .map(|(name, hash, _, tag)| ((name, hash), tag)) + .unzip(); + + let tag = self.add_collection(collection).await?; + drop(child_tags); + Ok(tag) + } + + /// Write a blob by passing a stream of bytes. + /// + /// For convenient interop with common sources of data, this function takes a stream of `io::Result`. + /// If you have raw bytes, you need to wrap them in `io::Result::Ok`. + pub async fn add_stream_with_opts( + &self, + mut input: impl Stream> + Send + Unpin + 'static, + format: BlobFormat, + ) -> Result { + let (mut sink, mut stream) = self + .0 + .rpc + .bidi(BatchAddStreamRequest { + batch: self.0.batch, + format, + }) + .await?; + let mut size = 0u64; + while let Some(item) = input.next().await { + match item { + Ok(chunk) => { + size += chunk.len() as u64; + sink.send(BatchAddStreamUpdate::Chunk(chunk)) + .await + .map_err(|err| anyhow!("Failed to send input stream to remote: {err:?}"))?; + } + Err(err) => { + warn!("Abort send, reason: failed to read from source stream: {err:?}"); + sink.send(BatchAddStreamUpdate::Abort) + .await + .map_err(|err| anyhow!("Failed to send input stream to remote: {err:?}"))?; + break; + } + } + } + // this is needed for the remote to notice that the stream is closed + drop(sink); + let mut res = None; + while let Some(item) = stream.next().await { + match item? { + BatchAddStreamResponse::Abort(cause) => { + Err(cause)?; + } + BatchAddStreamResponse::Result { hash } => { + res = Some(hash); + } + _ => {} + } + } + let hash = res.context("Missing answer")?; + Ok(self.local_temp_tag(HashAndFormat { hash, format }, Some(size))) + } + + /// Add a collection. + /// + /// This is a convenience function that converts the collection into two blobs + /// (the metadata and the hash sequence) and adds them, returning a temp tag for + /// the hash sequence. + /// + /// Note that this does not guarantee that the data that the collection refers to + /// actually exists. It will just create 2 blobs, the metadata and the hash sequence + /// itself. + pub async fn add_collection(&self, collection: Collection) -> Result { + self.add_blob_seq(collection.to_blobs()).await + } + + /// Add a sequence of blobs, where the last is a hash sequence. + /// + /// It is a common pattern in iroh to have a hash sequence with one or more + /// blobs of metadata, and the remaining blobs being the actual data. E.g. + /// a collection is a hash sequence where the first child is the metadata. 
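+    ///
+    /// For example, [`Collection::to_blobs`] already yields such a sequence, so adding a
+    /// collection by hand is roughly (sketch, assuming a `batch` and a `collection`):
+    ///
+    /// ```ignore
+    /// let tt = batch.add_blob_seq(collection.to_blobs()).await?;
+    /// ```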
+ pub async fn add_blob_seq(&self, iter: impl Iterator) -> Result { + let mut blobs = iter.peekable(); + // put the tags somewhere + let mut tags = vec![]; + loop { + let blob = blobs.next().context("Failed to get next blob")?; + if blobs.peek().is_none() { + return self.add_bytes_with_opts(blob, BlobFormat::HashSeq).await; + } else { + tags.push(self.add_bytes(blob).await?); + } + } + } + + /// Upgrades a temp tag to a persistent tag. + pub async fn persist(&self, tt: TempTag) -> Result { + let tag = self + .0 + .rpc + .rpc(tags::CreateRequest { + value: tt.hash_and_format(), + batch: Some(self.0.batch), + sync: SyncMode::Full, + }) + .await??; + Ok(tag) + } + + /// Upgrades a temp tag to a persistent tag with a specific name. + pub async fn persist_to(&self, tt: TempTag, tag: Tag) -> Result<()> { + self.0 + .rpc + .rpc(tags::SetRequest { + name: tag, + value: Some(tt.hash_and_format()), + batch: Some(self.0.batch), + sync: SyncMode::Full, + }) + .await??; + Ok(()) + } + + /// Upgrades a temp tag to a persistent tag with either a specific name or + /// an automatically generated name. + pub async fn persist_with_opts(&self, tt: TempTag, opts: SetTagOption) -> Result { + match opts { + SetTagOption::Auto => self.persist(tt).await, + SetTagOption::Named(tag) => { + self.persist_to(tt, tag.clone()).await?; + Ok(tag) + } + } + } + + /// Creates a temp tag for the given hash and format, without notifying the server. + /// + /// Caution: only do this for data for which you know the server side has created a temp tag. + fn local_temp_tag(&self, inner: HashAndFormat, _size: Option) -> TempTag { + let on_drop: Arc = self.0.clone(); + let on_drop = Some(Arc::downgrade(&on_drop)); + TempTag::new(inner, on_drop) + } +} diff --git a/src/rpc/client/tags.rs b/src/rpc/client/tags.rs new file mode 100644 index 000000000..2b7cbc04d --- /dev/null +++ b/src/rpc/client/tags.rs @@ -0,0 +1,70 @@ +//! API for tag management. +//! +//! The purpose of tags is to mark information as important to prevent it +//! from being garbage-collected (if the garbage collector is turned on). +//! Currently this is used for blobs. +//! +//! The main entry point is the [`Client`]. +//! +//! [`Client::list`] can be used to list all tags. +//! [`Client::list_hash_seq`] can be used to list all tags with a hash_seq format. +//! +//! [`Client::delete`] can be used to delete a tag. +use anyhow::Result; +use futures_lite::{Stream, StreamExt}; +use quic_rpc::{client::BoxedConnector, Connector, RpcClient}; +use serde::{Deserialize, Serialize}; + +use crate::{ + rpc::proto::{ + tags::{DeleteRequest, ListRequest}, + RpcService, + }, + BlobFormat, Hash, Tag, +}; + +/// Iroh tags client. +#[derive(Debug, Clone)] +#[repr(transparent)] +pub struct Client> { + pub(super) rpc: RpcClient, +} + +impl Client +where + C: Connector, +{ + /// Creates a new client + pub fn new(rpc: RpcClient) -> Self { + Self { rpc } + } + + /// Lists all tags. + pub async fn list(&self) -> Result>> { + let stream = self.rpc.server_streaming(ListRequest::all()).await?; + Ok(stream.map(|res| res.map_err(anyhow::Error::from))) + } + + /// Lists all tags with a hash_seq format. + pub async fn list_hash_seq(&self) -> Result>> { + let stream = self.rpc.server_streaming(ListRequest::hash_seq()).await?; + Ok(stream.map(|res| res.map_err(anyhow::Error::from))) + } + + /// Deletes a tag. + pub async fn delete(&self, name: Tag) -> Result<()> { + self.rpc.rpc(DeleteRequest { name }).await??; + Ok(()) + } +} + +/// Information about a tag. 
+#[derive(Debug, Serialize, Deserialize)] +pub struct TagInfo { + /// Name of the tag + pub name: Tag, + /// Format of the data + pub format: BlobFormat, + /// Hash of the data + pub hash: Hash, +} diff --git a/src/rpc/proto.rs b/src/rpc/proto.rs new file mode 100644 index 000000000..174b0a80c --- /dev/null +++ b/src/rpc/proto.rs @@ -0,0 +1,36 @@ +//! RPC protocol for the iroh-blobs service +use nested_enum_utils::enum_conversions; +use serde::{Deserialize, Serialize}; + +pub mod blobs; +pub mod tags; + +/// quic-rpc service for iroh blobs +#[derive(Debug, Clone)] +pub struct RpcService; + +impl quic_rpc::Service for RpcService { + type Req = Request; + type Res = Response; +} + +#[allow(missing_docs)] +#[enum_conversions] +#[derive(Debug, Serialize, Deserialize)] +pub enum Request { + Blobs(blobs::Request), + Tags(tags::Request), +} + +#[allow(missing_docs)] +#[enum_conversions] +#[derive(Debug, Serialize, Deserialize)] +pub enum Response { + Blobs(blobs::Response), + Tags(tags::Response), +} + +/// Error type for RPC operations +pub type RpcError = serde_error::Error; +/// Result type for RPC operations +pub type RpcResult = Result; diff --git a/src/rpc/proto/blobs.rs b/src/rpc/proto/blobs.rs new file mode 100644 index 000000000..2788027d6 --- /dev/null +++ b/src/rpc/proto/blobs.rs @@ -0,0 +1,319 @@ +//! RPC requests and responses for the blob service. +use std::path::PathBuf; + +use bytes::Bytes; +use iroh_base::hash::Hash; +use nested_enum_utils::enum_conversions; +use quic_rpc_derive::rpc_requests; +use serde::{Deserialize, Serialize}; + +use super::{RpcError, RpcResult, RpcService}; +use crate::{ + export::ExportProgress, + format::collection::Collection, + get::db::DownloadProgress, + net_protocol::{BatchId, BlobDownloadRequest}, + provider::{AddProgress, BatchAddPathProgress}, + rpc::client::blobs::{BlobInfo, BlobStatus, IncompleteBlobInfo, ReadAtLen, WrapOption}, + store::{ + BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ImportMode, + ValidateProgress, + }, + util::SetTagOption, + BlobFormat, HashAndFormat, Tag, +}; + +#[allow(missing_docs)] +#[derive(strum::Display, Debug, Serialize, Deserialize)] +#[enum_conversions(super::Request)] +#[rpc_requests(RpcService)] +pub enum Request { + #[server_streaming(response = RpcResult)] + ReadAt(ReadAtRequest), + #[bidi_streaming(update = AddStreamUpdate, response = AddStreamResponse)] + AddStream(AddStreamRequest), + AddStreamUpdate(AddStreamUpdate), + #[server_streaming(response = AddPathResponse)] + AddPath(AddPathRequest), + #[server_streaming(response = DownloadResponse)] + Download(BlobDownloadRequest), + #[server_streaming(response = ExportResponse)] + Export(ExportRequest), + #[server_streaming(response = RpcResult)] + List(ListRequest), + #[server_streaming(response = RpcResult)] + ListIncomplete(ListIncompleteRequest), + #[rpc(response = RpcResult<()>)] + Delete(DeleteRequest), + #[server_streaming(response = ValidateProgress)] + Validate(ValidateRequest), + #[server_streaming(response = ConsistencyCheckProgress)] + Fsck(ConsistencyCheckRequest), + #[rpc(response = RpcResult)] + CreateCollection(CreateCollectionRequest), + #[rpc(response = RpcResult)] + BlobStatus(BlobStatusRequest), + + #[bidi_streaming(update = BatchUpdate, response = BatchCreateResponse)] + BatchCreate(BatchCreateRequest), + BatchUpdate(BatchUpdate), + #[bidi_streaming(update = BatchAddStreamUpdate, response = BatchAddStreamResponse)] + BatchAddStream(BatchAddStreamRequest), + BatchAddStreamUpdate(BatchAddStreamUpdate), + 
#[server_streaming(response = BatchAddPathResponse)] + BatchAddPath(BatchAddPathRequest), + #[rpc(response = RpcResult<()>)] + BatchCreateTempTag(BatchCreateTempTagRequest), +} + +#[allow(missing_docs)] +#[derive(strum::Display, Debug, Serialize, Deserialize)] +#[enum_conversions(super::Response)] +pub enum Response { + ReadAt(RpcResult), + AddStream(AddStreamResponse), + AddPath(AddPathResponse), + List(RpcResult), + ListIncomplete(RpcResult), + Download(DownloadResponse), + Fsck(ConsistencyCheckProgress), + Export(ExportResponse), + Validate(ValidateProgress), + CreateCollection(RpcResult), + BlobStatus(RpcResult), + BatchCreate(BatchCreateResponse), + BatchAddStream(BatchAddStreamResponse), + BatchAddPath(BatchAddPathResponse), +} + +/// A request to the node to provide the data at the given path +/// +/// Will produce a stream of [`AddProgress`] messages. +#[derive(Debug, Serialize, Deserialize)] +pub struct AddPathRequest { + /// The path to the data to provide. + /// + /// This should be an absolute path valid for the file system on which + /// the node runs. Usually the cli will run on the same machine as the + /// node, so this should be an absolute path on the cli machine. + pub path: PathBuf, + /// True if the provider can assume that the data will not change, so it + /// can be shared in place. + pub in_place: bool, + /// Tag to tag the data with. + pub tag: SetTagOption, + /// Whether to wrap the added data in a collection + pub wrap: WrapOption, +} + +/// Wrapper around [`AddProgress`]. +#[derive(Debug, Serialize, Deserialize, derive_more::Into)] +pub struct AddPathResponse(pub AddProgress); + +/// Progress response for [`BlobDownloadRequest`] +#[derive(Debug, Clone, Serialize, Deserialize, derive_more::From, derive_more::Into)] +pub struct DownloadResponse(pub DownloadProgress); + +/// A request to the node to download and share the data specified by the hash. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ExportRequest { + /// The hash of the blob to export. + pub hash: Hash, + /// The filepath to where the data should be saved + /// + /// This should be an absolute path valid for the file system on which + /// the node runs. + pub path: PathBuf, + /// Set to [`ExportFormat::Collection`] if the `hash` refers to a [`Collection`] and you want + /// to export all children of the collection into individual files. + pub format: ExportFormat, + /// The mode of exporting. + /// + /// The default is [`ExportMode::Copy`]. See [`ExportMode`] for details. 
+ pub mode: ExportMode, +} + +/// Progress response for [`ExportRequest`] +#[derive(Debug, Clone, Serialize, Deserialize, derive_more::From, derive_more::Into)] +pub struct ExportResponse(pub ExportProgress); + +/// A request to the node to validate the integrity of all provided data +#[derive(Debug, Serialize, Deserialize)] +pub struct ConsistencyCheckRequest { + /// repair the store by dropping inconsistent blobs + pub repair: bool, +} + +/// A request to the node to validate the integrity of all provided data +#[derive(Debug, Serialize, Deserialize)] +pub struct ValidateRequest { + /// repair the store by downgrading blobs from complete to partial + pub repair: bool, +} + +/// List all blobs, including collections +#[derive(Debug, Serialize, Deserialize)] +pub struct ListRequest; + +/// List all blobs, including collections +#[derive(Debug, Serialize, Deserialize)] +pub struct ListIncompleteRequest; + +/// Get the bytes for a hash +#[derive(Serialize, Deserialize, Debug)] +pub struct ReadAtRequest { + /// Hash to get bytes for + pub hash: Hash, + /// Offset to start reading at + pub offset: u64, + /// Length of the data to get + pub len: ReadAtLen, +} + +/// Response to [`ReadAtRequest`] +#[derive(Serialize, Deserialize, Debug)] +pub enum ReadAtResponse { + /// The entry header. + Entry { + /// The size of the blob + size: BaoBlobSize, + /// Whether the blob is complete + is_complete: bool, + }, + /// Chunks of entry data. + Data { + /// The data chunk + chunk: Bytes, + }, +} + +/// Write a blob from a byte stream +#[derive(Serialize, Deserialize, Debug)] +pub struct AddStreamRequest { + /// Tag to tag the data with. + pub tag: SetTagOption, +} + +/// Write a blob from a byte stream +#[derive(Serialize, Deserialize, Debug)] +pub enum AddStreamUpdate { + /// A chunk of stream data + Chunk(Bytes), + /// Abort the request due to an error on the client side + Abort, +} + +/// Wrapper around [`AddProgress`]. +#[derive(Debug, Serialize, Deserialize, derive_more::Into)] +pub struct AddStreamResponse(pub AddProgress); + +/// Delete a blob +#[derive(Debug, Serialize, Deserialize)] +pub struct DeleteRequest { + /// Name of the tag + pub hash: Hash, +} + +/// Create a collection. +#[derive(Debug, Serialize, Deserialize)] +pub struct CreateCollectionRequest { + /// The collection + pub collection: Collection, + /// Tag option. + pub tag: SetTagOption, + /// Tags that should be deleted after creation. + pub tags_to_delete: Vec, +} + +/// A response to a create collection request +#[derive(Debug, Serialize, Deserialize)] +pub struct CreateCollectionResponse { + /// The resulting hash. + pub hash: Hash, + /// The resulting tag. 
+ pub tag: Tag, +} + +/// Request to get the status of a blob +#[derive(Debug, Serialize, Deserialize)] +pub struct BlobStatusRequest { + /// The hash of the blob + pub hash: Hash, +} + +/// The response to a status request +#[derive(Debug, Serialize, Deserialize, derive_more::From, derive_more::Into)] +pub struct BlobStatusResponse(pub BlobStatus); + +/// Request to create a new scope for temp tags +#[derive(Debug, Serialize, Deserialize)] +pub struct BatchCreateRequest; + +/// Update to a temp tag scope +#[derive(Debug, Serialize, Deserialize)] +pub enum BatchUpdate { + /// Drop of a remote temp tag + Drop(HashAndFormat), + /// Message to check that the connection is still alive + Ping, +} + +/// Response to a temp tag scope request +#[derive(Debug, Serialize, Deserialize)] +pub enum BatchCreateResponse { + /// We got the id of the scope + Id(BatchId), +} + +/// Create a temp tag with a given hash and format +#[derive(Debug, Serialize, Deserialize)] +pub struct BatchCreateTempTagRequest { + /// Content to protect + pub content: HashAndFormat, + /// Batch to create the temp tag in + pub batch: BatchId, +} + +/// Write a blob from a byte stream +#[derive(Serialize, Deserialize, Debug)] +pub struct BatchAddStreamRequest { + /// What format to use for the blob + pub format: BlobFormat, + /// Batch to create the temp tag in + pub batch: BatchId, +} + +/// Write a blob from a byte stream +#[derive(Serialize, Deserialize, Debug)] +pub enum BatchAddStreamUpdate { + /// A chunk of stream data + Chunk(Bytes), + /// Abort the request due to an error on the client side + Abort, +} + +/// Wrapper around [`AddProgress`]. +#[allow(missing_docs)] +#[derive(Debug, Serialize, Deserialize)] +pub enum BatchAddStreamResponse { + Abort(RpcError), + OutboardProgress { offset: u64 }, + Result { hash: Hash }, +} + +/// Write a blob from a byte stream +#[derive(Serialize, Deserialize, Debug)] +pub struct BatchAddPathRequest { + /// The path to the data to provide. + pub path: PathBuf, + /// Add the data in place + pub import_mode: ImportMode, + /// What format to use for the blob + pub format: BlobFormat, + /// Batch to create the temp tag in + pub batch: BatchId, +} + +/// Response to a batch add path request +#[derive(Serialize, Deserialize, Debug)] +pub struct BatchAddPathResponse(pub BatchAddPathProgress); diff --git a/src/rpc/proto/tags.rs b/src/rpc/proto/tags.rs new file mode 100644 index 000000000..54d35f625 --- /dev/null +++ b/src/rpc/proto/tags.rs @@ -0,0 +1,109 @@ +//! 
Tags RPC protocol +use nested_enum_utils::enum_conversions; +use quic_rpc_derive::rpc_requests; +use serde::{Deserialize, Serialize}; + +use super::{RpcResult, RpcService}; +use crate::{net_protocol::BatchId, rpc::client::tags::TagInfo, HashAndFormat, Tag}; + +#[allow(missing_docs)] +#[derive(strum::Display, Debug, Serialize, Deserialize)] +#[enum_conversions(super::Request)] +#[rpc_requests(RpcService)] +pub enum Request { + #[rpc(response = RpcResult)] + Create(CreateRequest), + #[rpc(response = RpcResult<()>)] + Set(SetRequest), + #[rpc(response = RpcResult<()>)] + DeleteTag(DeleteRequest), + #[server_streaming(response = TagInfo)] + ListTags(ListRequest), +} + +#[allow(missing_docs)] +#[derive(strum::Display, Debug, Serialize, Deserialize)] +#[enum_conversions(super::Response)] +pub enum Response { + Create(RpcResult), + ListTags(TagInfo), + DeleteTag(RpcResult<()>), +} + +/// Determine how to sync the db after a modification operation +#[derive(Debug, Serialize, Deserialize, Default)] +pub enum SyncMode { + /// Fully sync the db + #[default] + Full, + /// Do not sync the db + None, +} + +/// Create a tag +#[derive(Debug, Serialize, Deserialize)] +pub struct CreateRequest { + /// Value of the tag + pub value: HashAndFormat, + /// Batch to use, none for global + pub batch: Option, + /// Sync mode + pub sync: SyncMode, +} + +/// Set or delete a tag +#[derive(Debug, Serialize, Deserialize)] +pub struct SetRequest { + /// Name of the tag + pub name: Tag, + /// Value of the tag, None to delete + pub value: Option, + /// Batch to use, none for global + pub batch: Option, + /// Sync mode + pub sync: SyncMode, +} + +/// List all collections +/// +/// Lists all collections that have been explicitly added to the database. +#[derive(Debug, Serialize, Deserialize)] +pub struct ListRequest { + /// List raw tags + pub raw: bool, + /// List hash seq tags + pub hash_seq: bool, +} + +impl ListRequest { + /// List all tags + pub fn all() -> Self { + Self { + raw: true, + hash_seq: true, + } + } + + /// List raw tags + pub fn raw() -> Self { + Self { + raw: true, + hash_seq: false, + } + } + + /// List hash seq tags + pub fn hash_seq() -> Self { + Self { + raw: false, + hash_seq: true, + } + } +} + +/// Delete a tag +#[derive(Debug, Serialize, Deserialize)] +pub struct DeleteRequest { + /// Name of the tag + pub name: Tag, +} diff --git a/src/util.rs b/src/util.rs index 0fb2b1b1f..735a9feb1 100644 --- a/src/util.rs +++ b/src/util.rs @@ -15,6 +15,7 @@ use serde::{Deserialize, Serialize}; use crate::{BlobFormat, Hash, HashAndFormat, IROH_BLOCK_SIZE}; +pub mod fs; pub mod io; mod mem_or_file; pub mod progress; diff --git a/src/util/fs.rs b/src/util/fs.rs new file mode 100644 index 000000000..068ebadc9 --- /dev/null +++ b/src/util/fs.rs @@ -0,0 +1,447 @@ +//! Utilities for filesystem operations. +use std::{ + borrow::Cow, + fs::read_dir, + path::{Component, Path, PathBuf}, +}; + +use anyhow::{bail, Context}; +use bytes::Bytes; +/// A data source +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] +pub struct DataSource { + /// Custom name + name: String, + /// Path to the file + path: PathBuf, +} + +impl DataSource { + /// Creates a new [`DataSource`] from a [`PathBuf`]. + pub fn new(path: PathBuf) -> Self { + let name = path + .file_name() + .map(|s| s.to_string_lossy().to_string()) + .unwrap_or_default(); + DataSource { path, name } + } + /// Creates a new [`DataSource`] from a [`PathBuf`] and a custom name. 
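+    ///
+    /// A one-line sketch (the path is a placeholder):
+    ///
+    /// ```ignore
+    /// let src = DataSource::with_name("/tmp/report.pdf".into(), "report.pdf".to_string());
+    /// assert_eq!(src.name(), "report.pdf");
+    /// ```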
+ pub fn with_name(path: PathBuf, name: String) -> Self { + DataSource { path, name } + } + + /// Returns blob name for this data source. + /// + /// If no name was provided when created it is derived from the path name. + pub fn name(&self) -> Cow<'_, str> { + Cow::Borrowed(&self.name) + } + + /// Returns the path of this data source. + pub fn path(&self) -> &Path { + &self.path + } +} + +impl From for DataSource { + fn from(value: PathBuf) -> Self { + DataSource::new(value) + } +} + +impl From<&std::path::Path> for DataSource { + fn from(value: &std::path::Path) -> Self { + DataSource::new(value.to_path_buf()) + } +} + +/// Create data sources from a path. +#[cfg(feature = "rpc")] +pub fn scan_path( + path: PathBuf, + wrap: crate::rpc::client::blobs::WrapOption, +) -> anyhow::Result> { + use crate::rpc::client::blobs::WrapOption; + if path.is_dir() { + scan_dir(path, wrap) + } else { + let name = match wrap { + WrapOption::NoWrap => bail!("Cannot scan a file without wrapping"), + WrapOption::Wrap { name: None } => file_name(&path)?, + WrapOption::Wrap { name: Some(name) } => name, + }; + Ok(vec![DataSource { name, path }]) + } +} + +#[cfg(feature = "rpc")] +#[cfg_attr(iroh_docsrs, doc(cfg(feature = "rpc")))] +fn file_name(path: &Path) -> anyhow::Result { + relative_canonicalized_path_to_string(path.file_name().context("path is invalid")?) +} + +/// Create data sources from a directory. +#[cfg(feature = "rpc")] +#[cfg_attr(iroh_docsrs, doc(cfg(feature = "rpc")))] +pub fn scan_dir( + root: PathBuf, + wrap: crate::rpc::client::blobs::WrapOption, +) -> anyhow::Result> { + use crate::rpc::client::blobs::WrapOption; + if !root.is_dir() { + bail!("Expected {} to be a file", root.to_string_lossy()); + } + let prefix = match wrap { + WrapOption::NoWrap => None, + WrapOption::Wrap { name: None } => Some(file_name(&root)?), + WrapOption::Wrap { name: Some(name) } => Some(name), + }; + let files = walkdir::WalkDir::new(&root).into_iter(); + let data_sources = files + .map(|entry| { + let entry = entry?; + if !entry.file_type().is_file() { + // Skip symlinks. Directories are handled by WalkDir. + return Ok(None); + } + let path = entry.into_path(); + let mut name = relative_canonicalized_path_to_string(path.strip_prefix(&root)?)?; + if let Some(prefix) = &prefix { + name = format!("{prefix}/{name}"); + } + anyhow::Ok(Some(DataSource { name, path })) + }) + .filter_map(Result::transpose); + let data_sources: Vec> = data_sources.collect::>(); + data_sources.into_iter().collect::>>() +} + +/// This function converts a canonicalized relative path to a string, returning +/// an error if the path is not valid unicode. +/// +/// This function will also fail if the path is non canonical, i.e. contains +/// `..` or `.`, or if the path components contain any windows or unix path +/// separators. +pub fn relative_canonicalized_path_to_string(path: impl AsRef) -> anyhow::Result { + canonicalized_path_to_string(path, true) +} + +/// Loads a [`iroh_net::key::SecretKey`] from the provided file, or stores a newly generated one +/// at the given location. 
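+///
+/// A sketch, assuming a writable data directory (the path is a placeholder):
+///
+/// ```ignore
+/// let secret_key = load_secret_key(PathBuf::from("/var/lib/myapp/keyfile")).await?;
+/// println!("node id: {}", secret_key.public());
+/// ```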
+#[cfg(feature = "rpc")]
+#[cfg_attr(iroh_docsrs, doc(cfg(feature = "rpc")))]
+pub async fn load_secret_key(key_path: PathBuf) -> anyhow::Result<iroh_net::key::SecretKey> {
+    use tokio::io::AsyncWriteExt;
+
+    if key_path.exists() {
+        let keystr = tokio::fs::read(key_path).await?;
+        let secret_key =
+            iroh_net::key::SecretKey::try_from_openssh(keystr).context("invalid keyfile")?;
+        Ok(secret_key)
+    } else {
+        let secret_key = iroh_net::key::SecretKey::generate();
+        let ser_key = secret_key.to_openssh()?;
+
+        // Try to canonicalize if possible
+        let key_path = key_path.canonicalize().unwrap_or(key_path);
+        let key_path_parent = key_path.parent().ok_or_else(|| {
+            anyhow::anyhow!("no parent directory found for '{}'", key_path.display())
+        })?;
+        tokio::fs::create_dir_all(&key_path_parent).await?;
+
+        // write to tempfile
+        let (file, temp_file_path) = tempfile::NamedTempFile::new_in(key_path_parent)
+            .context("unable to create tempfile")?
+            .into_parts();
+        let mut file = tokio::fs::File::from_std(file);
+        file.write_all(ser_key.as_bytes())
+            .await
+            .context("unable to write keyfile")?;
+        file.flush().await?;
+        drop(file);
+
+        // move file
+        tokio::fs::rename(temp_file_path, key_path)
+            .await
+            .context("failed to rename keyfile")?;
+
+        Ok(secret_key)
+    }
+}
+
+/// Information about the content on a path
+#[derive(Debug, Clone)]
+pub struct PathContent {
+    /// total size of all the files in the directory
+    pub size: u64,
+    /// total number of files in the directory
+    pub files: u64,
+}
+
+/// Walks the directory to get the total size and number of files in a directory or file
+///
+// TODO: possibly combine with `scan_dir`
+pub fn path_content_info(path: impl AsRef<Path>) -> anyhow::Result<PathContent> {
+    path_content_info0(path)
+}
+
+fn path_content_info0(path: impl AsRef<Path>) -> anyhow::Result<PathContent> {
+    let mut files = 0;
+    let mut size = 0;
+    let path = path.as_ref();
+
+    if path.is_dir() {
+        for entry in read_dir(path)? {
+            let path0 = entry?.path();
+
+            match path_content_info0(path0) {
+                Ok(path_content) => {
+                    size += path_content.size;
+                    files += path_content.files;
+                }
+                Err(e) => bail!(e),
+            }
+        }
+    } else {
+        match path.try_exists() {
+            Ok(true) => {
+                size = path
+                    .metadata()
+                    .context(format!("Error reading metadata for {path:?}"))?
+                    .len();
+                files = 1;
+            }
+            Ok(false) => {
+                tracing::warn!("Not including broken symlink at {path:?}");
+            }
+            Err(e) => {
+                bail!(e);
+            }
+        }
+    }
+    Ok(PathContent { size, files })
+}
+
+/// Helper function that translates a key that was derived from the [`path_to_key`] function back
+/// into a path.
+///
+/// If `prefix` exists, it will be stripped before converting back to a path
+/// If `root` exists, will add the root as a parent to the created path
+/// Removes any null byte that has been appended to the key
+pub fn key_to_path(
+    key: impl AsRef<[u8]>,
+    prefix: Option<String>,
+    root: Option<PathBuf>,
+) -> anyhow::Result<PathBuf> {
+    let mut key = key.as_ref();
+    if key.is_empty() {
+        return Ok(PathBuf::new());
+    }
+    // if the last element is the null byte, remove it
+    if b'\0' == key[key.len() - 1] {
+        key = &key[..key.len() - 1]
+    }
+
+    let key = if let Some(prefix) = prefix {
+        let prefix = prefix.into_bytes();
+        if prefix[..] == key[..prefix.len()] {
+            &key[prefix.len()..]
+        } else {
+            anyhow::bail!("key {:?} does not begin with prefix {:?}", key, prefix);
+        }
+    } else {
+        key
+    };
+
+    let mut path = if key[0] == b'/' {
+        PathBuf::from("/")
+    } else {
+        PathBuf::new()
+    };
+    for component in key
+        .split(|c| c == &b'/')
+        .map(|c| String::from_utf8(c.into()).context("key contains invalid data"))
+    {
+        let component = component?;
+        path = path.join(component);
+    }
+
+    // add root if it exists
+    let path = if let Some(root) = root {
+        root.join(path)
+    } else {
+        path
+    };
+
+    Ok(path)
+}
+
+/// Helper function that creates a document key from a canonicalized path, removing the `root` and adding the `prefix`, if they exist
+///
+/// Appends the null byte to the end of the key.
+pub fn path_to_key(
+    path: impl AsRef<Path>,
+    prefix: Option<String>,
+    root: Option<PathBuf>,
+) -> anyhow::Result<Bytes> {
+    let path = path.as_ref();
+    let path = if let Some(root) = root {
+        path.strip_prefix(root)?
+    } else {
+        path
+    };
+    let suffix = canonicalized_path_to_string(path, false)?.into_bytes();
+    let mut key = if let Some(prefix) = prefix {
+        prefix.into_bytes().to_vec()
+    } else {
+        Vec::new()
+    };
+    key.extend(suffix);
+    key.push(b'\0');
+    Ok(key.into())
+}
+
+/// This function converts an already canonicalized path to a string.
+///
+/// If `must_be_relative` is true, the function will fail if any component of the path is
+/// `Component::RootDir`
+///
+/// This function will also fail if the path is non-canonical, i.e. contains
+/// `..` or `.`, or if the path components contain any windows or unix path
+/// separators.
+pub fn canonicalized_path_to_string(
+    path: impl AsRef<Path>,
+    must_be_relative: bool,
+) -> anyhow::Result<String> {
+    let mut path_str = String::new();
+    let parts = path
+        .as_ref()
+        .components()
+        .filter_map(|c| match c {
+            Component::Normal(x) => {
+                let c = match x.to_str() {
+                    Some(c) => c,
+                    None => return Some(Err(anyhow::anyhow!("invalid character in path"))),
+                };
+
+                if !c.contains('/') && !c.contains('\\') {
+                    Some(Ok(c))
+                } else {
+                    Some(Err(anyhow::anyhow!("invalid path component {:?}", c)))
+                }
+            }
+            Component::RootDir => {
+                if must_be_relative {
+                    Some(Err(anyhow::anyhow!("invalid path component {:?}", c)))
+                } else {
+                    path_str.push('/');
+                    None
+                }
+            }
+            _ => Some(Err(anyhow::anyhow!("invalid path component {:?}", c))),
+        })
+        .collect::<anyhow::Result<Vec<_>>>()?;
+    let parts = parts.join("/");
+    path_str.push_str(&parts);
+    Ok(path_str)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_path_to_key_roundtrip() {
+        let path = PathBuf::from("/foo/bar");
+        let expect_path = PathBuf::from("/foo/bar");
+        let key = b"/foo/bar\0";
+        let expect_key = Bytes::from(&key[..]);
+
+        let got_key = path_to_key(path.clone(), None, None).unwrap();
+        let got_path = key_to_path(got_key.clone(), None, None).unwrap();
+
+        assert_eq!(expect_key, got_key);
+        assert_eq!(expect_path, got_path);
+
+        // including prefix
+        let prefix = String::from("prefix:");
+        let key = b"prefix:/foo/bar\0";
+        let expect_key = Bytes::from(&key[..]);
+        let got_key = path_to_key(path.clone(), Some(prefix.clone()), None).unwrap();
+        assert_eq!(expect_key, got_key);
+        let got_path = key_to_path(got_key, Some(prefix.clone()), None).unwrap();
+        assert_eq!(expect_path, got_path);
+
+        // including root
+        let root = PathBuf::from("/foo");
+        let key = b"prefix:bar\0";
+        let expect_key = Bytes::from(&key[..]);
+        let got_key = path_to_key(path, Some(prefix.clone()), Some(root.clone())).unwrap();
+        assert_eq!(expect_key, got_key);
+        let got_path = key_to_path(got_key, Some(prefix), Some(root)).unwrap();
+        assert_eq!(expect_path, got_path);
+    }
+
+    #[test]
+    fn test_canonicalized_path_to_string() {
+        assert_eq!(
+            canonicalized_path_to_string("foo/bar", true).unwrap(),
+            "foo/bar"
+        );
+        assert_eq!(canonicalized_path_to_string("", true).unwrap(), "");
+        assert_eq!(
+            canonicalized_path_to_string("foo bar/baz/bat", true).unwrap(),
+            "foo bar/baz/bat"
+        );
+        assert_eq!(
+            canonicalized_path_to_string("/foo/bar", true).map_err(|e| e.to_string()),
+            Err("invalid path component RootDir".to_string())
+        );
+
+        assert_eq!(
+            canonicalized_path_to_string("/foo/bar", false).unwrap(),
+            "/foo/bar"
+        );
+        let path = PathBuf::from("/").join("Ü").join("⁰€™■・�").join("東京");
+        assert_eq!(
+            canonicalized_path_to_string(path, false).unwrap(),
+            "/Ü/⁰€™■・�/東京"
+        )
+    }
+
+    #[test]
+    fn test_get_path_content() {
+        let dir = testdir::testdir!();
+        let PathContent { size, files } = path_content_info(&dir).unwrap();
+        assert_eq!(0, size);
+        assert_eq!(0, files);
+        let foo = b"hello_world";
+        let bar = b"ipsum lorem";
+        let bat = b"happy birthday";
+        let expect_size = foo.len() + bar.len() + bat.len();
+        std::fs::write(dir.join("foo.txt"), foo).unwrap();
+        std::fs::write(dir.join("bar.txt"), bar).unwrap();
+        std::fs::write(dir.join("bat.txt"), bat).unwrap();
+        let PathContent { size, files } = path_content_info(&dir).unwrap();
+        assert_eq!(expect_size as u64, size);
+        assert_eq!(3, files);
+
+        // create nested empty dirs
+        std::fs::create_dir(dir.join("1")).unwrap();
+        std::fs::create_dir(dir.join("2")).unwrap();
+        let dir3 = dir.join("3");
+        std::fs::create_dir(&dir3).unwrap();
+
+        // create a nested dir w/ content
+        let dir4 = dir3.join("4");
+        std::fs::create_dir(&dir4).unwrap();
+        std::fs::write(dir4.join("foo.txt"), foo).unwrap();
+        std::fs::write(dir4.join("bar.txt"), bar).unwrap();
+        std::fs::write(dir4.join("bat.txt"), bat).unwrap();
+
+        let expect_size = expect_size * 2;
+        let PathContent { size, files } = path_content_info(&dir).unwrap();
+        assert_eq!(expect_size as u64, size);
+        assert_eq!(6, files);
+    }
+}