Skip to content

Commit dc47405

Browse files
committed
begin replacing fs with interface
replace fs::metadata
1 parent ce939a2 commit dc47405

File tree

2 files changed

+60
-19
lines changed

2 files changed

+60
-19
lines changed

src/store/fs.rs

Lines changed: 59 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ use iroh_io::AsyncSliceReader;
8484
use redb::{AccessGuard, DatabaseError, ReadableTable, StorageError};
8585
use serde::{Deserialize, Serialize};
8686
use smallvec::SmallVec;
87-
use tokio::io::AsyncWriteExt;
87+
use tokio::{io::AsyncWriteExt, runtime::Handle};
8888
use tracing::trace_span;
8989
mod tables;
9090
#[doc(hidden)]
@@ -441,6 +441,30 @@ pub(crate) enum ImportSource {
441441
Memory(#[debug(skip)] Bytes),
442442
}
443443

444+
/// trait which defines the backend persistence layer
445+
/// for this store. e.g. filesystem, s3 etc
446+
pub trait Persistence: Clone {
447+
/// the error type that is returned for the persistence layer
448+
type Err;
449+
450+
/// return the size of the file in bytes if it can be found/read
451+
/// otherwise return a [Self::Err]
452+
fn size(&self, path: &Path) -> impl Future<Output = Result<u64, Self::Err>>;
453+
}
454+
455+
/// A persistence layer that writes to the local file system
456+
#[derive(Debug, Clone, Copy)]
457+
pub struct FileSystemPersistence;
458+
459+
impl Persistence for FileSystemPersistence {
460+
type Err = io::Error;
461+
462+
fn size(&self, path: &Path) -> impl Future<Output = Result<u64, Self::Err>> {
463+
let res = std::fs::metadata(path).map(|m| m.len());
464+
async move { res }
465+
}
466+
}
467+
444468
impl ImportSource {
445469
fn content(&self) -> MemOrFile<&[u8], &Path> {
446470
match self {
@@ -450,10 +474,10 @@ impl ImportSource {
450474
}
451475
}
452476

453-
fn len(&self) -> io::Result<u64> {
477+
async fn len<T: Persistence>(&self, fs: &T) -> Result<u64, T::Err> {
454478
match self {
455-
Self::TempFile(path) => std::fs::metadata(path).map(|m| m.len()),
456-
Self::External(path) => std::fs::metadata(path).map(|m| m.len()),
479+
Self::TempFile(path) => fs.size(path).await,
480+
Self::External(path) => fs.size(path).await,
457481
Self::Memory(data) => Ok(data.len() as u64),
458482
}
459483
}
@@ -711,7 +735,7 @@ pub(crate) type FilterPredicate<K, V> =
711735
/// Storage that is using a redb database for small files and files for
712736
/// large files.
713737
#[derive(Debug, Clone)]
714-
pub struct Store(Arc<StoreInner>);
738+
pub struct Store<T = FileSystemPersistence>(Arc<StoreInner<T>>);
715739

716740
impl Store {
717741
/// Load or create a new store.
@@ -758,11 +782,12 @@ impl Store {
758782
}
759783

760784
#[derive(Debug)]
761-
struct StoreInner {
785+
struct StoreInner<T> {
762786
tx: async_channel::Sender<ActorMessage>,
763787
temp: Arc<RwLock<TempCounterMap>>,
764788
handle: Option<std::thread::JoinHandle<()>>,
765789
path_options: Arc<PathOptions>,
790+
fs: T,
766791
}
767792

768793
impl TagDrop for RwLock<TempCounterMap> {
@@ -777,8 +802,23 @@ impl TagCounter for RwLock<TempCounterMap> {
777802
}
778803
}
779804

780-
impl StoreInner {
805+
impl StoreInner<FileSystemPersistence> {
781806
fn new_sync(path: PathBuf, options: Options, rt: tokio::runtime::Handle) -> io::Result<Self> {
807+
Self::new_sync_with_backend(path, options, rt, FileSystemPersistence)
808+
}
809+
}
810+
811+
impl<T> StoreInner<T>
812+
where
813+
T: Persistence,
814+
OuterError: From<T::Err>,
815+
{
816+
fn new_sync_with_backend(
817+
path: PathBuf,
818+
options: Options,
819+
rt: tokio::runtime::Handle,
820+
fs: T,
821+
) -> io::Result<Self> {
782822
tracing::trace!(
783823
"creating data directory: {}",
784824
options.path.data_path.display()
@@ -811,6 +851,7 @@ impl StoreInner {
811851
temp,
812852
handle: Some(handle),
813853
path_options: Arc::new(options.path),
854+
fs,
814855
})
815856
}
816857

@@ -977,10 +1018,13 @@ impl StoreInner {
9771018
.into());
9781019
}
9791020
let parent = target.parent().ok_or_else(|| {
980-
OuterError::from(io::Error::new(
981-
io::ErrorKind::InvalidInput,
982-
"target path has no parent directory",
983-
))
1021+
OuterError::Inner(
1022+
io::Error::new(
1023+
io::ErrorKind::InvalidInput,
1024+
"target path has no parent directory",
1025+
)
1026+
.into(),
1027+
)
9841028
})?;
9851029
std::fs::create_dir_all(parent)?;
9861030
let temp_tag = self.temp.temp_tag(HashAndFormat::raw(hash));
@@ -1069,7 +1113,7 @@ impl StoreInner {
10691113
let file = match mode {
10701114
ImportMode::TryReference => ImportSource::External(path),
10711115
ImportMode::Copy => {
1072-
if std::fs::metadata(&path)?.len() < 16 * 1024 {
1116+
if Handle::current().block_on(self.fs.size(&path))? < 16 * 1024 {
10731117
// we don't know if the data will be inlined since we don't
10741118
// have the inline options here. But still for such a small file
10751119
// it does not seem worth it do to the temp file ceremony.
@@ -1108,7 +1152,7 @@ impl StoreInner {
11081152
id: u64,
11091153
progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
11101154
) -> OuterResult<(TempTag, u64)> {
1111-
let data_size = file.len()?;
1155+
let data_size = Handle::current().block_on(file.len(&self.fs))?;
11121156
tracing::debug!("finalize_import_sync {:?} {}", file, data_size);
11131157
progress.blocking_send(ImportProgress::Size {
11141158
id,
@@ -1161,7 +1205,7 @@ impl StoreInner {
11611205
}
11621206
}
11631207

1164-
impl Drop for StoreInner {
1208+
impl<T> Drop for StoreInner<T> {
11651209
fn drop(&mut self) {
11661210
if let Some(handle) = self.handle.take() {
11671211
self.tx
@@ -1217,10 +1261,7 @@ pub(crate) enum ActorError {
12171261

12181262
impl From<ActorError> for io::Error {
12191263
fn from(e: ActorError) -> Self {
1220-
match e {
1221-
ActorError::Io(e) => e,
1222-
e => io::Error::new(io::ErrorKind::Other, e),
1223-
}
1264+
io::Error::new(io::ErrorKind::Other, e)
12241265
}
12251266
}
12261267

src/store/fs/test_support.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ impl Store {
102102
}
103103
}
104104

105-
impl StoreInner {
105+
impl<T> StoreInner<T> {
106106
#[cfg(test)]
107107
async fn entry_state(&self, hash: Hash) -> OuterResult<EntryStateResponse> {
108108
let (tx, rx) = oneshot::channel();

0 commit comments

Comments
 (0)