From 048538f4827a149bfa0044198c1d4c502aa3836b Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 25 Nov 2024 12:11:20 +0200 Subject: [PATCH 1/7] Add garbage collection to blobs also add ability to add excepmtions for gc before gc is started. --- src/net_protocol.rs | 119 ++++++++++++++++++++++++-------------------- 1 file changed, 66 insertions(+), 53 deletions(-) diff --git a/src/net_protocol.rs b/src/net_protocol.rs index c9ef43606..894dacef7 100644 --- a/src/net_protocol.rs +++ b/src/net_protocol.rs @@ -4,13 +4,15 @@ #![allow(missing_docs)] use std::{ - collections::BTreeMap, + collections::{BTreeMap, BTreeSet}, fmt::Debug, + ops::DerefMut, sync::{Arc, OnceLock}, }; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, bail, Result}; use futures_lite::future::Boxed as BoxedFuture; +use futures_util::future::BoxFuture; use iroh_base::hash::{BlobFormat, Hash}; use iroh_net::{endpoint::Connecting, Endpoint, NodeAddr}; use iroh_router::ProtocolHandler; @@ -24,27 +26,32 @@ use crate::{ Stats, }, provider::EventSender, + store::GcConfig, util::{ - local_pool::LocalPoolHandle, + local_pool::{self, LocalPoolHandle}, progress::{AsyncChannelProgressSender, ProgressSender}, SetTagOption, }, HashAndFormat, TempTag, }; -// pub type ProtectCb = Box) -> BoxFuture<()> + Send + Sync>; -// -// #[derive(derive_more::Debug)] -// enum GcState { -// Initial(#[debug(skip)] Vec), -// Started(#[allow(dead_code)] Option>), -// } -// -// impl Default for GcState { -// fn default() -> Self { -// Self::Initial(Vec::new()) -// } -// } +/// A callback that blobs can ask about a set of hashes that should not be garbage collected. +pub type ProtectCb = Box) -> BoxFuture<()> + Send + Sync>; + +/// The state of the gc loop. +#[derive(derive_more::Debug)] +enum GcState { + // Gc loop is not yet running. Other protcols can add protect callbacks + Initial(#[debug(skip)] Vec), + // Gc loop is running. No more protect callbacks can be added. + Started(#[allow(dead_code)] Option>), +} + +impl Default for GcState { + fn default() -> Self { + Self::Initial(Vec::new()) + } +} #[derive(Debug)] pub struct Blobs { @@ -54,6 +61,7 @@ pub struct Blobs { downloader: Downloader, batches: tokio::sync::Mutex, endpoint: Endpoint, + gc_state: Arc>, #[cfg(feature = "rpc")] pub(crate) rpc_handler: Arc>, } @@ -185,6 +193,7 @@ impl Blobs { downloader, endpoint, batches: Default::default(), + gc_state: Default::default(), #[cfg(feature = "rpc")] rpc_handler: Arc::new(OnceLock::new()), } @@ -206,43 +215,47 @@ impl Blobs { &self.endpoint } - // pub fn add_protected(&self, cb: ProtectCb) -> Result<()> { - // let mut state = self.gc_state.lock().unwrap(); - // match &mut *state { - // GcState::Initial(cbs) => { - // cbs.push(cb); - // } - // GcState::Started(_) => { - // anyhow::bail!("cannot add protected blobs after gc has started"); - // } - // } - // Ok(()) - // } - // - // pub fn start_gc(&self, config: GcConfig) -> Result<()> { - // let mut state = self.gc_state.lock().unwrap(); - // let protected = match state.deref_mut() { - // GcState::Initial(items) => std::mem::take(items), - // GcState::Started(_) => anyhow::bail!("gc already started"), - // }; - // let protected = Arc::new(protected); - // let protected_cb = move || { - // let protected = protected.clone(); - // async move { - // let mut set = BTreeSet::new(); - // for cb in protected.iter() { - // cb(&mut set).await; - // } - // set - // } - // }; - // let store = self.store.clone(); - // let run = self - // .rt - // .spawn(move || async move { store.gc_run(config, protected_cb).await }); - // *state = GcState::Started(Some(run)); - // Ok(()) - // } + /// Add a callback that will be called before the garbage collector runs. + /// + /// This can only be called before the garbage collector has started, otherwise it will return an error. + pub fn add_protected(&self, cb: ProtectCb) -> Result<()> { + let mut state = self.gc_state.lock().unwrap(); + match &mut *state { + GcState::Initial(cbs) => { + cbs.push(cb); + } + GcState::Started(_) => { + anyhow::bail!("cannot add protected blobs after gc has started"); + } + } + Ok(()) + } + + /// Start garbage collection with the given settings. + pub fn start_gc(&self, config: GcConfig) -> Result<()> { + let mut state = self.gc_state.lock().unwrap(); + let protected = match state.deref_mut() { + GcState::Initial(items) => std::mem::take(items), + GcState::Started(_) => bail!("gc already started"), + }; + let protected = Arc::new(protected); + let protected_cb = move || { + let protected = protected.clone(); + async move { + let mut set = BTreeSet::new(); + for cb in protected.iter() { + cb(&mut set).await; + } + set + } + }; + let store = self.store.clone(); + let run = self + .rt + .spawn(move || async move { store.gc_run(config, protected_cb).await }); + *state = GcState::Started(Some(run)); + Ok(()) + } pub(crate) async fn batches(&self) -> tokio::sync::MutexGuard<'_, BlobBatches> { self.batches.lock().await From 627b1dbf18f9a40516a89d7670714f7adfd5c205 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 25 Nov 2024 12:26:06 +0200 Subject: [PATCH 2/7] rustls cargo deny fix --- Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0c9433ddc..cd0066036 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3963,9 +3963,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.17" +version = "0.23.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f1a745511c54ba6d4465e8d5dfbd81b45791756de28d4981af70d6dca128f1e" +checksum = "9c9cc1d47e243d655ace55ed38201c19ae02c148ae56412ab8750e8f0166ab7f" dependencies = [ "log", "once_cell", From cdec081d21c37614a57af8007879a732bb3e3a90 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 25 Nov 2024 12:27:30 +0200 Subject: [PATCH 3/7] codespell --- src/net_protocol.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/net_protocol.rs b/src/net_protocol.rs index 894dacef7..c2592cf20 100644 --- a/src/net_protocol.rs +++ b/src/net_protocol.rs @@ -41,7 +41,7 @@ pub type ProtectCb = Box) -> BoxFuture<()> + Send + S /// The state of the gc loop. #[derive(derive_more::Debug)] enum GcState { - // Gc loop is not yet running. Other protcols can add protect callbacks + // Gc loop is not yet running. Other protocols can add protect callbacks Initial(#[debug(skip)] Vec), // Gc loop is running. No more protect callbacks can be added. Started(#[allow(dead_code)] Option>), From 08cc84061426a1d4d014a336038d946eac7e551b Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 26 Nov 2024 14:43:18 +0200 Subject: [PATCH 4/7] Add some very basic gc tests --- tests/blobs.rs | 70 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 tests/blobs.rs diff --git a/tests/blobs.rs b/tests/blobs.rs new file mode 100644 index 000000000..aed641f39 --- /dev/null +++ b/tests/blobs.rs @@ -0,0 +1,70 @@ +#![cfg(feature = "net_protocol")] +use std::{ + sync::{Arc, Mutex}, + time::Duration, +}; + +use iroh_blobs::{net_protocol::Blobs, store::GcConfig, util::local_pool::LocalPool, Hash}; +use iroh_net::Endpoint; +use testresult::TestResult; + +#[tokio::test] +async fn blobs_gc_smoke() -> TestResult<()> { + let pool = LocalPool::default(); + let endpoint = Endpoint::builder().bind().await?; + let blobs = Blobs::memory().build(pool.handle(), &endpoint); + let client = blobs.clone().client(); + blobs.start_gc(GcConfig { + period: Duration::from_millis(1), + done_callback: None, + })?; + let h1 = client.add_bytes(b"test".to_vec()).await?; + tokio::time::sleep(Duration::from_millis(100)).await; + assert!(client.has(h1.hash).await?); + client.tags().delete(h1.tag).await?; + tokio::time::sleep(Duration::from_millis(100)).await; + assert!(!client.has(h1.hash).await?); + Ok(()) +} + +#[tokio::test] +async fn blobs_gc_protected() -> TestResult<()> { + let pool = LocalPool::default(); + let endpoint = Endpoint::builder().bind().await?; + let blobs = Blobs::memory().build(pool.handle(), &endpoint); + let client: iroh_blobs::rpc::client::blobs::Client< + quic_rpc::transport::flume::FlumeConnector< + iroh_blobs::rpc::proto::Response, + iroh_blobs::rpc::proto::Request, + >, + > = blobs.clone().client(); + let h1 = client.add_bytes(b"test".to_vec()).await?; + let protected: Arc>> = Arc::new(Mutex::new(Vec::new())); + let protected2 = protected.clone(); + blobs.add_protected(Box::new(move |x| { + let protected = protected2.clone(); + Box::pin(async move { + let protected = protected.lock().unwrap(); + for h in protected.as_slice() { + x.insert(*h); + } + }) + }))?; + blobs.start_gc(GcConfig { + period: Duration::from_millis(1), + done_callback: None, + })?; + tokio::time::sleep(Duration::from_millis(100)).await; + // protected from gc due to tag + assert!(client.has(h1.hash).await?); + client.tags().delete(h1.tag).await?; + protected.lock().unwrap().push(h1.hash); + tokio::time::sleep(Duration::from_millis(100)).await; + // protected from gc due to being in protected set + assert!(client.has(h1.hash).await?); + protected.lock().unwrap().clear(); + tokio::time::sleep(Duration::from_millis(100)).await; + // not protected, must be gone + assert!(!client.has(h1.hash).await?); + Ok(()) +} From e7943c241f20efc2f6aa122167d1537e792d5986 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=BCdiger=20Klaehn?= Date: Thu, 28 Nov 2024 10:46:06 +0200 Subject: [PATCH 5/7] Update tests/blobs.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit different way to deal with the clone madness Co-authored-by: Philipp Krüger --- tests/blobs.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/tests/blobs.rs b/tests/blobs.rs index aed641f39..c71134a31 100644 --- a/tests/blobs.rs +++ b/tests/blobs.rs @@ -39,16 +39,17 @@ async fn blobs_gc_protected() -> TestResult<()> { >, > = blobs.clone().client(); let h1 = client.add_bytes(b"test".to_vec()).await?; - let protected: Arc>> = Arc::new(Mutex::new(Vec::new())); - let protected2 = protected.clone(); - blobs.add_protected(Box::new(move |x| { - let protected = protected2.clone(); - Box::pin(async move { - let protected = protected.lock().unwrap(); - for h in protected.as_slice() { - x.insert(*h); - } - }) + blobs.add_protected(Box::new({ + let protected = protected.clone(); + move |x| { + let protected = protected.clone(); + Box::pin(async move { + let protected = protected.lock().unwrap(); + for h in protected.as_slice() { + x.insert(*h); + } + }) + } }))?; blobs.start_gc(GcConfig { period: Duration::from_millis(1), From ca2b9a07bcdf7d32fed3a8e516b820113a774272 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 28 Nov 2024 12:31:21 +0200 Subject: [PATCH 6/7] fix weird compile error - probably due to merge conflict --- tests/blobs.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/blobs.rs b/tests/blobs.rs index c71134a31..6e93d40ba 100644 --- a/tests/blobs.rs +++ b/tests/blobs.rs @@ -4,7 +4,7 @@ use std::{ time::Duration, }; -use iroh_blobs::{net_protocol::Blobs, store::GcConfig, util::local_pool::LocalPool, Hash}; +use iroh_blobs::{net_protocol::Blobs, store::GcConfig, util::local_pool::LocalPool}; use iroh_net::Endpoint; use testresult::TestResult; @@ -39,6 +39,7 @@ async fn blobs_gc_protected() -> TestResult<()> { >, > = blobs.clone().client(); let h1 = client.add_bytes(b"test".to_vec()).await?; + let protected = Arc::new(Mutex::new(Vec::new())); blobs.add_protected(Box::new({ let protected = protected.clone(); move |x| { From 715d66873efda7d0581b5d34ff3b9c3162dc11fb Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 28 Nov 2024 15:07:38 +0200 Subject: [PATCH 7/7] fix blobs test first protect, then delete tag --- tests/blobs.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/blobs.rs b/tests/blobs.rs index 6e93d40ba..4ec9ed590 100644 --- a/tests/blobs.rs +++ b/tests/blobs.rs @@ -59,8 +59,8 @@ async fn blobs_gc_protected() -> TestResult<()> { tokio::time::sleep(Duration::from_millis(100)).await; // protected from gc due to tag assert!(client.has(h1.hash).await?); - client.tags().delete(h1.tag).await?; protected.lock().unwrap().push(h1.hash); + client.tags().delete(h1.tag).await?; tokio::time::sleep(Duration::from_millis(100)).await; // protected from gc due to being in protected set assert!(client.has(h1.hash).await?);