Skip to content

Commit 161a70b

Browse files
committed
Replace slots with EntityHandler
1 parent 8cae7cd commit 161a70b

File tree

1 file changed

+118
-73
lines changed

1 file changed

+118
-73
lines changed

src/store/fs.rs

Lines changed: 118 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ use bao_tree::{
8484
};
8585
use bytes::Bytes;
8686
use delete_set::{BaoFilePart, ProtectHandle};
87-
use entity_manager::{EntityManager, Options as EntityManagerOptions};
87+
use entity_manager::{EntityManager, Options as EntityManagerOptions, SpawnArg};
8888
use entry_state::{DataLocation, OutboardLocation};
8989
use gc::run_gc;
9090
use import::{ImportEntry, ImportSource};
@@ -201,11 +201,10 @@ impl TaskContext {
201201
}
202202
}
203203

204-
#[derive(Debug, Clone, Default)]
205-
struct EntityState;
206-
207-
impl entity_manager::Reset for EntityState {
208-
fn reset(&mut self) {}
204+
impl entity_manager::Reset for Slot {
205+
fn reset(&mut self) {
206+
self.0 = Arc::new(tokio::sync::Mutex::new(None));
207+
}
209208
}
210209

211210
#[derive(Debug)]
@@ -216,7 +215,7 @@ impl entity_manager::Params for EmParams {
216215

217216
type GlobalState = Arc<TaskContext>;
218217

219-
type EntityState = EntityState;
218+
type EntityState = Slot;
220219

221220
async fn on_shutdown(
222221
state: entity_manager::ActiveEntityState<Self>,
@@ -235,11 +234,7 @@ struct Actor {
235234
fs_cmd_rx: tokio::sync::mpsc::Receiver<InternalCommand>,
236235
// Tasks for import and export operations.
237236
tasks: JoinSet<()>,
238-
// Running tasks
239-
running: HashSet<Id>,
240-
// handles
241-
handles: HashMap<Hash, Slot>,
242-
237+
// Entity handler
243238
handles2: EntityManager<EmParams>,
244239
// temp tags
245240
temp_tags: TempTags,
@@ -293,14 +288,6 @@ impl HashContext {
293288
self.db().set(hash, state).await
294289
}
295290

296-
pub async fn get_maybe_create(&self, hash: Hash, create: bool) -> api::Result<BaoFileHandle> {
297-
if create {
298-
self.get_or_create(hash).await
299-
} else {
300-
self.get(hash).await
301-
}
302-
}
303-
304291
pub async fn get(&self, hash: Hash) -> api::Result<BaoFileHandle> {
305292
if hash == Hash::EMPTY {
306293
return Ok(self.ctx.empty.clone());
@@ -433,17 +420,12 @@ impl Actor {
433420

434421
fn spawn(&mut self, fut: impl Future<Output = ()> + Send + 'static) {
435422
let span = tracing::Span::current();
436-
let id = self.tasks.spawn(fut.instrument(span)).id();
437-
self.running.insert(id);
423+
self.tasks.spawn(fut.instrument(span));
438424
}
439425

440-
fn log_task_result(&mut self, res: Result<(Id, ()), JoinError>) {
426+
fn log_task_result(&mut self, res: Result<(), JoinError>) {
441427
match res {
442-
Ok((id, _)) => {
443-
// println!("task {id} finished");
444-
self.running.remove(&id);
445-
// println!("{:?}", self.running);
446-
}
428+
Ok(_) => {}
447429
Err(e) => {
448430
error!("task failed: {e}");
449431
}
@@ -459,26 +441,6 @@ impl Actor {
459441
tx.send(tt).await.ok();
460442
}
461443

462-
async fn clear_dead_handles(&mut self) {
463-
let mut to_remove = Vec::new();
464-
for (hash, slot) in &self.handles {
465-
if !slot.is_live().await {
466-
to_remove.push(*hash);
467-
}
468-
}
469-
for hash in to_remove {
470-
if let Some(slot) = self.handles.remove(&hash) {
471-
// do a quick check if the handle has become alive in the meantime, and reinsert it
472-
let guard = slot.0.lock().await;
473-
let is_live = guard.as_ref().map(|x| !x.is_dead()).unwrap_or_default();
474-
if is_live {
475-
drop(guard);
476-
self.handles.insert(hash, slot);
477-
}
478-
}
479-
}
480-
}
481-
482444
async fn handle_command(&mut self, cmd: Command) {
483445
let span = cmd.parent_span();
484446
let _entered = span.enter();
@@ -513,7 +475,6 @@ impl Actor {
513475
}
514476
Command::ClearProtected(cmd) => {
515477
trace!("{cmd:?}");
516-
self.clear_dead_handles().await;
517478
self.db().send(cmd.into()).await.ok();
518479
}
519480
Command::BlobStatus(cmd) => {
@@ -569,40 +530,112 @@ impl Actor {
569530
}
570531
Command::ExportPath(cmd) => {
571532
trace!("{cmd:?}");
572-
let ctx = self.hash_context(cmd.hash);
573-
self.spawn(export_path(cmd, ctx));
533+
let ctx = self.context.clone();
534+
self.handles2
535+
.spawn(cmd.hash, |state| async move {
536+
match state {
537+
SpawnArg::Active(state) => {
538+
let ctx = HashContext {
539+
slot: state.state,
540+
ctx,
541+
};
542+
export_path(cmd, ctx).await
543+
}
544+
_ => {}
545+
}
546+
})
547+
.await
548+
.ok();
549+
// let ctx = self.hash_context(cmd.hash);
550+
// self.spawn(export_path(cmd, ctx));
574551
}
575552
Command::ExportBao(cmd) => {
576553
trace!("{cmd:?}");
577-
let ctx = self.hash_context(cmd.hash);
578-
self.spawn(export_bao(cmd, ctx));
554+
let ctx = self.context.clone();
555+
self.handles2
556+
.spawn(cmd.hash, |state| async move {
557+
match state {
558+
SpawnArg::Active(state) => {
559+
let ctx = HashContext {
560+
slot: state.state,
561+
ctx,
562+
};
563+
export_bao(cmd, ctx).await
564+
}
565+
_ => {}
566+
}
567+
})
568+
.await
569+
.ok();
570+
// let ctx = self.hash_context(cmd.hash);
571+
// self.spawn(export_bao(cmd, ctx));
579572
}
580573
Command::ExportRanges(cmd) => {
581574
trace!("{cmd:?}");
582-
let ctx = self.hash_context(cmd.hash);
583-
self.spawn(export_ranges(cmd, ctx));
575+
let ctx = self.context.clone();
576+
self.handles2
577+
.spawn(cmd.hash, |state| async move {
578+
match state {
579+
SpawnArg::Active(state) => {
580+
let ctx = HashContext {
581+
slot: state.state,
582+
ctx,
583+
};
584+
export_ranges(cmd, ctx).await
585+
}
586+
_ => {}
587+
}
588+
})
589+
.await
590+
.ok();
591+
// let ctx = self.hash_context(cmd.hash);
592+
// self.spawn(export_ranges(cmd, ctx));
584593
}
585594
Command::ImportBao(cmd) => {
586595
trace!("{cmd:?}");
587-
let ctx = self.hash_context(cmd.hash);
588-
self.spawn(import_bao(cmd, ctx));
596+
let ctx = self.context.clone();
597+
self.handles2
598+
.spawn(cmd.hash, |state| async move {
599+
match state {
600+
SpawnArg::Active(state) => {
601+
let ctx = HashContext {
602+
slot: state.state,
603+
ctx,
604+
};
605+
import_bao(cmd, ctx).await
606+
}
607+
_ => {}
608+
}
609+
})
610+
.await
611+
.ok();
612+
// let ctx = self.hash_context(cmd.hash);
613+
// self.spawn(import_bao(cmd, ctx));
589614
}
590615
Command::Observe(cmd) => {
591616
trace!("{cmd:?}");
592-
let ctx = self.hash_context(cmd.hash);
593-
self.spawn(observe(cmd, ctx));
617+
let ctx = self.context.clone();
618+
self.handles2
619+
.spawn(cmd.hash, |state| async move {
620+
match state {
621+
SpawnArg::Active(state) => {
622+
let ctx = HashContext {
623+
slot: state.state,
624+
ctx,
625+
};
626+
observe(cmd, ctx).await
627+
}
628+
_ => {}
629+
}
630+
})
631+
.await
632+
.ok();
633+
// let ctx = self.hash_context(cmd.hash);
634+
// self.spawn(observe(cmd, ctx));
594635
}
595636
}
596637
}
597638

598-
/// Create a hash context for a given hash.
599-
fn hash_context(&mut self, hash: Hash) -> HashContext {
600-
HashContext {
601-
slot: self.handles.entry(hash).or_default().clone(),
602-
ctx: self.context.clone(),
603-
}
604-
}
605-
606639
async fn handle_fs_command(&mut self, cmd: InternalCommand) {
607640
let span = cmd.parent_span();
608641
let _entered = span.enter();
@@ -630,8 +663,22 @@ impl Actor {
630663
format: cmd.format,
631664
},
632665
);
633-
let ctx = self.hash_context(cmd.hash);
634-
self.spawn(finish_import(cmd, tt, ctx));
666+
let ctx = self.context.clone();
667+
self.handles2
668+
.spawn(cmd.hash, |state| async move {
669+
match state {
670+
SpawnArg::Active(state) => {
671+
let ctx = HashContext {
672+
slot: state.state,
673+
ctx,
674+
};
675+
finish_import(cmd, tt, ctx).await
676+
}
677+
_ => {}
678+
}
679+
})
680+
.await
681+
.ok();
635682
}
636683
}
637684
}
@@ -649,7 +696,7 @@ impl Actor {
649696
Some(cmd) = self.fs_cmd_rx.recv() => {
650697
self.handle_fs_command(cmd).await;
651698
}
652-
Some(res) = self.tasks.join_next_with_id(), if !self.tasks.is_empty() => {
699+
Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
653700
self.log_task_result(res);
654701
}
655702
}
@@ -700,8 +747,6 @@ impl Actor {
700747
cmd_rx,
701748
fs_cmd_rx: fs_commands_rx,
702749
tasks: JoinSet::new(),
703-
running: HashSet::new(),
704-
handles: Default::default(),
705750
handles2: EntityManager::new(slot_context, EntityManagerOptions::default()),
706751
temp_tags: Default::default(),
707752
_rt: rt,
@@ -1017,7 +1062,7 @@ async fn export_ranges_impl(
10171062

10181063
#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
10191064
async fn export_bao(mut cmd: ExportBaoMsg, ctx: HashContext) {
1020-
match ctx.get_maybe_create(cmd.hash, false).await {
1065+
match ctx.get(cmd.hash).await {
10211066
Ok(handle) => {
10221067
if let Err(cause) = export_bao_impl(cmd.inner, &mut cmd.tx, handle).await {
10231068
cmd.tx

0 commit comments

Comments
 (0)