Skip to content

Commit 632131f

Browse files
committed
Introduce traits to structure the API
The longer term plan for this is to give people something to implement if they want their own store, so they don't have to do everything by themselves!
1 parent bc6bc71 commit 632131f

File tree

1 file changed

+135
-47
lines changed

1 file changed

+135
-47
lines changed

src/store/fs.rs

Lines changed: 135 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@
6464
//! safely shut down as well. Any store refs you are holding will be inoperable
6565
//! after this.
6666
use std::{
67-
fmt, fs,
67+
fmt::{self, Debug},
68+
fs,
6869
future::Future,
6970
io::Write,
7071
num::NonZeroU64,
@@ -224,7 +225,7 @@ impl entity_manager::Params for EmParams {
224225
state: entity_manager::ActiveEntityState<Self>,
225226
_cause: entity_manager::ShutdownCause,
226227
) {
227-
state.persist();
228+
state.persist().await;
228229
}
229230
}
230231

@@ -248,13 +249,13 @@ struct Actor {
248249

249250
type HashContext = ActiveEntityState<EmParams>;
250251

251-
impl HashContext {
252+
impl SyncEntityApi for HashContext {
252253
/// Load the state from the database.
253254
///
254255
/// If the state is Initial, this will start the load.
255256
/// If it is Loading, it will wait until loading is done.
256257
/// If it is any other state, it will be a noop.
257-
pub async fn load(&self) {
258+
async fn load(&self) {
258259
enum Action {
259260
Load,
260261
Wait,
@@ -304,32 +305,8 @@ impl HashContext {
304305
}
305306
}
306307

307-
pub(super) fn persist(&self) {
308-
self.state.send_if_modified(|guard| {
309-
let hash = &self.id;
310-
let BaoFileStorage::Partial(fs) = guard.take() else {
311-
return false;
312-
};
313-
let path = self.global.options.path.bitfield_path(hash);
314-
trace!("writing bitfield for hash {} to {}", hash, path.display());
315-
if let Err(cause) = fs.sync_all(&path) {
316-
error!(
317-
"failed to write bitfield for {} at {}: {:?}",
318-
hash,
319-
path.display(),
320-
cause
321-
);
322-
}
323-
false
324-
});
325-
}
326-
327308
/// Write a batch and notify the db
328-
pub(super) async fn write_batch(
329-
&self,
330-
batch: &[BaoContentItem],
331-
bitfield: &Bitfield,
332-
) -> io::Result<()> {
309+
async fn write_batch(&self, batch: &[BaoContentItem], bitfield: &Bitfield) -> io::Result<()> {
333310
trace!("write_batch bitfield={:?} batch={}", bitfield, batch.len());
334311
let mut res = Ok(None);
335312
self.state.send_if_modified(|state| {
@@ -351,44 +328,48 @@ impl HashContext {
351328
///
352329
/// Caution: this is a reader for the unvalidated data file. Reading this
353330
/// can produce data that does not match the hash.
354-
pub fn data_reader(&self) -> DataReader {
331+
#[allow(refining_impl_trait_internal)]
332+
fn data_reader(&self) -> DataReader {
355333
DataReader(self.state.clone())
356334
}
357335

358336
/// An AsyncSliceReader for the outboard file.
359337
///
360338
/// The outboard file is used to validate the data file. It is not guaranteed
361339
/// to be complete.
362-
pub fn outboard_reader(&self) -> OutboardReader {
340+
#[allow(refining_impl_trait_internal)]
341+
fn outboard_reader(&self) -> OutboardReader {
363342
OutboardReader(self.state.clone())
364343
}
365344

366345
/// The most precise known total size of the data file.
367-
pub fn current_size(&self) -> io::Result<u64> {
346+
fn current_size(&self) -> io::Result<u64> {
368347
match self.state.borrow().deref() {
369348
BaoFileStorage::Complete(mem) => Ok(mem.size()),
370349
BaoFileStorage::PartialMem(mem) => Ok(mem.current_size()),
371350
BaoFileStorage::Partial(file) => file.current_size(),
372-
BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")),
373-
BaoFileStorage::Initial => io::Result::Err(io::Error::other("initial")),
374-
BaoFileStorage::Loading => io::Result::Err(io::Error::other("loading")),
375-
BaoFileStorage::NonExisting => io::Result::Err(io::ErrorKind::NotFound.into()),
351+
BaoFileStorage::Poisoned => Err(io::Error::other("poisoned storage")),
352+
BaoFileStorage::Initial => Err(io::Error::other("initial")),
353+
BaoFileStorage::Loading => Err(io::Error::other("loading")),
354+
BaoFileStorage::NonExisting => Err(io::ErrorKind::NotFound.into()),
376355
}
377356
}
378357

379358
/// The most precise known total size of the data file.
380-
pub fn bitfield(&self) -> io::Result<Bitfield> {
359+
fn bitfield(&self) -> io::Result<Bitfield> {
381360
match self.state.borrow().deref() {
382361
BaoFileStorage::Complete(mem) => Ok(mem.bitfield()),
383362
BaoFileStorage::PartialMem(mem) => Ok(mem.bitfield().clone()),
384363
BaoFileStorage::Partial(file) => Ok(file.bitfield().clone()),
385-
BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")),
386-
BaoFileStorage::Initial => io::Result::Err(io::Error::other("initial")),
387-
BaoFileStorage::Loading => io::Result::Err(io::Error::other("loading")),
388-
BaoFileStorage::NonExisting => io::Result::Err(io::ErrorKind::NotFound.into()),
364+
BaoFileStorage::Poisoned => Err(io::Error::other("poisoned storage")),
365+
BaoFileStorage::Initial => Err(io::Error::other("initial")),
366+
BaoFileStorage::Loading => Err(io::Error::other("loading")),
367+
BaoFileStorage::NonExisting => Err(io::ErrorKind::NotFound.into()),
389368
}
390369
}
370+
}
391371

372+
impl HashContext {
392373
/// The outboard for the file.
393374
pub fn outboard(&self) -> io::Result<PreOrderOutboard<OutboardReader>> {
394375
let tree = BaoTree::new(self.current_size()?, IROH_BLOCK_SIZE);
@@ -722,25 +703,62 @@ impl HashSpecificCommand for ExportPathMsg {
722703
async fn handle(self, ctx: HashContext) {
723704
ctx.export_path(self).await
724705
}
725-
async fn on_error(self, _arg: SpawnArg<EmParams>) {}
706+
async fn on_error(self, arg: SpawnArg<EmParams>) {
707+
let err = match arg {
708+
SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
709+
SpawnArg::Dead => io::Error::other("entity is dead"),
710+
_ => unreachable!(),
711+
};
712+
self.tx
713+
.send(ExportProgressItem::Error(api::Error::Io(err)))
714+
.await
715+
.ok();
716+
}
726717
}
727718
impl HashSpecificCommand for ExportBaoMsg {
728719
async fn handle(self, ctx: HashContext) {
729720
ctx.export_bao(self).await
730721
}
731-
async fn on_error(self, _arg: SpawnArg<EmParams>) {}
722+
async fn on_error(self, arg: SpawnArg<EmParams>) {
723+
let err = match arg {
724+
SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
725+
SpawnArg::Dead => io::Error::other("entity is dead"),
726+
_ => unreachable!(),
727+
};
728+
self.tx
729+
.send(EncodedItem::Error(bao_tree::io::EncodeError::Io(err)))
730+
.await
731+
.ok();
732+
}
732733
}
733734
impl HashSpecificCommand for ExportRangesMsg {
734735
async fn handle(self, ctx: HashContext) {
735736
ctx.export_ranges(self).await
736737
}
737-
async fn on_error(self, _arg: SpawnArg<EmParams>) {}
738+
async fn on_error(self, arg: SpawnArg<EmParams>) {
739+
let err = match arg {
740+
SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
741+
SpawnArg::Dead => io::Error::other("entity is dead"),
742+
_ => unreachable!(),
743+
};
744+
self.tx
745+
.send(ExportRangesItem::Error(api::Error::Io(err)))
746+
.await
747+
.ok();
748+
}
738749
}
739750
impl HashSpecificCommand for ImportBaoMsg {
740751
async fn handle(self, ctx: HashContext) {
741752
ctx.import_bao(self).await
742753
}
743-
async fn on_error(self, _arg: SpawnArg<EmParams>) {}
754+
async fn on_error(self, arg: SpawnArg<EmParams>) {
755+
let err = match arg {
756+
SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
757+
SpawnArg::Dead => io::Error::other("entity is dead"),
758+
_ => unreachable!(),
759+
};
760+
self.tx.send(Err(api::Error::Io(err))).await.ok();
761+
}
744762
}
745763
impl HashSpecific for (TempTag, ImportEntryMsg) {
746764
fn hash(&self) -> Hash {
@@ -752,7 +770,14 @@ impl HashSpecificCommand for (TempTag, ImportEntryMsg) {
752770
let (tt, cmd) = self;
753771
ctx.finish_import(cmd, tt).await
754772
}
755-
async fn on_error(self, _arg: SpawnArg<EmParams>) {}
773+
async fn on_error(self, arg: SpawnArg<EmParams>) {
774+
let err = match arg {
775+
SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
776+
SpawnArg::Dead => io::Error::other("entity is dead"),
777+
_ => unreachable!(),
778+
};
779+
self.1.tx.send(AddProgressItem::Error(err)).await.ok();
780+
}
756781
}
757782

758783
struct RtWrapper(Option<tokio::runtime::Runtime>);
@@ -809,8 +834,50 @@ async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>)
809834
Ok(())
810835
}
811836

837+
/// The minimal API you need to implement for an entity for a store to work.
838+
trait EntityApi {
839+
/// Import from a stream of n0 bao encoded data.
840+
async fn import_bao(&self, cmd: ImportBaoMsg);
841+
/// Finish an import from a local file or memory.
842+
async fn finish_import(&self, cmd: ImportEntryMsg, tt: TempTag);
843+
/// Observe the bitfield of the entry.
844+
async fn observe(&self, cmd: ObserveMsg);
845+
/// Export byte ranges of the entry as data
846+
async fn export_ranges(&self, cmd: ExportRangesMsg);
847+
/// Export chunk ranges of the entry as a n0 bao encoded stream.
848+
async fn export_bao(&self, cmd: ExportBaoMsg);
849+
/// Export the entry to a local file.
850+
async fn export_path(&self, cmd: ExportPathMsg);
851+
/// Persist the entry at the end of its lifecycle.
852+
async fn persist(&self);
853+
}
854+
855+
/// A more opinionated API that can be used as a helper to save implementation
856+
/// effort when implementing the EntityApi trait.
857+
trait SyncEntityApi: EntityApi {
858+
/// Load the entry state from the database. This must make sure that it is
859+
/// not run concurrently, so if load is called multiple times, all but one
860+
/// must wait. You can use a tokio::sync::OnceCell or similar to achieve this.
861+
async fn load(&self);
862+
863+
/// Get a synchronous reader for the data file.
864+
fn data_reader(&self) -> impl ReadBytesAt;
865+
866+
/// Get a synchronous reader for the outboard file.
867+
fn outboard_reader(&self) -> impl ReadAt;
868+
869+
/// Get the best known size of the data file.
870+
fn current_size(&self) -> io::Result<u64>;
871+
872+
/// Get the bitfield of the entry.
873+
fn bitfield(&self) -> io::Result<Bitfield>;
874+
875+
/// Write a batch of content items to the entry.
876+
async fn write_batch(&self, batch: &[BaoContentItem], bitfield: &Bitfield) -> io::Result<()>;
877+
}
878+
812879
/// The high level entry point per entry.
813-
impl HashContext {
880+
impl EntityApi for HashContext {
814881
#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
815882
async fn import_bao(&self, cmd: ImportBaoMsg) {
816883
trace!("{cmd:?}");
@@ -890,6 +957,27 @@ impl HashContext {
890957
};
891958
cmd.tx.send(res).await.ok();
892959
}
960+
961+
#[instrument(skip_all, fields(hash = %self.id.fmt_short()))]
962+
async fn persist(&self) {
963+
self.state.send_if_modified(|guard| {
964+
let hash = &self.id;
965+
let BaoFileStorage::Partial(fs) = guard.take() else {
966+
return false;
967+
};
968+
let path = self.global.options.path.bitfield_path(hash);
969+
trace!("writing bitfield for hash {} to {}", hash, path.display());
970+
if let Err(cause) = fs.sync_all(&path) {
971+
error!(
972+
"failed to write bitfield for {} at {}: {:?}",
973+
hash,
974+
path.display(),
975+
cause
976+
);
977+
}
978+
false
979+
});
980+
}
893981
}
894982

895983
async fn finish_import_impl(ctx: &HashContext, import_data: ImportEntry) -> io::Result<()> {

0 commit comments

Comments
 (0)