Skip to content

Commit 6e237f8

Browse files
committed
instrument the per-hash tasks as well
1 parent a540002 commit 6e237f8

File tree

3 files changed

+52
-45
lines changed

3 files changed

+52
-45
lines changed

src/store/fs.rs

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -570,9 +570,12 @@ impl Actor {
570570
}
571571

572572
trait HashSpecificCommand: HashSpecific + Send + 'static {
573+
/// Handle the command on success by spawning a task into the per-hash context.
573574
fn handle(self, ctx: HashContext) -> impl Future<Output = ()> + Send + 'static;
574575

575-
fn on_error(self) -> impl Future<Output = ()> + Send + 'static;
576+
/// Opportunity to send an error if spawning fails due to the task being busy (inbox full)
577+
/// or dead (e.g. panic in one of the running tasks).
578+
fn on_error(self, arg: SpawnArg<EmParams>) -> impl Future<Output = ()> + Send + 'static;
576579

577580
async fn spawn(
578581
self,
@@ -581,25 +584,24 @@ trait HashSpecificCommand: HashSpecific + Send + 'static {
581584
) where
582585
Self: Sized,
583586
{
587+
let span = tracing::Span::current();
584588
let task = manager
585-
.spawn_boxed(
586-
self.hash(),
587-
Box::new(|x| {
588-
Box::pin(async move {
589-
match x {
590-
SpawnArg::Active(state) => {
591-
self.handle(state).await;
592-
}
593-
SpawnArg::Busy => {
594-
self.on_error().await;
595-
}
596-
SpawnArg::Dead => {
597-
self.on_error().await;
598-
}
589+
.spawn(self.hash(), |arg| {
590+
async move {
591+
match arg {
592+
SpawnArg::Active(state) => {
593+
self.handle(state).await;
599594
}
600-
})
601-
}),
602-
)
595+
SpawnArg::Busy => {
596+
self.on_error(arg).await;
597+
}
598+
SpawnArg::Dead => {
599+
self.on_error(arg).await;
600+
}
601+
}
602+
}
603+
.instrument(span)
604+
})
603605
.await;
604606
if let Some(task) = task {
605607
tasks.spawn(task);
@@ -611,31 +613,31 @@ impl HashSpecificCommand for ObserveMsg {
611613
async fn handle(self, ctx: HashContext) {
612614
observe(self, ctx).await
613615
}
614-
async fn on_error(self) {}
616+
async fn on_error(self, _arg: SpawnArg<EmParams>) {}
615617
}
616618
impl HashSpecificCommand for ExportPathMsg {
617619
async fn handle(self, ctx: HashContext) {
618620
export_path(self, ctx).await
619621
}
620-
async fn on_error(self) {}
622+
async fn on_error(self, _arg: SpawnArg<EmParams>) {}
621623
}
622624
impl HashSpecificCommand for ExportBaoMsg {
623625
async fn handle(self, ctx: HashContext) {
624626
export_bao(self, ctx).await
625627
}
626-
async fn on_error(self) {}
628+
async fn on_error(self, _arg: SpawnArg<EmParams>) {}
627629
}
628630
impl HashSpecificCommand for ExportRangesMsg {
629631
async fn handle(self, ctx: HashContext) {
630632
export_ranges(self, ctx).await
631633
}
632-
async fn on_error(self) {}
634+
async fn on_error(self, _arg: SpawnArg<EmParams>) {}
633635
}
634636
impl HashSpecificCommand for ImportBaoMsg {
635637
async fn handle(self, ctx: HashContext) {
636638
import_bao(self, ctx).await
637639
}
638-
async fn on_error(self) {}
640+
async fn on_error(self, _arg: SpawnArg<EmParams>) {}
639641
}
640642
impl HashSpecific for (TempTag, ImportEntryMsg) {
641643
fn hash(&self) -> Hash {
@@ -647,7 +649,7 @@ impl HashSpecificCommand for (TempTag, ImportEntryMsg) {
647649
let (tt, cmd) = self;
648650
finish_import(cmd, tt, ctx).await
649651
}
650-
async fn on_error(self) {}
652+
async fn on_error(self, _arg: SpawnArg<EmParams>) {}
651653
}
652654

653655
struct RtWrapper(Option<tokio::runtime::Runtime>);

src/store/fs/bao_file.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,15 @@ fn read_size(size_file: &File) -> io::Result<u64> {
285285
}
286286

287287
/// The storage for a bao file. This can be either in memory or on disk.
288+
///
289+
/// The two initial states `Initial` and `Loading` are used to coordinate the
290+
/// loading of the entry from the metadata database. Once that is complete,
291+
/// you should never see these states again.
292+
///
293+
/// From the remaining states you can get into `Poisoned` if there is an
294+
/// IO error during an operation.
295+
///
296+
/// `Poisioned` is also used once the handle is persisted and no longer usable.
288297
#[derive(derive_more::From, Default)]
289298
pub(crate) enum BaoFileStorage {
290299
/// Initial state, we don't know anything yet.
@@ -311,13 +320,8 @@ pub(crate) enum BaoFileStorage {
311320
///
312321
/// Writing to this is a no-op, since it is already complete.
313322
Complete(CompleteStorage),
314-
/// We will get into that state if there is an io error in the middle of an operation
315-
///
316-
/// Also, when the handle is dropped we will poison the storage, so poisoned
317-
/// can be seen when the handle is revived during the drop.
318-
///
319-
/// BaoFileHandleWeak::upgrade() will return None if the storage is poisoned,
320-
/// treat it as dead.
323+
/// We will get into that state if there is an io error in the middle of an operation,
324+
/// or after the handle is persisted and no longer usable.
321325
Poisoned,
322326
}
323327

src/store/fs/util/entity_manager.rs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -373,24 +373,25 @@ mod main_actor {
373373
}
374374

375375
/// Friendly version of `spawn_boxed` that does the boxing
376-
pub async fn spawn<F, Fut>(&mut self, id: P::EntityId, f: F, tasks: &mut JoinSet<()>)
376+
#[must_use = "this function may return a future that must be spawned by the caller"]
377+
pub async fn spawn<F, Fut>(
378+
&mut self,
379+
id: P::EntityId,
380+
f: F,
381+
) -> Option<impl Future<Output = ()> + Send + 'static>
377382
where
378383
F: FnOnce(SpawnArg<P>) -> Fut + Send + 'static,
379384
Fut: Future<Output = ()> + Send + 'static,
380385
{
381-
let task = self
382-
.spawn_boxed(
383-
id,
384-
Box::new(|x| {
385-
Box::pin(async move {
386-
f(x).await;
387-
})
388-
}),
389-
)
390-
.await;
391-
if let Some(task) = task {
392-
tasks.spawn(task);
393-
}
386+
self.spawn_boxed(
387+
id,
388+
Box::new(|x| {
389+
Box::pin(async move {
390+
f(x).await;
391+
})
392+
}),
393+
)
394+
.await
394395
}
395396

396397
#[must_use = "this function may return a future that must be spawned by the caller"]

0 commit comments

Comments
 (0)