Skip to content

Commit 77fbb05

Browse files
committed
First step for removing the slot mutex
1 parent 27df6da commit 77fbb05

File tree

1 file changed

+139
-11
lines changed

1 file changed

+139
-11
lines changed

src/store/fs/bao_file.rs

Lines changed: 139 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -286,8 +286,15 @@ fn read_size(size_file: &File) -> io::Result<u64> {
286286
}
287287

288288
/// The storage for a bao file. This can be either in memory or on disk.
289-
#[derive(derive_more::From)]
289+
#[derive(derive_more::From, Default)]
290290
pub(crate) enum BaoFileStorage {
291+
/// Initial state, we don't know anything yet.
292+
#[default]
293+
Initial,
294+
/// Currently loading the entry from the metadata.
295+
Loading,
296+
/// There is no info about this hash in the metadata db.
297+
NonExisting,
291298
/// The entry is incomplete and in memory.
292299
///
293300
/// Since it is incomplete, it must be writeable.
@@ -322,16 +329,13 @@ impl fmt::Debug for BaoFileStorage {
322329
BaoFileStorage::Partial(x) => x.fmt(f),
323330
BaoFileStorage::Complete(x) => x.fmt(f),
324331
BaoFileStorage::Poisoned => f.debug_struct("Poisoned").finish(),
332+
BaoFileStorage::Initial => f.debug_struct("Initial").finish(),
333+
BaoFileStorage::Loading => f.debug_struct("Loading").finish(),
334+
BaoFileStorage::NonExisting => f.debug_struct("NonExisting").finish(),
325335
}
326336
}
327337
}
328338

329-
impl Default for BaoFileStorage {
330-
fn default() -> Self {
331-
BaoFileStorage::Complete(Default::default())
332-
}
333-
}
334-
335339
impl PartialMemStorage {
336340
/// Converts this storage into a complete storage, using the given hash for
337341
/// path names and the given options for decisions about inlining.
@@ -387,9 +391,16 @@ impl PartialMemStorage {
387391
impl BaoFileStorage {
388392
pub fn bitfield(&self) -> Bitfield {
389393
match self {
390-
BaoFileStorage::Complete(x) => Bitfield::complete(x.data.size()),
394+
BaoFileStorage::Initial => {
395+
panic!("initial storage should not be used")
396+
}
397+
BaoFileStorage::Loading => {
398+
panic!("loading storage should not be used")
399+
}
400+
BaoFileStorage::NonExisting => Bitfield::empty(),
391401
BaoFileStorage::PartialMem(x) => x.bitfield.clone(),
392402
BaoFileStorage::Partial(x) => x.bitfield.clone(),
403+
BaoFileStorage::Complete(x) => Bitfield::complete(x.data.size()),
393404
BaoFileStorage::Poisoned => {
394405
panic!("poisoned storage should not be used")
395406
}
@@ -465,7 +476,7 @@ impl BaoFileStorage {
465476
// unless there is a bug, this would just write the exact same data
466477
(self, None)
467478
}
468-
BaoFileStorage::Poisoned => {
479+
_ => {
469480
// we are poisoned, so just ignore the write
470481
(self, None)
471482
}
@@ -483,13 +494,14 @@ impl BaoFileStorage {
483494
match self {
484495
Self::Complete(_) => Ok(()),
485496
Self::PartialMem(_) => Ok(()),
497+
Self::NonExisting => Ok(()),
486498
Self::Partial(file) => {
487499
file.data.sync_all()?;
488500
file.outboard.sync_all()?;
489501
file.sizes.sync_all()?;
490502
Ok(())
491503
}
492-
Self::Poisoned => {
504+
Self::Poisoned | Self::Initial | Self::Loading => {
493505
// we are poisoned, so just ignore the sync
494506
Ok(())
495507
}
@@ -506,6 +518,45 @@ impl BaoFileStorage {
506518
pub(crate) struct BaoFileHandle(Arc<watch::Sender<BaoFileStorage>>);
507519

508520
impl BaoFileHandle {
521+
pub(super) async fn load(&self, ctx: &HashContext) {
522+
enum Action {
523+
Load,
524+
Wait,
525+
None,
526+
}
527+
let mut action = Action::None;
528+
self.send_if_modified(|guard| match guard.deref() {
529+
BaoFileStorage::Initial => {
530+
*guard = BaoFileStorage::Loading;
531+
action = Action::Load;
532+
true
533+
}
534+
BaoFileStorage::Loading => {
535+
action = Action::Wait;
536+
false
537+
}
538+
_ => false,
539+
});
540+
match action {
541+
Action::Load => {
542+
let state = match ctx.global.db.get(ctx.id).await {
543+
Ok(state) => match BaoFileStorage::open(state, ctx).await {
544+
Ok(handle) => handle,
545+
Err(_) => BaoFileStorage::Poisoned,
546+
},
547+
Err(_) => BaoFileStorage::Poisoned,
548+
};
549+
self.send_replace(state);
550+
}
551+
Action::Wait => {
552+
while let BaoFileStorage::Loading = self.borrow().deref() {
553+
self.0.subscribe().changed().await.ok();
554+
}
555+
}
556+
Action::None => {}
557+
}
558+
}
559+
509560
pub(super) fn persist(&mut self, ctx: &HashContext) {
510561
self.send_if_modified(|guard| {
511562
let hash = &ctx.id;
@@ -542,6 +593,9 @@ impl ReadBytesAt for DataReader {
542593
BaoFileStorage::Partial(x) => x.data.read_bytes_at(offset, size),
543594
BaoFileStorage::Complete(x) => x.data.read_bytes_at(offset, size),
544595
BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")),
596+
BaoFileStorage::Initial => io::Result::Err(io::Error::other("initial")),
597+
BaoFileStorage::Loading => io::Result::Err(io::Error::other("loading")),
598+
BaoFileStorage::NonExisting => io::Result::Err(io::ErrorKind::NotFound.into()),
545599
}
546600
}
547601
}
@@ -558,10 +612,78 @@ impl ReadAt for OutboardReader {
558612
BaoFileStorage::PartialMem(x) => x.outboard.read_at(offset, buf),
559613
BaoFileStorage::Partial(x) => x.outboard.read_at(offset, buf),
560614
BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")),
615+
BaoFileStorage::Initial => io::Result::Err(io::Error::other("initial")),
616+
BaoFileStorage::Loading => io::Result::Err(io::Error::other("loading")),
617+
BaoFileStorage::NonExisting => io::Result::Err(io::ErrorKind::NotFound.into()),
561618
}
562619
}
563620
}
564621

622+
impl BaoFileStorage {
623+
pub async fn open(state: Option<EntryState<Bytes>>, ctx: &HashContext) -> io::Result<Self> {
624+
let hash = &ctx.id;
625+
let options = &ctx.global.options;
626+
Ok(match state {
627+
Some(EntryState::Complete {
628+
data_location,
629+
outboard_location,
630+
}) => {
631+
let data = match data_location {
632+
DataLocation::Inline(data) => MemOrFile::Mem(data),
633+
DataLocation::Owned(size) => {
634+
let path = options.path.data_path(hash);
635+
let file = std::fs::File::open(&path)?;
636+
MemOrFile::File(FixedSize::new(file, size))
637+
}
638+
DataLocation::External(paths, size) => {
639+
let Some(path) = paths.into_iter().next() else {
640+
return Err(io::Error::other("no external data path"));
641+
};
642+
let file = std::fs::File::open(&path)?;
643+
MemOrFile::File(FixedSize::new(file, size))
644+
}
645+
};
646+
let outboard = match outboard_location {
647+
OutboardLocation::NotNeeded => MemOrFile::empty(),
648+
OutboardLocation::Inline(data) => MemOrFile::Mem(data),
649+
OutboardLocation::Owned => {
650+
let path = options.path.outboard_path(hash);
651+
let file = std::fs::File::open(&path)?;
652+
MemOrFile::File(file)
653+
}
654+
};
655+
Self::new_complete(data, outboard)
656+
}
657+
Some(EntryState::Partial { .. }) => Self::new_partial_file(ctx).await?,
658+
None => Self::NonExisting,
659+
})
660+
}
661+
662+
/// Create a new bao file handle with a partial file.
663+
pub(super) async fn new_partial_file(ctx: &HashContext) -> io::Result<Self> {
664+
let hash = &ctx.id;
665+
let options = ctx.global.options.clone();
666+
let storage = PartialFileStorage::load(hash, &options.path)?;
667+
Ok(if storage.bitfield.is_complete() {
668+
let size = storage.bitfield.size;
669+
let (storage, entry_state) = storage.into_complete(size, &options)?;
670+
debug!("File was reconstructed as complete");
671+
ctx.global.db.set(*hash, entry_state).await?;
672+
storage.into()
673+
} else {
674+
storage.into()
675+
})
676+
}
677+
678+
/// Create a new complete bao file handle.
679+
pub fn new_complete(
680+
data: MemOrFile<Bytes, FixedSize<File>>,
681+
outboard: MemOrFile<Bytes, File>,
682+
) -> Self {
683+
CompleteStorage { data, outboard }.into()
684+
}
685+
}
686+
565687
impl BaoFileHandle {
566688
#[allow(dead_code)]
567689
pub fn id(&self) -> usize {
@@ -613,7 +735,7 @@ impl BaoFileHandle {
613735
BaoFileStorage::Complete(_) => None,
614736
BaoFileStorage::PartialMem(entry) => Some(&mut entry.bitfield),
615737
BaoFileStorage::Partial(entry) => Some(&mut entry.bitfield),
616-
BaoFileStorage::Poisoned => None,
738+
_ => None,
617739
};
618740
if let Some(bitfield) = res {
619741
bitfield.update(&Bitfield::complete(data.size()));
@@ -658,6 +780,9 @@ impl BaoFileHandle {
658780
BaoFileStorage::PartialMem(mem) => Ok(mem.current_size()),
659781
BaoFileStorage::Partial(file) => file.current_size(),
660782
BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")),
783+
BaoFileStorage::Initial => io::Result::Err(io::Error::other("initial")),
784+
BaoFileStorage::Loading => io::Result::Err(io::Error::other("loading")),
785+
BaoFileStorage::NonExisting => io::Result::Err(io::ErrorKind::NotFound.into()),
661786
}
662787
}
663788

@@ -668,6 +793,9 @@ impl BaoFileHandle {
668793
BaoFileStorage::PartialMem(mem) => Ok(mem.bitfield().clone()),
669794
BaoFileStorage::Partial(file) => Ok(file.bitfield().clone()),
670795
BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")),
796+
BaoFileStorage::Initial => io::Result::Err(io::Error::other("initial")),
797+
BaoFileStorage::Loading => io::Result::Err(io::Error::other("loading")),
798+
BaoFileStorage::NonExisting => io::Result::Err(io::ErrorKind::NotFound.into()),
671799
}
672800
}
673801

0 commit comments

Comments
 (0)