@@ -450,6 +450,11 @@ pub trait Persistence: Clone {
450
450
/// return the size of the file in bytes if it can be found/read
451
451
/// otherwise return a [Self::Err]
452
452
fn size ( & self , path : & Path ) -> impl Future < Output = Result < u64 , Self :: Err > > ;
453
+
454
+ /// read the contents of the file at the path
455
+ /// returning the bytes of the file in the success case
456
+ /// and [Self::Err] in the error case
457
+ fn read ( & self , path : & Path ) -> impl Future < Output = Result < Vec < u8 > , Self :: Err > > ;
453
458
}
454
459
455
460
/// A persistence layer that writes to the local file system
@@ -463,6 +468,11 @@ impl Persistence for FileSystemPersistence {
463
468
let res = std:: fs:: metadata ( path) . map ( |m| m. len ( ) ) ;
464
469
async move { res }
465
470
}
471
+
472
+ fn read ( & self , path : & Path ) -> impl Future < Output = Result < Vec < u8 > , Self :: Err > > {
473
+ let res = std:: fs:: read ( path) ;
474
+ async move { res }
475
+ }
466
476
}
467
477
468
478
impl ImportSource {
@@ -1117,7 +1127,7 @@ where
1117
1127
// we don't know if the data will be inlined since we don't
1118
1128
// have the inline options here. But still for such a small file
1119
1129
// it does not seem worth it do to the temp file ceremony.
1120
- let data = std :: fs :: read ( & path) ?;
1130
+ let data = Handle :: current ( ) . block_on ( self . fs . read ( & path) ) ?;
1121
1131
ImportSource :: Memory ( data. into ( ) )
1122
1132
} else {
1123
1133
let temp_path = self . temp_file_name ( ) ;
@@ -1216,23 +1226,24 @@ impl<T> Drop for StoreInner<T> {
1216
1226
}
1217
1227
}
1218
1228
1219
- struct ActorState {
1229
+ struct ActorState < T > {
1220
1230
handles : BTreeMap < Hash , BaoFileHandleWeak > ,
1221
1231
protected : BTreeSet < Hash > ,
1222
1232
temp : Arc < RwLock < TempCounterMap > > ,
1223
1233
msgs_rx : async_channel:: Receiver < ActorMessage > ,
1224
1234
create_options : Arc < BaoFileConfig > ,
1225
1235
options : Options ,
1226
1236
rt : tokio:: runtime:: Handle ,
1237
+ fs : T ,
1227
1238
}
1228
1239
1229
1240
/// The actor for the redb store.
1230
1241
///
1231
1242
/// It is split into the database and the rest of the state to allow for split
1232
1243
/// borrows in the message handlers.
1233
- struct Actor {
1244
+ struct Actor < T = FileSystemPersistence > {
1234
1245
db : redb:: Database ,
1235
- state : ActorState ,
1246
+ state : ActorState < T > ,
1236
1247
}
1237
1248
1238
1249
/// Error type for message handler functions of the redb actor.
@@ -1586,12 +1597,13 @@ pub(super) async fn gc_sweep_task(
1586
1597
Ok ( ( ) )
1587
1598
}
1588
1599
1589
- impl Actor {
1590
- fn new (
1600
+ impl < T > Actor < T > {
1601
+ fn new_with_backend (
1591
1602
path : & Path ,
1592
1603
options : Options ,
1593
1604
temp : Arc < RwLock < TempCounterMap > > ,
1594
1605
rt : tokio:: runtime:: Handle ,
1606
+ fs : T ,
1595
1607
) -> ActorResult < ( Self , async_channel:: Sender < ActorMessage > ) > {
1596
1608
let db = match redb:: Database :: create ( path) {
1597
1609
Ok ( db) => db,
@@ -1635,11 +1647,23 @@ impl Actor {
1635
1647
options,
1636
1648
create_options : Arc :: new ( create_options) ,
1637
1649
rt,
1650
+ fs,
1638
1651
} ,
1639
1652
} ,
1640
1653
tx,
1641
1654
) )
1642
1655
}
1656
+ }
1657
+
1658
+ impl Actor {
1659
+ fn new (
1660
+ path : & Path ,
1661
+ options : Options ,
1662
+ temp : Arc < RwLock < TempCounterMap > > ,
1663
+ rt : tokio:: runtime:: Handle ,
1664
+ ) -> ActorResult < ( Self , async_channel:: Sender < ActorMessage > ) > {
1665
+ Self :: new_with_backend ( path, options, temp, rt, FileSystemPersistence )
1666
+ }
1643
1667
1644
1668
async fn run_batched ( mut self ) -> ActorResult < ( ) > {
1645
1669
let mut msgs = PeekableFlumeReceiver :: new ( self . state . msgs_rx . clone ( ) ) ;
@@ -1723,7 +1747,11 @@ impl Actor {
1723
1747
}
1724
1748
}
1725
1749
1726
- impl ActorState {
1750
+ impl < T > ActorState < T >
1751
+ where
1752
+ T : Persistence ,
1753
+ ActorError : From < T :: Err > ,
1754
+ {
1727
1755
fn entry_status (
1728
1756
& mut self ,
1729
1757
tables : & impl ReadableTables ,
@@ -1914,7 +1942,8 @@ impl ActorState {
1914
1942
"reading external data to inline it: {}" ,
1915
1943
external_path. display( )
1916
1944
) ;
1917
- let data = Bytes :: from ( std:: fs:: read ( & external_path) ?) ;
1945
+ let data =
1946
+ Bytes :: from ( Handle :: current ( ) . block_on ( self . fs . read ( & external_path) ) ?) ;
1918
1947
DataLocation :: Inline ( data)
1919
1948
} else {
1920
1949
DataLocation :: External ( vec ! [ external_path] , data_size)
@@ -2167,7 +2196,7 @@ impl ActorState {
2167
2196
// inline
2168
2197
if size <= self . options . inline . max_data_inlined {
2169
2198
let path = self . options . path . owned_data_path ( & hash) ;
2170
- let data = std :: fs :: read ( & path) ?;
2199
+ let data = Handle :: current ( ) . block_on ( self . fs . read ( & path) ) ?;
2171
2200
tables. delete_after_commit . insert ( hash, [ BaoFilePart :: Data ] ) ;
2172
2201
tables. inline_data . insert ( hash, data. as_slice ( ) ) ?;
2173
2202
( DataLocation :: Inline ( ( ) ) , size, true )
@@ -2202,7 +2231,7 @@ impl ActorState {
2202
2231
if outboard_size <= self . options . inline . max_outboard_inlined =>
2203
2232
{
2204
2233
let path = self . options . path . owned_outboard_path ( & hash) ;
2205
- let outboard = std :: fs :: read ( & path) ?;
2234
+ let outboard = Handle :: current ( ) . block_on ( self . fs . read ( & path) ) ?;
2206
2235
tables
2207
2236
. delete_after_commit
2208
2237
. insert ( hash, [ BaoFilePart :: Outboard ] ) ;
0 commit comments