diff --git a/src/api.rs b/src/api.rs index 94f34ade..f3c540cd 100644 --- a/src/api.rs +++ b/src/api.rs @@ -4,6 +4,7 @@ //! with a remote store via rpc calls. use std::{io, net::SocketAddr, ops::Deref, sync::Arc}; +use bao_tree::io::EncodeError; use iroh::Endpoint; use irpc::rpc::{listen, Handler}; use n0_snafu::SpanTrace; @@ -211,6 +212,15 @@ impl std::error::Error for Error { } } +impl From for Error { + fn from(value: EncodeError) -> Self { + match value { + EncodeError::Io(cause) => Self::Io(cause), + _ => Self::other(value), + } + } +} + pub type Result = std::result::Result; /// The main entry point for the store API. diff --git a/src/api/blobs.rs b/src/api/blobs.rs index 942daf0a..74c6e1f2 100644 --- a/src/api/blobs.rs +++ b/src/api/blobs.rs @@ -102,13 +102,21 @@ impl Blobs { }) } - pub async fn delete_with_opts(&self, options: DeleteOptions) -> RequestResult<()> { + /// Delete a blob. + /// + /// This function is not public, because it does not work as expected when called manually, + /// because blobs are protected from deletion. This is only called from the gc task, which + /// clears the protections before. + /// + /// Users should rely only on garbage collection for blob deletion. + pub(crate) async fn delete_with_opts(&self, options: DeleteOptions) -> RequestResult<()> { trace!("{options:?}"); self.client.rpc(options).await??; Ok(()) } - pub async fn delete( + /// See [`Self::delete_with_opts`]. + pub(crate) async fn delete( &self, hashes: impl IntoIterator>, ) -> RequestResult<()> { @@ -962,7 +970,6 @@ impl ExportBaoProgress { let mut data = Vec::new(); let mut stream = self.into_byte_stream(); while let Some(item) = stream.next().await { - println!("item: {item:?}"); data.extend_from_slice(&item?); } Ok(data) @@ -1088,7 +1095,7 @@ impl ExportBaoProgress { } EncodedItem::Leaf(leaf) => Some(Ok(leaf.data)), EncodedItem::Done => None, - EncodedItem::Error(cause) => Some(Err(super::Error::other(cause))), + EncodedItem::Error(cause) => Some(Err(cause.into())), }) } diff --git a/src/api/remote.rs b/src/api/remote.rs index 75e66d0e..7a4055fb 100644 --- a/src/api/remote.rs +++ b/src/api/remote.rs @@ -10,7 +10,7 @@ use nested_enum_utils::common_fields; use ref_cast::RefCast; use snafu::{Backtrace, IntoError, Snafu}; -use super::blobs::Bitfield; +use super::blobs::{Bitfield, ExportBaoOptions}; use crate::{ api::{blobs::WriteProgress, ApiClient}, get::{fsm::DecodeError, BadRequestSnafu, GetError, GetResult, LocalFailureSnafu, Stats}, @@ -159,7 +159,7 @@ impl PushProgress { async fn just_result(stream: S) -> Option where - S: Stream, + S: Stream, R: TryFrom, { tokio::pin!(stream); @@ -417,12 +417,17 @@ impl Remote { let root = request.hash; let bitfield = self.store().observe(root).await?; let children = if !request.ranges.is_blob() { - let bao = self.store().export_bao(root, bitfield.ranges.clone()); + let opts = ExportBaoOptions { + hash: root, + ranges: bitfield.ranges.clone(), + }; + let bao = self.store().export_bao_with_opts(opts, 32); let mut by_index = BTreeMap::new(); let mut stream = bao.hashes_with_index(); while let Some(item) = stream.next().await { - let (index, hash) = item?; - by_index.insert(index, hash); + if let Ok((index, hash)) = item { + by_index.insert(index, hash); + } } let mut bitfields = BTreeMap::new(); let mut hash_seq = BTreeMap::new(); diff --git a/src/store/fs.rs b/src/store/fs.rs index ff8bc900..024d9786 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -305,6 +305,34 @@ impl HashContext { Ok(()) } + pub async fn get_maybe_create(&self, hash: Hash, create: bool) -> api::Result { + if create { + self.get_or_create(hash).await + } else { + self.get(hash).await + } + } + + pub async fn get(&self, hash: Hash) -> api::Result { + if hash == Hash::EMPTY { + return Ok(self.ctx.empty.clone()); + } + let res = self + .slot + .get_or_create(|| async { + let res = self.db().get(hash).await.map_err(io::Error::other)?; + let res = match res { + Some(state) => open_bao_file(&hash, state, &self.ctx).await, + None => Err(io::Error::new(io::ErrorKind::NotFound, "hash not found")), + }; + Ok((res?, ())) + }) + .await + .map_err(api::Error::from); + let (res, _) = res?; + Ok(res) + } + pub async fn get_or_create(&self, hash: Hash) -> api::Result { if hash == Hash::EMPTY { return Ok(self.ctx.empty.clone()); @@ -939,7 +967,7 @@ async fn observe(cmd: ObserveMsg, ctx: HashContext) { #[instrument(skip_all, fields(hash = %cmd.hash_short()))] async fn export_ranges(mut cmd: ExportRangesMsg, ctx: HashContext) { - match ctx.get_or_create(cmd.hash).await { + match ctx.get(cmd.hash).await { Ok(handle) => { if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, handle).await { cmd.tx @@ -1000,7 +1028,7 @@ async fn export_ranges_impl( #[instrument(skip_all, fields(hash = %cmd.hash_short()))] async fn export_bao(mut cmd: ExportBaoMsg, ctx: HashContext) { - match ctx.get_or_create(cmd.hash).await { + match ctx.get_maybe_create(cmd.hash, false).await { Ok(handle) => { if let Err(cause) = export_bao_impl(cmd.inner, &mut cmd.tx, handle).await { cmd.tx @@ -1010,9 +1038,9 @@ async fn export_bao(mut cmd: ExportBaoMsg, ctx: HashContext) { } } Err(cause) => { - let cause = anyhow::anyhow!("failed to open file: {cause}"); + let crate::api::Error::Io(cause) = cause; cmd.tx - .send(bao_tree::io::EncodeError::Io(io::Error::other(cause)).into()) + .send(bao_tree::io::EncodeError::Io(cause).into()) .await .ok(); } @@ -1024,7 +1052,7 @@ async fn export_bao_impl( tx: &mut mpsc::Sender, handle: BaoFileHandle, ) -> anyhow::Result<()> { - let ExportBaoRequest { ranges, hash } = cmd; + let ExportBaoRequest { ranges, hash, .. } = cmd; debug_assert!(handle.hash() == hash, "hash mismatch"); let outboard = handle.outboard()?; let size = outboard.tree.size(); diff --git a/src/store/fs/gc.rs b/src/store/fs/gc.rs index 4aa16c14..a394dc19 100644 --- a/src/store/fs/gc.rs +++ b/src/store/fs/gc.rs @@ -189,14 +189,18 @@ pub async fn run_gc(store: Store, config: GcConfig) { #[cfg(test)] mod tests { - use std::path::Path; + use std::{ + io::{self}, + path::Path, + }; - use bao_tree::ChunkNum; + use bao_tree::{io::EncodeError, ChunkNum}; + use range_collections::RangeSet2; use testresult::TestResult; use super::*; use crate::{ - api::{blobs::AddBytesOptions, Store}, + api::{blobs::AddBytesOptions, ExportBaoError, RequestError, Store}, hashseq::HashSeq, store::fs::{options::PathOptions, tests::create_n0_bao}, BlobFormat, @@ -326,4 +330,83 @@ mod tests { gc_smoke(&store).await?; Ok(()) } + + #[tokio::test] + async fn gc_check_deletion_fs() -> TestResult { + tracing_subscriber::fmt::try_init().ok(); + let testdir = tempfile::tempdir()?; + let db_path = testdir.path().join("db"); + let store = crate::store::fs::FsStore::load(&db_path).await?; + gc_check_deletion(&store).await + } + + #[tokio::test] + async fn gc_check_deletion_mem() -> TestResult { + tracing_subscriber::fmt::try_init().ok(); + let store = crate::store::mem::MemStore::default(); + gc_check_deletion(&store).await + } + + async fn gc_check_deletion(store: &Store) -> TestResult { + let temp_tag = store.add_bytes(b"foo".to_vec()).temp_tag().await?; + let hash = *temp_tag.hash(); + assert_eq!(store.get_bytes(hash).await?.as_ref(), b"foo"); + drop(temp_tag); + let mut live = HashSet::new(); + gc_run_once(store, &mut live).await?; + + // check that `get_bytes` returns an error. + let res = store.get_bytes(hash).await; + assert!(res.is_err()); + assert!(matches!( + res, + Err(ExportBaoError::ExportBaoInner { + source: EncodeError::Io(cause), + .. + }) if cause.kind() == io::ErrorKind::NotFound + )); + + // check that `export_ranges` returns an error. + let res = store + .export_ranges(hash, RangeSet2::all()) + .concatenate() + .await; + assert!(res.is_err()); + assert!(matches!( + res, + Err(RequestError::Inner{ + source: crate::api::Error::Io(cause), + .. + }) if cause.kind() == io::ErrorKind::NotFound + )); + + // check that `export_bao` returns an error. + let res = store + .export_bao(hash, ChunkRanges::all()) + .bao_to_vec() + .await; + assert!(res.is_err()); + println!("export_bao res {res:?}"); + assert!(matches!( + res, + Err(RequestError::Inner{ + source: crate::api::Error::Io(cause), + .. + }) if cause.kind() == io::ErrorKind::NotFound + )); + + // check that `export` returns an error. + let target = tempfile::NamedTempFile::new()?; + let path = target.path(); + let res = store.export(hash, path).await; + assert!(res.is_err()); + assert!(matches!( + res, + Err(RequestError::Inner{ + source: crate::api::Error::Io(cause), + .. + }) if cause.kind() == io::ErrorKind::NotFound + )); + Ok(()) + } } diff --git a/src/store/fs/meta.rs b/src/store/fs/meta.rs index a0831410..617db98c 100644 --- a/src/store/fs/meta.rs +++ b/src/store/fs/meta.rs @@ -463,6 +463,7 @@ impl Actor { } = cmd; for hash in hashes { if !force && protected.contains(&hash) { + trace!("delete {hash}: skip (protected)"); continue; } if let Some(entry) = tables.blobs.remove(hash).context(StorageSnafu)? { @@ -471,6 +472,7 @@ impl Actor { data_location, outboard_location, } => { + trace!("delete {hash}: currently complete. will be deleted."); match data_location { DataLocation::Inline(_) => { tables.inline_data.remove(hash).context(StorageSnafu)?; @@ -493,6 +495,7 @@ impl Actor { } } EntryState::Partial { .. } => { + trace!("delete {hash}: currently partial. will be deleted."); tables.ftx.delete( hash, [ diff --git a/src/store/mem.rs b/src/store/mem.rs index 94033eee..083e95f2 100644 --- a/src/store/mem.rs +++ b/src/store/mem.rs @@ -23,7 +23,7 @@ use bao_tree::{ mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt}, outboard::PreOrderMemOutboard, sync::{Outboard, ReadAt, WriteAt}, - BaoContentItem, Leaf, + BaoContentItem, EncodeError, Leaf, }, BaoTree, ChunkNum, ChunkRanges, TreeNode, }; @@ -117,6 +117,7 @@ impl MemStore { state: State { data: HashMap::new(), tags: BTreeMap::new(), + empty_hash: BaoFileHandle::new_partial(Hash::EMPTY), }, options: Arc::new(Options::default()), temp_tags: Default::default(), @@ -193,11 +194,11 @@ impl Actor { tx, .. }) => { - let entry = self.get_or_create_entry(hash); - self.spawn(export_bao(entry, ranges, tx)); + let entry = self.get(&hash); + self.spawn(export_bao(entry, ranges, tx)) } Command::ExportPath(cmd) => { - let entry = self.state.data.get(&cmd.hash).cloned(); + let entry = self.get(&cmd.hash); self.spawn(export_path(entry, cmd)); } Command::DeleteTags(cmd) => { @@ -329,7 +330,7 @@ impl Actor { tx, .. } = cmd; - let res = match self.state.data.get(&hash) { + let res = match self.get(&hash) { None => api::blobs::BlobStatus::NotFound, Some(x) => { let bitfield = x.0.state.borrow().bitfield(); @@ -371,8 +372,8 @@ impl Actor { cmd.tx.send(Ok(())).await.ok(); } Command::ExportRanges(cmd) => { - let entry = self.get_or_create_entry(cmd.hash); - self.spawn(export_ranges(cmd, entry.clone())); + let entry = self.get(&cmd.hash); + self.spawn(export_ranges(cmd, entry)); } Command::SyncDb(SyncDbMsg { tx, .. }) => { tx.send(Ok(())).await.ok(); @@ -384,12 +385,24 @@ impl Actor { None } + fn get(&mut self, hash: &Hash) -> Option { + if *hash == Hash::EMPTY { + Some(self.state.empty_hash.clone()) + } else { + self.state.data.get(hash).cloned() + } + } + fn get_or_create_entry(&mut self, hash: Hash) -> BaoFileHandle { - self.state - .data - .entry(hash) - .or_insert_with(|| BaoFileHandle::new_partial(hash)) - .clone() + if hash == Hash::EMPTY { + self.state.empty_hash.clone() + } else { + self.state + .data + .entry(hash) + .or_insert_with(|| BaoFileHandle::new_partial(hash)) + .clone() + } } async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) { @@ -501,7 +514,12 @@ async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc) Ok(()) } -async fn export_ranges(mut cmd: ExportRangesMsg, entry: BaoFileHandle) { +async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option) { + let Some(entry) = entry else { + let err = io::Error::new(io::ErrorKind::NotFound, "hash not found"); + cmd.tx.send(ExportRangesItem::Error(err.into())).await.ok(); + return; + }; if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await { cmd.tx .send(ExportRangesItem::Error(cause.into())) @@ -624,12 +642,18 @@ async fn import_bao( tx.send(Ok(())).await.ok(); } -#[instrument(skip_all, fields(hash = %entry.hash.fmt_short()))] +#[instrument(skip_all, fields(hash = tracing::field::Empty))] async fn export_bao( - entry: BaoFileHandle, + entry: Option, ranges: ChunkRanges, mut sender: mpsc::Sender, ) { + let Some(entry) = entry else { + let err = EncodeError::Io(io::Error::new(io::ErrorKind::NotFound, "hash not found")); + sender.send(err.into()).await.ok(); + return; + }; + tracing::Span::current().record("hash", tracing::field::display(entry.hash)); let data = entry.data_reader(); let outboard = entry.outboard_reader(); let tx = BaoTreeSender::new(&mut sender); @@ -828,6 +852,7 @@ impl Outboard for OutboardReader { struct State { data: HashMap, tags: BTreeMap, + empty_hash: BaoFileHandle, } #[derive(Debug, derive_more::From)] diff --git a/src/store/readonly_mem.rs b/src/store/readonly_mem.rs index a00cf82e..55ef3693 100644 --- a/src/store/readonly_mem.rs +++ b/src/store/readonly_mem.rs @@ -117,7 +117,7 @@ impl Actor { }); } Command::ExportBao(ExportBaoMsg { - inner: ExportBaoRequest { hash, ranges }, + inner: ExportBaoRequest { hash, ranges, .. }, tx, .. }) => {