Skip to content

Commit ea29863

Browse files
committed
fix: getting deleted blobs returns errors
1 parent e7338e2 commit ea29863

File tree

5 files changed

+67
-16
lines changed

5 files changed

+67
-16
lines changed

src/api.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
//! with a remote store via rpc calls.
55
use std::{io, net::SocketAddr, ops::Deref, sync::Arc};
66

7+
use bao_tree::io::EncodeError;
78
use iroh::Endpoint;
89
use irpc::rpc::{listen, Handler};
910
use n0_snafu::SpanTrace;
@@ -211,6 +212,15 @@ impl std::error::Error for Error {
211212
}
212213
}
213214

215+
impl From<EncodeError> for Error {
216+
fn from(value: EncodeError) -> Self {
217+
match value {
218+
EncodeError::Io(cause) => Self::Io(cause),
219+
_ => Self::other(value),
220+
}
221+
}
222+
}
223+
214224
pub type Result<T> = std::result::Result<T, Error>;
215225

216226
/// The main entry point for the store API.

src/api/blobs.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,13 +102,21 @@ impl Blobs {
102102
})
103103
}
104104

105-
pub async fn delete_with_opts(&self, options: DeleteOptions) -> RequestResult<()> {
105+
/// Delete a blob.
106+
///
107+
/// This function is not public, because it does not work as expected when called manually,
108+
/// because blobs are protected from deletion. This is only called from the gc task, which
109+
/// clears the protections before.
110+
///
111+
/// Users should rely only on garbage collection for blob deletion.
112+
pub(crate) async fn delete_with_opts(&self, options: DeleteOptions) -> RequestResult<()> {
106113
trace!("{options:?}");
107114
self.client.rpc(options).await??;
108115
Ok(())
109116
}
110117

111-
pub async fn delete(
118+
/// See [`Self::delete_with_opts`].
119+
pub(crate) async fn delete(
112120
&self,
113121
hashes: impl IntoIterator<Item = impl Into<Hash>>,
114122
) -> RequestResult<()> {
@@ -962,7 +970,6 @@ impl ExportBaoProgress {
962970
let mut data = Vec::new();
963971
let mut stream = self.into_byte_stream();
964972
while let Some(item) = stream.next().await {
965-
println!("item: {item:?}");
966973
data.extend_from_slice(&item?);
967974
}
968975
Ok(data)
@@ -1088,7 +1095,7 @@ impl ExportBaoProgress {
10881095
}
10891096
EncodedItem::Leaf(leaf) => Some(Ok(leaf.data)),
10901097
EncodedItem::Done => None,
1091-
EncodedItem::Error(cause) => Some(Err(super::Error::other(cause))),
1098+
EncodedItem::Error(cause) => Some(Err(cause.into())),
10921099
})
10931100
}
10941101

src/store/fs.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,26 @@ impl HashContext {
305305
Ok(())
306306
}
307307

308+
pub async fn get(&self, hash: Hash) -> api::Result<BaoFileHandle> {
309+
if hash == Hash::EMPTY {
310+
return Ok(self.ctx.empty.clone());
311+
}
312+
let res = self
313+
.slot
314+
.get_or_create(|| async {
315+
let res = self.db().get(hash).await.map_err(io::Error::other)?;
316+
let res = match res {
317+
Some(state) => open_bao_file(&hash, state, &self.ctx).await,
318+
None => Err(io::Error::new(io::ErrorKind::NotFound, "hash not found")),
319+
};
320+
Ok((res?, ()))
321+
})
322+
.await
323+
.map_err(api::Error::from);
324+
let (res, _) = res?;
325+
Ok(res)
326+
}
327+
308328
pub async fn get_or_create(&self, hash: Hash) -> api::Result<BaoFileHandle> {
309329
if hash == Hash::EMPTY {
310330
return Ok(self.ctx.empty.clone());
@@ -939,7 +959,7 @@ async fn observe(cmd: ObserveMsg, ctx: HashContext) {
939959

940960
#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
941961
async fn export_ranges(mut cmd: ExportRangesMsg, ctx: HashContext) {
942-
match ctx.get_or_create(cmd.hash).await {
962+
match ctx.get(cmd.hash).await {
943963
Ok(handle) => {
944964
if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, handle).await {
945965
cmd.tx
@@ -1000,7 +1020,7 @@ async fn export_ranges_impl(
10001020

10011021
#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
10021022
async fn export_bao(mut cmd: ExportBaoMsg, ctx: HashContext) {
1003-
match ctx.get_or_create(cmd.hash).await {
1023+
match ctx.get(cmd.hash).await {
10041024
Ok(handle) => {
10051025
if let Err(cause) = export_bao_impl(cmd.inner, &mut cmd.tx, handle).await {
10061026
cmd.tx
@@ -1010,9 +1030,9 @@ async fn export_bao(mut cmd: ExportBaoMsg, ctx: HashContext) {
10101030
}
10111031
}
10121032
Err(cause) => {
1013-
let cause = anyhow::anyhow!("failed to open file: {cause}");
1033+
let crate::api::Error::Io(cause) = cause;
10141034
cmd.tx
1015-
.send(bao_tree::io::EncodeError::Io(io::Error::other(cause)).into())
1035+
.send(bao_tree::io::EncodeError::Io(cause).into())
10161036
.await
10171037
.ok();
10181038
}

src/store/fs/meta.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,7 @@ impl Actor {
463463
} = cmd;
464464
for hash in hashes {
465465
if !force && protected.contains(&hash) {
466+
trace!("delete {hash}: skip (protected)");
466467
continue;
467468
}
468469
if let Some(entry) = tables.blobs.remove(hash).context(StorageSnafu)? {
@@ -471,6 +472,7 @@ impl Actor {
471472
data_location,
472473
outboard_location,
473474
} => {
475+
trace!("delete {hash}: currently complete. will be deleted.");
474476
match data_location {
475477
DataLocation::Inline(_) => {
476478
tables.inline_data.remove(hash).context(StorageSnafu)?;
@@ -493,6 +495,7 @@ impl Actor {
493495
}
494496
}
495497
EntryState::Partial { .. } => {
498+
trace!("delete {hash}: currently partial. will be deleted.");
496499
tables.ftx.delete(
497500
hash,
498501
[

src/store/mem.rs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use bao_tree::{
2323
mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
2424
outboard::PreOrderMemOutboard,
2525
sync::{Outboard, ReadAt, WriteAt},
26-
BaoContentItem, Leaf,
26+
BaoContentItem, EncodeError, Leaf,
2727
},
2828
BaoTree, ChunkNum, ChunkRanges, TreeNode,
2929
};
@@ -193,8 +193,8 @@ impl Actor {
193193
tx,
194194
..
195195
}) => {
196-
let entry = self.get_or_create_entry(hash);
197-
self.spawn(export_bao(entry, ranges, tx));
196+
let entry = self.state.data.get(&hash).cloned();
197+
self.spawn(export_bao(entry, ranges, tx))
198198
}
199199
Command::ExportPath(cmd) => {
200200
let entry = self.state.data.get(&cmd.hash).cloned();
@@ -371,8 +371,8 @@ impl Actor {
371371
cmd.tx.send(Ok(())).await.ok();
372372
}
373373
Command::ExportRanges(cmd) => {
374-
let entry = self.get_or_create_entry(cmd.hash);
375-
self.spawn(export_ranges(cmd, entry.clone()));
374+
let entry = self.state.data.get(&cmd.hash).cloned();
375+
self.spawn(export_ranges(cmd, entry));
376376
}
377377
Command::SyncDb(SyncDbMsg { tx, .. }) => {
378378
tx.send(Ok(())).await.ok();
@@ -501,7 +501,12 @@ async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>)
501501
Ok(())
502502
}
503503

504-
async fn export_ranges(mut cmd: ExportRangesMsg, entry: BaoFileHandle) {
504+
async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option<BaoFileHandle>) {
505+
let Some(entry) = entry else {
506+
let err = io::Error::new(io::ErrorKind::NotFound, "hash not found");
507+
cmd.tx.send(ExportRangesItem::Error(err.into())).await.ok();
508+
return;
509+
};
505510
if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await {
506511
cmd.tx
507512
.send(ExportRangesItem::Error(cause.into()))
@@ -624,12 +629,18 @@ async fn import_bao(
624629
tx.send(Ok(())).await.ok();
625630
}
626631

627-
#[instrument(skip_all, fields(hash = %entry.hash.fmt_short()))]
632+
#[instrument(skip_all, fields(hash = tracing::field::Empty))]
628633
async fn export_bao(
629-
entry: BaoFileHandle,
634+
entry: Option<BaoFileHandle>,
630635
ranges: ChunkRanges,
631636
mut sender: mpsc::Sender<EncodedItem>,
632637
) {
638+
let Some(entry) = entry else {
639+
let err = EncodeError::Io(io::Error::new(io::ErrorKind::NotFound, "hash not found"));
640+
sender.send(err.into()).await.ok();
641+
return;
642+
};
643+
tracing::Span::current().record("hash", tracing::field::display(entry.hash));
633644
let data = entry.data_reader();
634645
let outboard = entry.outboard_reader();
635646
let tx = BaoTreeSender::new(&mut sender);

0 commit comments

Comments
 (0)