@@ -450,6 +450,11 @@ pub trait Persistence: Clone {
450
450
/// return the size of the file in bytes if it can be found/read
451
451
/// otherwise return a [Self::Err]
452
452
fn size ( & self , path : & Path ) -> impl Future < Output = Result < u64 , Self :: Err > > ;
453
+
454
+ /// read the contents of the file at the path
455
+ /// returning the bytes of the file in the success case
456
+ /// and [Self::Err] in the error case
457
+ fn read ( & self , path : & Path ) -> impl Future < Output = Result < Vec < u8 > , Self :: Err > > ;
453
458
}
454
459
455
460
/// A persistence layer that writes to the local file system
@@ -463,6 +468,11 @@ impl Persistence for FileSystemPersistence {
463
468
let res = std:: fs:: metadata ( path) . map ( |m| m. len ( ) ) ;
464
469
async move { res }
465
470
}
471
+
472
+ fn read ( & self , path : & Path ) -> impl Future < Output = Result < Vec < u8 > , Self :: Err > > {
473
+ let res = std:: fs:: read ( path) ;
474
+ async move { res }
475
+ }
466
476
}
467
477
468
478
impl ImportSource {
@@ -1117,7 +1127,7 @@ where
1117
1127
// we don't know if the data will be inlined since we don't
1118
1128
// have the inline options here. But still for such a small file
1119
1129
// it does not seem worth it do to the temp file ceremony.
1120
- let data = std :: fs :: read ( & path) ?;
1130
+ let data = Handle :: current ( ) . block_on ( self . fs . read ( & path) ) ?;
1121
1131
ImportSource :: Memory ( data. into ( ) )
1122
1132
} else {
1123
1133
let temp_path = self . temp_file_name ( ) ;
@@ -1216,23 +1226,24 @@ impl<T> Drop for StoreInner<T> {
1216
1226
}
1217
1227
}
1218
1228
1219
- struct ActorState {
1229
+ struct ActorState < T > {
1220
1230
handles : BTreeMap < Hash , BaoFileHandleWeak > ,
1221
1231
protected : BTreeSet < Hash > ,
1222
1232
temp : Arc < RwLock < TempCounterMap > > ,
1223
1233
msgs_rx : async_channel:: Receiver < ActorMessage > ,
1224
1234
create_options : Arc < BaoFileConfig > ,
1225
1235
options : Options ,
1226
1236
rt : tokio:: runtime:: Handle ,
1237
+ fs : T ,
1227
1238
}
1228
1239
1229
1240
/// The actor for the redb store.
1230
1241
///
1231
1242
/// It is split into the database and the rest of the state to allow for split
1232
1243
/// borrows in the message handlers.
1233
- struct Actor {
1244
+ struct Actor < T = FileSystemPersistence > {
1234
1245
db : redb:: Database ,
1235
- state : ActorState ,
1246
+ state : ActorState < T > ,
1236
1247
}
1237
1248
1238
1249
/// Error type for message handler functions of the redb actor.
@@ -1583,12 +1594,13 @@ pub(super) async fn gc_sweep_task(
1583
1594
Ok ( ( ) )
1584
1595
}
1585
1596
1586
- impl Actor {
1587
- fn new (
1597
+ impl < T > Actor < T > {
1598
+ fn new_with_backend (
1588
1599
path : & Path ,
1589
1600
options : Options ,
1590
1601
temp : Arc < RwLock < TempCounterMap > > ,
1591
1602
rt : tokio:: runtime:: Handle ,
1603
+ fs : T ,
1592
1604
) -> ActorResult < ( Self , async_channel:: Sender < ActorMessage > ) > {
1593
1605
let db = match redb:: Database :: create ( path) {
1594
1606
Ok ( db) => db,
@@ -1632,11 +1644,23 @@ impl Actor {
1632
1644
options,
1633
1645
create_options : Arc :: new ( create_options) ,
1634
1646
rt,
1647
+ fs,
1635
1648
} ,
1636
1649
} ,
1637
1650
tx,
1638
1651
) )
1639
1652
}
1653
+ }
1654
+
1655
+ impl Actor {
1656
+ fn new (
1657
+ path : & Path ,
1658
+ options : Options ,
1659
+ temp : Arc < RwLock < TempCounterMap > > ,
1660
+ rt : tokio:: runtime:: Handle ,
1661
+ ) -> ActorResult < ( Self , async_channel:: Sender < ActorMessage > ) > {
1662
+ Self :: new_with_backend ( path, options, temp, rt, FileSystemPersistence )
1663
+ }
1640
1664
1641
1665
async fn run_batched ( mut self ) -> ActorResult < ( ) > {
1642
1666
let mut msgs = PeekableFlumeReceiver :: new ( self . state . msgs_rx . clone ( ) ) ;
@@ -1720,7 +1744,11 @@ impl Actor {
1720
1744
}
1721
1745
}
1722
1746
1723
- impl ActorState {
1747
+ impl < T > ActorState < T >
1748
+ where
1749
+ T : Persistence ,
1750
+ ActorError : From < T :: Err > ,
1751
+ {
1724
1752
fn entry_status (
1725
1753
& mut self ,
1726
1754
tables : & impl ReadableTables ,
@@ -1911,7 +1939,8 @@ impl ActorState {
1911
1939
"reading external data to inline it: {}" ,
1912
1940
external_path. display( )
1913
1941
) ;
1914
- let data = Bytes :: from ( std:: fs:: read ( & external_path) ?) ;
1942
+ let data =
1943
+ Bytes :: from ( Handle :: current ( ) . block_on ( self . fs . read ( & external_path) ) ?) ;
1915
1944
DataLocation :: Inline ( data)
1916
1945
} else {
1917
1946
DataLocation :: External ( vec ! [ external_path] , data_size)
@@ -2164,7 +2193,7 @@ impl ActorState {
2164
2193
// inline
2165
2194
if size <= self . options . inline . max_data_inlined {
2166
2195
let path = self . options . path . owned_data_path ( & hash) ;
2167
- let data = std :: fs :: read ( & path) ?;
2196
+ let data = Handle :: current ( ) . block_on ( self . fs . read ( & path) ) ?;
2168
2197
tables. delete_after_commit . insert ( hash, [ BaoFilePart :: Data ] ) ;
2169
2198
tables. inline_data . insert ( hash, data. as_slice ( ) ) ?;
2170
2199
( DataLocation :: Inline ( ( ) ) , size, true )
@@ -2199,7 +2228,7 @@ impl ActorState {
2199
2228
if outboard_size <= self . options . inline . max_outboard_inlined =>
2200
2229
{
2201
2230
let path = self . options . path . owned_outboard_path ( & hash) ;
2202
- let outboard = std :: fs :: read ( & path) ?;
2231
+ let outboard = Handle :: current ( ) . block_on ( self . fs . read ( & path) ) ?;
2203
2232
tables
2204
2233
. delete_after_commit
2205
2234
. insert ( hash, [ BaoFilePart :: Outboard ] ) ;
0 commit comments