Skip to content

Commit bc6bc71

Browse files
committed
Group high level fns.
1 parent f559598 commit bc6bc71

File tree

1 file changed

+89
-73
lines changed

1 file changed

+89
-73
lines changed

src/store/fs.rs

Lines changed: 89 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,10 @@ use std::{
7070
num::NonZeroU64,
7171
ops::Deref,
7272
path::{Path, PathBuf},
73-
sync::Arc,
73+
sync::{
74+
atomic::{AtomicU64, Ordering},
75+
Arc,
76+
},
7477
};
7578

7679
use bao_tree::{
@@ -711,31 +714,31 @@ trait HashSpecificCommand: HashSpecific + Send + 'static {
711714

712715
impl HashSpecificCommand for ObserveMsg {
713716
async fn handle(self, ctx: HashContext) {
714-
observe(&ctx, self).await
717+
ctx.observe(self).await
715718
}
716719
async fn on_error(self, _arg: SpawnArg<EmParams>) {}
717720
}
718721
impl HashSpecificCommand for ExportPathMsg {
719722
async fn handle(self, ctx: HashContext) {
720-
export_path(&ctx, self).await
723+
ctx.export_path(self).await
721724
}
722725
async fn on_error(self, _arg: SpawnArg<EmParams>) {}
723726
}
724727
impl HashSpecificCommand for ExportBaoMsg {
725728
async fn handle(self, ctx: HashContext) {
726-
export_bao(&ctx, self).await
729+
ctx.export_bao(self).await
727730
}
728731
async fn on_error(self, _arg: SpawnArg<EmParams>) {}
729732
}
730733
impl HashSpecificCommand for ExportRangesMsg {
731734
async fn handle(self, ctx: HashContext) {
732-
export_ranges(&ctx, self).await
735+
ctx.export_ranges(self).await
733736
}
734737
async fn on_error(self, _arg: SpawnArg<EmParams>) {}
735738
}
736739
impl HashSpecificCommand for ImportBaoMsg {
737740
async fn handle(self, ctx: HashContext) {
738-
import_bao(&ctx, self).await
741+
ctx.import_bao(self).await
739742
}
740743
async fn on_error(self, _arg: SpawnArg<EmParams>) {}
741744
}
@@ -747,7 +750,7 @@ impl HashSpecific for (TempTag, ImportEntryMsg) {
747750
impl HashSpecificCommand for (TempTag, ImportEntryMsg) {
748751
async fn handle(self, ctx: HashContext) {
749752
let (tt, cmd) = self;
750-
finish_import(&ctx, cmd, tt).await
753+
ctx.finish_import(cmd, tt).await
751754
}
752755
async fn on_error(self, _arg: SpawnArg<EmParams>) {}
753756
}
@@ -806,80 +809,87 @@ async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>)
806809
Ok(())
807810
}
808811

809-
#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
810-
async fn import_bao(ctx: &HashContext, cmd: ImportBaoMsg) {
811-
trace!("{cmd:?}");
812-
let ImportBaoMsg {
813-
inner: ImportBaoRequest { size, .. },
814-
rx,
815-
tx,
816-
..
817-
} = cmd;
818-
ctx.load().await;
819-
let res = import_bao_impl(ctx, size, rx).await;
820-
trace!("{res:?}");
821-
tx.send(res).await.ok();
822-
}
823-
824-
#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
825-
async fn observe(ctx: &HashContext, cmd: ObserveMsg) {
826-
trace!("{cmd:?}");
827-
ctx.load().await;
828-
BaoFileStorageSubscriber::new(ctx.state.subscribe())
829-
.forward(cmd.tx)
830-
.await
831-
.ok();
832-
}
833-
834-
#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
835-
async fn export_ranges(ctx: &HashContext, mut cmd: ExportRangesMsg) {
836-
trace!("{cmd:?}");
837-
ctx.load().await;
838-
if let Err(cause) = export_ranges_impl(ctx, cmd.inner, &mut cmd.tx).await {
839-
cmd.tx
840-
.send(ExportRangesItem::Error(cause.into()))
812+
/// The high level entry point per entry.
813+
impl HashContext {
814+
#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
815+
async fn import_bao(&self, cmd: ImportBaoMsg) {
816+
trace!("{cmd:?}");
817+
self.load().await;
818+
let ImportBaoMsg {
819+
inner: ImportBaoRequest { size, .. },
820+
rx,
821+
tx,
822+
..
823+
} = cmd;
824+
let res = import_bao_impl(self, size, rx).await;
825+
trace!("{res:?}");
826+
tx.send(res).await.ok();
827+
}
828+
829+
#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
830+
async fn observe(&self, cmd: ObserveMsg) {
831+
trace!("{cmd:?}");
832+
self.load().await;
833+
BaoFileStorageSubscriber::new(self.state.subscribe())
834+
.forward(cmd.tx)
841835
.await
842836
.ok();
843837
}
844-
}
845838

846-
#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
847-
async fn export_bao(ctx: &HashContext, mut cmd: ExportBaoMsg) {
848-
ctx.load().await;
849-
if let Err(cause) = export_bao_impl(ctx, cmd.inner, &mut cmd.tx).await {
850-
// if the entry is in state NonExisting, this will be an io error with
851-
// kind NotFound. So we must not wrap this somehow but pass it on directly.
852-
cmd.tx
853-
.send(bao_tree::io::EncodeError::Io(cause).into())
854-
.await
855-
.ok();
839+
#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
840+
async fn export_ranges(&self, mut cmd: ExportRangesMsg) {
841+
trace!("{cmd:?}");
842+
self.load().await;
843+
if let Err(cause) = export_ranges_impl(self, cmd.inner, &mut cmd.tx).await {
844+
cmd.tx
845+
.send(ExportRangesItem::Error(cause.into()))
846+
.await
847+
.ok();
848+
}
856849
}
857-
}
858850

859-
#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
860-
async fn export_path(ctx: &HashContext, cmd: ExportPathMsg) {
861-
let ExportPathMsg { inner, mut tx, .. } = cmd;
862-
if let Err(cause) = export_path_impl(ctx, inner, &mut tx).await {
863-
tx.send(cause.into()).await.ok();
851+
#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
852+
async fn export_bao(&self, mut cmd: ExportBaoMsg) {
853+
trace!("{cmd:?}");
854+
self.load().await;
855+
if let Err(cause) = export_bao_impl(self, cmd.inner, &mut cmd.tx).await {
856+
// if the entry is in state NonExisting, this will be an io error with
857+
// kind NotFound. So we must not wrap this somehow but pass it on directly.
858+
cmd.tx
859+
.send(bao_tree::io::EncodeError::Io(cause).into())
860+
.await
861+
.ok();
862+
}
864863
}
865-
}
866864

867-
#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
868-
async fn finish_import(ctx: &HashContext, cmd: ImportEntryMsg, mut tt: TempTag) {
869-
trace!("{cmd:?}");
870-
let res = match finish_import_impl(ctx, cmd.inner).await {
871-
Ok(()) => {
872-
// for a remote call, we can't have the on_drop callback, so we have to leak the temp tag
873-
// it will be cleaned up when either the process exits or scope ends
874-
if cmd.tx.is_rpc() {
875-
trace!("leaking temp tag {}", tt.hash_and_format());
876-
tt.leak();
877-
}
878-
AddProgressItem::Done(tt)
865+
#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
866+
async fn export_path(&self, cmd: ExportPathMsg) {
867+
trace!("{cmd:?}");
868+
self.load().await;
869+
let ExportPathMsg { inner, mut tx, .. } = cmd;
870+
if let Err(cause) = export_path_impl(self, inner, &mut tx).await {
871+
tx.send(cause.into()).await.ok();
879872
}
880-
Err(cause) => AddProgressItem::Error(cause),
881-
};
882-
cmd.tx.send(res).await.ok();
873+
}
874+
875+
#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
876+
async fn finish_import(&self, cmd: ImportEntryMsg, mut tt: TempTag) {
877+
trace!("{cmd:?}");
878+
self.load().await;
879+
let res = match finish_import_impl(self, cmd.inner).await {
880+
Ok(()) => {
881+
// for a remote call, we can't have the on_drop callback, so we have to leak the temp tag
882+
// it will be cleaned up when either the process exits or scope ends
883+
if cmd.tx.is_rpc() {
884+
trace!("leaking temp tag {}", tt.hash_and_format());
885+
tt.leak();
886+
}
887+
AddProgressItem::Done(tt)
888+
}
889+
Err(cause) => AddProgressItem::Error(cause),
890+
};
891+
cmd.tx.send(res).await.ok();
892+
}
883893
}
884894

885895
async fn finish_import_impl(ctx: &HashContext, import_data: ImportEntry) -> io::Result<()> {
@@ -1213,8 +1223,14 @@ impl FsStore {
12131223

12141224
/// Load or create a new store with custom options, returning an additional sender for file store specific commands.
12151225
pub async fn load_with_opts(db_path: PathBuf, options: Options) -> anyhow::Result<FsStore> {
1226+
static THREAD_NR: AtomicU64 = AtomicU64::new(0);
12161227
let rt = tokio::runtime::Builder::new_multi_thread()
1217-
.thread_name("iroh-blob-store")
1228+
.thread_name_fn(|| {
1229+
format!(
1230+
"iroh-blob-store-{}",
1231+
THREAD_NR.fetch_add(1, Ordering::SeqCst)
1232+
)
1233+
})
12181234
.enable_time()
12191235
.build()?;
12201236
let handle = rt.handle().clone();

0 commit comments

Comments
 (0)