diff --git a/src/api.rs b/src/api.rs index 94f34ade..f3c540cd 100644 --- a/src/api.rs +++ b/src/api.rs @@ -4,6 +4,7 @@ //! with a remote store via rpc calls. use std::{io, net::SocketAddr, ops::Deref, sync::Arc}; +use bao_tree::io::EncodeError; use iroh::Endpoint; use irpc::rpc::{listen, Handler}; use n0_snafu::SpanTrace; @@ -211,6 +212,15 @@ impl std::error::Error for Error { } } +impl From for Error { + fn from(value: EncodeError) -> Self { + match value { + EncodeError::Io(cause) => Self::Io(cause), + _ => Self::other(value), + } + } +} + pub type Result = std::result::Result; /// The main entry point for the store API. diff --git a/src/api/blobs.rs b/src/api/blobs.rs index 942daf0a..312f45d1 100644 --- a/src/api/blobs.rs +++ b/src/api/blobs.rs @@ -102,13 +102,21 @@ impl Blobs { }) } - pub async fn delete_with_opts(&self, options: DeleteOptions) -> RequestResult<()> { + /// Delete a blob. + /// + /// This function is not public, because it does not work as expected when called manually, + /// because blobs are protected from deletion. This is only called from the gc task, which + /// clears the protections before. + /// + /// Users should rely only on garbage collection for blob deletion. + pub(crate) async fn delete_with_opts(&self, options: DeleteOptions) -> RequestResult<()> { trace!("{options:?}"); self.client.rpc(options).await??; Ok(()) } - pub async fn delete( + /// See [`Self::delete_with_opts`]. + pub(crate) async fn delete( &self, hashes: impl IntoIterator>, ) -> RequestResult<()> { @@ -294,6 +302,7 @@ impl Blobs { ExportBaoRequest { hash: hash.into(), ranges: ranges.into(), + create_if_missing: false, }, 32, ) @@ -962,7 +971,6 @@ impl ExportBaoProgress { let mut data = Vec::new(); let mut stream = self.into_byte_stream(); while let Some(item) = stream.next().await { - println!("item: {item:?}"); data.extend_from_slice(&item?); } Ok(data) @@ -1088,7 +1096,7 @@ impl ExportBaoProgress { } EncodedItem::Leaf(leaf) => Some(Ok(leaf.data)), EncodedItem::Done => None, - EncodedItem::Error(cause) => Some(Err(super::Error::other(cause))), + EncodedItem::Error(cause) => Some(Err(cause.into())), }) } diff --git a/src/api/proto.rs b/src/api/proto.rs index ed3686e1..4ec0ec93 100644 --- a/src/api/proto.rs +++ b/src/api/proto.rs @@ -222,6 +222,7 @@ pub struct ObserveRequest { pub struct ExportBaoRequest { pub hash: Hash, pub ranges: ChunkRanges, + pub create_if_missing: bool, } /// Export the given ranges as chunkks, without validation. diff --git a/src/api/remote.rs b/src/api/remote.rs index 75e66d0e..cbacb4c1 100644 --- a/src/api/remote.rs +++ b/src/api/remote.rs @@ -10,7 +10,7 @@ use nested_enum_utils::common_fields; use ref_cast::RefCast; use snafu::{Backtrace, IntoError, Snafu}; -use super::blobs::Bitfield; +use super::blobs::{Bitfield, ExportBaoOptions}; use crate::{ api::{blobs::WriteProgress, ApiClient}, get::{fsm::DecodeError, BadRequestSnafu, GetError, GetResult, LocalFailureSnafu, Stats}, @@ -159,7 +159,7 @@ impl PushProgress { async fn just_result(stream: S) -> Option where - S: Stream, + S: Stream, R: TryFrom, { tokio::pin!(stream); @@ -417,7 +417,12 @@ impl Remote { let root = request.hash; let bitfield = self.store().observe(root).await?; let children = if !request.ranges.is_blob() { - let bao = self.store().export_bao(root, bitfield.ranges.clone()); + let opts = ExportBaoOptions { + hash: root, + ranges: bitfield.ranges.clone(), + create_if_missing: true, + }; + let bao = self.store().export_bao_with_opts(opts, 32); let mut by_index = BTreeMap::new(); let mut stream = bao.hashes_with_index(); while let Some(item) = stream.next().await { diff --git a/src/store/fs.rs b/src/store/fs.rs index ff8bc900..41cd82ba 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -305,6 +305,34 @@ impl HashContext { Ok(()) } + pub async fn get_maybe_create(&self, hash: Hash, create: bool) -> api::Result { + if create { + self.get_or_create(hash).await + } else { + self.get(hash).await + } + } + + pub async fn get(&self, hash: Hash) -> api::Result { + if hash == Hash::EMPTY { + return Ok(self.ctx.empty.clone()); + } + let res = self + .slot + .get_or_create(|| async { + let res = self.db().get(hash).await.map_err(io::Error::other)?; + let res = match res { + Some(state) => open_bao_file(&hash, state, &self.ctx).await, + None => Err(io::Error::new(io::ErrorKind::NotFound, "hash not found")), + }; + Ok((res?, ())) + }) + .await + .map_err(api::Error::from); + let (res, _) = res?; + Ok(res) + } + pub async fn get_or_create(&self, hash: Hash) -> api::Result { if hash == Hash::EMPTY { return Ok(self.ctx.empty.clone()); @@ -939,7 +967,7 @@ async fn observe(cmd: ObserveMsg, ctx: HashContext) { #[instrument(skip_all, fields(hash = %cmd.hash_short()))] async fn export_ranges(mut cmd: ExportRangesMsg, ctx: HashContext) { - match ctx.get_or_create(cmd.hash).await { + match ctx.get(cmd.hash).await { Ok(handle) => { if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, handle).await { cmd.tx @@ -1000,7 +1028,7 @@ async fn export_ranges_impl( #[instrument(skip_all, fields(hash = %cmd.hash_short()))] async fn export_bao(mut cmd: ExportBaoMsg, ctx: HashContext) { - match ctx.get_or_create(cmd.hash).await { + match ctx.get_maybe_create(cmd.hash, cmd.create_if_missing).await { Ok(handle) => { if let Err(cause) = export_bao_impl(cmd.inner, &mut cmd.tx, handle).await { cmd.tx @@ -1010,9 +1038,9 @@ async fn export_bao(mut cmd: ExportBaoMsg, ctx: HashContext) { } } Err(cause) => { - let cause = anyhow::anyhow!("failed to open file: {cause}"); + let crate::api::Error::Io(cause) = cause; cmd.tx - .send(bao_tree::io::EncodeError::Io(io::Error::other(cause)).into()) + .send(bao_tree::io::EncodeError::Io(cause).into()) .await .ok(); } @@ -1024,7 +1052,7 @@ async fn export_bao_impl( tx: &mut mpsc::Sender, handle: BaoFileHandle, ) -> anyhow::Result<()> { - let ExportBaoRequest { ranges, hash } = cmd; + let ExportBaoRequest { ranges, hash, .. } = cmd; debug_assert!(handle.hash() == hash, "hash mismatch"); let outboard = handle.outboard()?; let size = outboard.tree.size(); diff --git a/src/store/fs/gc.rs b/src/store/fs/gc.rs index 4aa16c14..df272dbb 100644 --- a/src/store/fs/gc.rs +++ b/src/store/fs/gc.rs @@ -1,9 +1,9 @@ -use std::collections::HashSet; +use std::{collections::HashSet, pin::Pin, sync::Arc}; use bao_tree::ChunkRanges; use genawaiter::sync::{Co, Gen}; use n0_future::{Stream, StreamExt}; -use tracing::{debug, error, warn}; +use tracing::{debug, error, info, warn}; use crate::{api::Store, Hash, HashAndFormat}; @@ -130,14 +130,52 @@ fn gc_sweep<'a>( }) } -#[derive(Debug, Clone)] +/// Configuration for garbage collection. +#[derive(derive_more::Debug, Clone)] pub struct GcConfig { + /// Interval in which to run garbage collection. pub interval: std::time::Duration, + /// Optional callback to manually add protected blobs. + /// + /// The callback is called before each garbage collection run. It gets a `&mut HashSet` + /// and returns a future that returns [`ProtectOutcome`]. All hashes that are added to the + /// [`HashSet`] will be protected from garbage collection during this run. + /// + /// In normal operation, return [`ProtectOutcome::Continue`] from the callback. If you return + /// [`ProtectOutcome::Abort`], the garbage collection run will be aborted.Use this if your + /// source of hashes to protect returned an error, and thus garbage collection should be skipped + /// completely to not unintentionally delete blobs that should be protected. + #[debug("ProtectCallback")] + pub add_protected: Option, } +/// Returned from [`ProtectCb`]. +/// +/// See [`GcConfig::add_protected] for details. +#[derive(Debug)] +pub enum ProtectOutcome { + /// Continue with the garbage collection run. + Continue, + /// Abort the garbage collection run. + Abort, +} + +/// The type of the garbage collection callback. +/// +/// See [`GcConfig::add_protected] for details. +pub type ProtectCb = Arc< + dyn for<'a> Fn( + &'a mut HashSet, + ) + -> Pin + Send + Sync + 'a>> + + Send + + Sync + + 'static, +>; + pub async fn gc_run_once(store: &Store, live: &mut HashSet) -> crate::api::Result<()> { + debug!(externally_protected = live.len(), "gc: start"); { - live.clear(); store.clear_protected().await?; let mut stream = gc_mark(store, live); while let Some(ev) = stream.next().await { @@ -155,6 +193,7 @@ pub async fn gc_run_once(store: &Store, live: &mut HashSet) -> crate::api: } } } + debug!(total_protected = live.len(), "gc: sweep"); { let mut stream = gc_sweep(store, live); while let Some(ev) = stream.next().await { @@ -172,14 +211,26 @@ pub async fn gc_run_once(store: &Store, live: &mut HashSet) -> crate::api: } } } + debug!("gc: done"); Ok(()) } pub async fn run_gc(store: Store, config: GcConfig) { + debug!("gc enabled with interval {:?}", config.interval); let mut live = HashSet::new(); loop { + live.clear(); tokio::time::sleep(config.interval).await; + if let Some(ref cb) = config.add_protected { + match (cb)(&mut live).await { + ProtectOutcome::Continue => {} + ProtectOutcome::Abort => { + info!("abort gc run: protect callback indicated abort"); + continue; + } + } + } if let Err(e) = gc_run_once(&store, &mut live).await { error!("error during gc run: {e}"); break; @@ -189,14 +240,18 @@ pub async fn run_gc(store: Store, config: GcConfig) { #[cfg(test)] mod tests { - use std::path::Path; + use std::{ + io::{self}, + path::Path, + }; - use bao_tree::ChunkNum; + use bao_tree::{io::EncodeError, ChunkNum}; + use range_collections::RangeSet2; use testresult::TestResult; use super::*; use crate::{ - api::{blobs::AddBytesOptions, Store}, + api::{blobs::AddBytesOptions, ExportBaoError, RequestError, Store}, hashseq::HashSeq, store::fs::{options::PathOptions, tests::create_n0_bao}, BlobFormat, @@ -284,6 +339,7 @@ mod tests { assert!(!data_path.exists()); assert!(!outboard_path.exists()); } + live.clear(); // create a large partial file and check that the data and outboard file as well as // the sizes and bitfield files are deleted by gc { @@ -326,4 +382,83 @@ mod tests { gc_smoke(&store).await?; Ok(()) } + + #[tokio::test] + async fn gc_check_deletion_fs() -> TestResult { + tracing_subscriber::fmt::try_init().ok(); + let testdir = tempfile::tempdir()?; + let db_path = testdir.path().join("db"); + let store = crate::store::fs::FsStore::load(&db_path).await?; + gc_check_deletion(&store).await + } + + #[tokio::test] + async fn gc_check_deletion_mem() -> TestResult { + tracing_subscriber::fmt::try_init().ok(); + let store = crate::store::mem::MemStore::default(); + gc_check_deletion(&store).await + } + + async fn gc_check_deletion(store: &Store) -> TestResult { + let temp_tag = store.add_bytes(b"foo".to_vec()).temp_tag().await?; + let hash = *temp_tag.hash(); + assert_eq!(store.get_bytes(hash).await?.as_ref(), b"foo"); + drop(temp_tag); + let mut live = HashSet::new(); + gc_run_once(store, &mut live).await?; + + // check that `get_bytes` returns an error. + let res = store.get_bytes(hash).await; + assert!(res.is_err()); + assert!(matches!( + res, + Err(ExportBaoError::ExportBaoInner { + source: EncodeError::Io(cause), + .. + }) if cause.kind() == io::ErrorKind::NotFound + )); + + // check that `export_ranges` returns an error. + let res = store + .export_ranges(hash, RangeSet2::all()) + .concatenate() + .await; + assert!(res.is_err()); + assert!(matches!( + res, + Err(RequestError::Inner{ + source: crate::api::Error::Io(cause), + .. + }) if cause.kind() == io::ErrorKind::NotFound + )); + + // check that `export_bao` returns an error. + let res = store + .export_bao(hash, ChunkRanges::all()) + .bao_to_vec() + .await; + assert!(res.is_err()); + println!("export_bao res {res:?}"); + assert!(matches!( + res, + Err(RequestError::Inner{ + source: crate::api::Error::Io(cause), + .. + }) if cause.kind() == io::ErrorKind::NotFound + )); + + // check that `export` returns an error. + let target = tempfile::NamedTempFile::new()?; + let path = target.path(); + let res = store.export(hash, path).await; + assert!(res.is_err()); + assert!(matches!( + res, + Err(RequestError::Inner{ + source: crate::api::Error::Io(cause), + .. + }) if cause.kind() == io::ErrorKind::NotFound + )); + Ok(()) + } } diff --git a/src/store/fs/meta.rs b/src/store/fs/meta.rs index a0831410..617db98c 100644 --- a/src/store/fs/meta.rs +++ b/src/store/fs/meta.rs @@ -463,6 +463,7 @@ impl Actor { } = cmd; for hash in hashes { if !force && protected.contains(&hash) { + trace!("delete {hash}: skip (protected)"); continue; } if let Some(entry) = tables.blobs.remove(hash).context(StorageSnafu)? { @@ -471,6 +472,7 @@ impl Actor { data_location, outboard_location, } => { + trace!("delete {hash}: currently complete. will be deleted."); match data_location { DataLocation::Inline(_) => { tables.inline_data.remove(hash).context(StorageSnafu)?; @@ -493,6 +495,7 @@ impl Actor { } } EntryState::Partial { .. } => { + trace!("delete {hash}: currently partial. will be deleted."); tables.ftx.delete( hash, [ diff --git a/src/store/fs/options.rs b/src/store/fs/options.rs index 6e123b75..f7dfa82f 100644 --- a/src/store/fs/options.rs +++ b/src/store/fs/options.rs @@ -4,7 +4,8 @@ use std::{ time::Duration, }; -use super::{gc::GcConfig, meta::raw_outboard_size, temp_name}; +pub use super::gc::{GcConfig, ProtectCb, ProtectOutcome}; +use super::{meta::raw_outboard_size, temp_name}; use crate::Hash; /// Options for directories used by the file store. diff --git a/src/store/mem.rs b/src/store/mem.rs index 94033eee..c4e07111 100644 --- a/src/store/mem.rs +++ b/src/store/mem.rs @@ -23,7 +23,7 @@ use bao_tree::{ mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt}, outboard::PreOrderMemOutboard, sync::{Outboard, ReadAt, WriteAt}, - BaoContentItem, Leaf, + BaoContentItem, EncodeError, Leaf, }, BaoTree, ChunkNum, ChunkRanges, TreeNode, }; @@ -117,6 +117,7 @@ impl MemStore { state: State { data: HashMap::new(), tags: BTreeMap::new(), + empty_hash: BaoFileHandle::new_partial(Hash::EMPTY), }, options: Arc::new(Options::default()), temp_tags: Default::default(), @@ -189,15 +190,24 @@ impl Actor { self.spawn(import_path(cmd)); } Command::ExportBao(ExportBaoMsg { - inner: ExportBaoRequest { hash, ranges }, + inner: + ExportBaoRequest { + hash, + ranges, + create_if_missing, + }, tx, .. }) => { - let entry = self.get_or_create_entry(hash); - self.spawn(export_bao(entry, ranges, tx)); + let entry = if create_if_missing { + Some(self.get_or_create_entry(hash)) + } else { + self.get(&hash) + }; + self.spawn(export_bao(entry, ranges, tx)) } Command::ExportPath(cmd) => { - let entry = self.state.data.get(&cmd.hash).cloned(); + let entry = self.get(&cmd.hash); self.spawn(export_path(entry, cmd)); } Command::DeleteTags(cmd) => { @@ -329,7 +339,7 @@ impl Actor { tx, .. } = cmd; - let res = match self.state.data.get(&hash) { + let res = match self.get(&hash) { None => api::blobs::BlobStatus::NotFound, Some(x) => { let bitfield = x.0.state.borrow().bitfield(); @@ -371,8 +381,8 @@ impl Actor { cmd.tx.send(Ok(())).await.ok(); } Command::ExportRanges(cmd) => { - let entry = self.get_or_create_entry(cmd.hash); - self.spawn(export_ranges(cmd, entry.clone())); + let entry = self.get(&cmd.hash); + self.spawn(export_ranges(cmd, entry)); } Command::SyncDb(SyncDbMsg { tx, .. }) => { tx.send(Ok(())).await.ok(); @@ -384,12 +394,24 @@ impl Actor { None } + fn get(&mut self, hash: &Hash) -> Option { + if *hash == Hash::EMPTY { + Some(self.state.empty_hash.clone()) + } else { + self.state.data.get(hash).cloned() + } + } + fn get_or_create_entry(&mut self, hash: Hash) -> BaoFileHandle { - self.state - .data - .entry(hash) - .or_insert_with(|| BaoFileHandle::new_partial(hash)) - .clone() + if hash == Hash::EMPTY { + self.state.empty_hash.clone() + } else { + self.state + .data + .entry(hash) + .or_insert_with(|| BaoFileHandle::new_partial(hash)) + .clone() + } } async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) { @@ -501,7 +523,12 @@ async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc) Ok(()) } -async fn export_ranges(mut cmd: ExportRangesMsg, entry: BaoFileHandle) { +async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option) { + let Some(entry) = entry else { + let err = io::Error::new(io::ErrorKind::NotFound, "hash not found"); + cmd.tx.send(ExportRangesItem::Error(err.into())).await.ok(); + return; + }; if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await { cmd.tx .send(ExportRangesItem::Error(cause.into())) @@ -624,12 +651,18 @@ async fn import_bao( tx.send(Ok(())).await.ok(); } -#[instrument(skip_all, fields(hash = %entry.hash.fmt_short()))] +#[instrument(skip_all, fields(hash = tracing::field::Empty))] async fn export_bao( - entry: BaoFileHandle, + entry: Option, ranges: ChunkRanges, mut sender: mpsc::Sender, ) { + let Some(entry) = entry else { + let err = EncodeError::Io(io::Error::new(io::ErrorKind::NotFound, "hash not found")); + sender.send(err.into()).await.ok(); + return; + }; + tracing::Span::current().record("hash", tracing::field::display(entry.hash)); let data = entry.data_reader(); let outboard = entry.outboard_reader(); let tx = BaoTreeSender::new(&mut sender); @@ -828,6 +861,7 @@ impl Outboard for OutboardReader { struct State { data: HashMap, tags: BTreeMap, + empty_hash: BaoFileHandle, } #[derive(Debug, derive_more::From)] diff --git a/src/store/readonly_mem.rs b/src/store/readonly_mem.rs index a00cf82e..55ef3693 100644 --- a/src/store/readonly_mem.rs +++ b/src/store/readonly_mem.rs @@ -117,7 +117,7 @@ impl Actor { }); } Command::ExportBao(ExportBaoMsg { - inner: ExportBaoRequest { hash, ranges }, + inner: ExportBaoRequest { hash, ranges, .. }, tx, .. }) => {