1
- use std:: cmp:: Ordering ;
2
-
3
- use async_recursion:: async_recursion;
4
- use async_trait:: async_trait;
5
- use byteorder:: { LittleEndian , WriteBytesExt } ;
6
-
7
1
use crate :: directory:: directory_partition:: DirectoryPartition ;
8
2
use crate :: directory:: directory_subspace:: DirectorySubspace ;
9
3
use crate :: directory:: error:: DirectoryError ;
10
4
use crate :: directory:: node:: Node ;
11
5
use crate :: directory:: { compare_slice, Directory , DirectoryOutput } ;
12
- use crate :: future:: { FdbKeyValue , FdbSlice , FdbValue , FdbValuesIter } ;
6
+ use crate :: future:: FdbSlice ;
13
7
use crate :: tuple:: hca:: HighContentionAllocator ;
14
- use crate :: tuple:: { Subspace , TuplePack } ;
8
+ use crate :: tuple:: { unpack , Element , Subspace , TuplePack } ;
15
9
use crate :: RangeOption ;
16
10
use crate :: { FdbResult , Transaction } ;
17
- use futures:: prelude:: stream:: { Iter , Next } ;
18
- use futures:: stream:: StreamExt ;
11
+ use async_recursion:: async_recursion;
12
+ use async_trait:: async_trait;
13
+ use byteorder:: { LittleEndian , WriteBytesExt } ;
19
14
use futures:: try_join;
20
- use futures:: { join, TryStreamExt } ;
21
- use parking_lot:: { FairMutex , RawMutex } ;
15
+ use std:: cmp:: Ordering ;
22
16
use std:: option:: Option :: Some ;
23
- use std:: rc:: Rc ;
24
- use std:: sync:: { Arc , Mutex , MutexGuard , PoisonError } ;
25
17
26
18
pub ( crate ) const DEFAULT_SUB_DIRS : i64 = 0 ;
27
19
const MAJOR_VERSION : u32 = 1 ;
28
20
const MINOR_VERSION : u32 = 0 ;
29
21
const PATCH_VERSION : u32 = 0 ;
30
- pub ( crate ) const DEFAULT_NODE_PREFIX : & [ u8 ] = b"\xFE " ;
22
+ pub const DEFAULT_NODE_PREFIX : & [ u8 ] = b"\xFE " ;
31
23
const DEFAULT_HCA_PREFIX : & [ u8 ] = b"hca" ;
32
24
pub ( crate ) const PARTITION_LAYER : & [ u8 ] = b"partition" ;
33
25
26
+ /// Directories are identified by hierarchical paths analogous to the paths
27
+ /// in a Unix-like file system. A path is represented as a List of strings.
28
+ /// Each directory has an associated subspace used to store its content. The
29
+ /// layer maps each path to a short prefix used for the corresponding
30
+ /// subspace. In effect, directories provide a level of indirection for
31
+ /// access to subspaces.
34
32
#[ derive( Debug , Clone ) ]
35
33
pub struct DirectoryLayer {
36
34
pub root_node : Subspace ,
@@ -71,6 +69,10 @@ impl DirectoryLayer {
71
69
}
72
70
}
73
71
72
+ pub fn get_path ( & self ) -> Vec < String > {
73
+ self . path . clone ( )
74
+ }
75
+
74
76
fn node_with_optional_prefix ( & self , prefix : Option < FdbSlice > ) -> Option < Subspace > {
75
77
match prefix {
76
78
None => None ,
@@ -94,10 +96,13 @@ impl DirectoryLayer {
94
96
// walking through the provided path
95
97
for path_name in path. iter ( ) {
96
98
node. current_path . push ( path_name. clone ( ) ) ;
97
- let key = node
98
- . subspace
99
- . unwrap ( )
100
- . subspace ( & ( DEFAULT_SUB_DIRS , path_name. to_owned ( ) ) ) ;
99
+ let node_subspace = match node. subspace {
100
+ // unreachable because on first iteration, it is set to root_node,
101
+ // on other iteration, `node.exists` is checking for the subspace's value
102
+ None => unreachable ! ( "node's subspace is not set" ) ,
103
+ Some ( s) => s,
104
+ } ;
105
+ let key = node_subspace. subspace ( & ( DEFAULT_SUB_DIRS , path_name. to_owned ( ) ) ) ;
101
106
102
107
// finding the next node
103
108
let fdb_slice_value = trx. get ( key. bytes ( ) , false ) . await ?;
@@ -141,8 +146,6 @@ impl DirectoryLayer {
141
146
) -> Result < DirectoryOutput , DirectoryError > {
142
147
let prefix: Vec < u8 > = self . node_subspace . unpack ( node. bytes ( ) ) ?;
143
148
144
- println ! ( "prefix: {:?}" , & prefix) ;
145
-
146
149
if layer. eq ( PARTITION_LAYER ) {
147
150
Ok ( DirectoryOutput :: DirectoryPartition (
148
151
DirectoryPartition :: new ( self . to_absolute_path ( & path) , prefix, self . clone ( ) ) ,
@@ -188,8 +191,13 @@ impl DirectoryLayer {
188
191
// TODO true or false?
189
192
if node. is_in_partition ( false ) {
190
193
let sub_path = node. get_partition_subpath ( ) ;
194
+ let subspace_node = match node. subspace {
195
+ // not reachable because `self.find` is creating a node with a subspace,
196
+ None => unreachable ! ( "node's subspace is not set" ) ,
197
+ Some ( s) => s,
198
+ } ;
191
199
let dir_space =
192
- self . contents_of_node ( node . subspace . unwrap ( ) , node. current_path , node. layer ) ?;
200
+ self . contents_of_node ( subspace_node , node. current_path , node. layer ) ?;
193
201
dir_space
194
202
. create_or_open ( trx, sub_path. to_owned ( ) , prefix, layer)
195
203
. await ?;
@@ -223,10 +231,16 @@ impl DirectoryLayer {
223
231
} ,
224
232
}
225
233
234
+ let subspace_node = match node. to_owned ( ) . subspace {
235
+ // not reachable because `self.find` is creating a node with a subspace,
236
+ None => unreachable ! ( "node's subspace is not set" ) ,
237
+ Some ( s) => s,
238
+ } ;
239
+
226
240
self . contents_of_node (
227
- node . subspace . as_ref ( ) . unwrap ( ) . clone ( ) ,
241
+ subspace_node . clone ( ) . subspace ( & ( ) ) ,
228
242
node. target_path . to_owned ( ) ,
229
- layer. unwrap_or ( vec ! [ ] ) ,
243
+ node . layer . to_owned ( ) ,
230
244
)
231
245
}
232
246
@@ -247,8 +261,6 @@ impl DirectoryLayer {
247
261
self . check_version ( trx, allow_create) . await ?;
248
262
let new_prefix = self . get_prefix ( trx, prefix. clone ( ) ) . await ?;
249
263
250
- println ! ( "new_prefix: {:?}" , & new_prefix) ;
251
-
252
264
let is_prefix_free = self
253
265
. is_prefix_free ( trx, new_prefix. to_owned ( ) , !prefix. is_some ( ) )
254
266
. await ?;
@@ -258,18 +270,12 @@ impl DirectoryLayer {
258
270
}
259
271
260
272
let parent_node = self . get_parent_node ( trx, path. to_owned ( ) ) . await ?;
261
- println ! ( "parent_node: {:?}" , & parent_node) ;
262
273
let node = self . node_with_prefix ( & new_prefix) ;
263
- println ! ( "node: {:?}" , & node) ;
264
274
265
275
let key = parent_node. subspace ( & ( DEFAULT_SUB_DIRS , path. last ( ) . unwrap ( ) ) ) ;
266
276
267
277
trx. set ( & key. bytes ( ) , & new_prefix) ;
268
- trx. set ( node. subspace ( & b"layer" . to_vec ( ) ) . bytes ( ) , & layer) ;
269
- println ! (
270
- "writing layer in row {:?}" ,
271
- node. subspace( & b"layer" . to_vec( ) ) . bytes( )
272
- ) ;
278
+ trx. set ( & node. pack ( & ( String :: from ( "layer" ) ) ) , & layer) ;
273
279
274
280
self . contents_of_node ( node, path. to_owned ( ) , layer. to_owned ( ) )
275
281
}
@@ -282,12 +288,9 @@ impl DirectoryLayer {
282
288
if path. len ( ) > 1 {
283
289
let ( _, list) = path. split_last ( ) . unwrap ( ) ;
284
290
285
- println ! ( "searching for parent" ) ;
286
-
287
291
let parent = self
288
292
. create_or_open_internal ( trx, list. to_vec ( ) , None , None , true , true )
289
293
. await ?;
290
- println ! ( "found a parent: {:?}" , parent. bytes( ) ) ;
291
294
Ok ( self . node_with_prefix ( & parent. bytes ( ) . to_vec ( ) ) )
292
295
} else {
293
296
Ok ( self . root_node . clone ( ) )
@@ -329,27 +332,21 @@ impl DirectoryLayer {
329
332
if key. starts_with ( self . node_subspace . bytes ( ) ) {
330
333
return Ok ( Some ( self . root_node . clone ( ) ) ) ;
331
334
}
332
- // FIXME: got sometimes an error where the scan include another layer...
333
- // https://github.com/apple/foundationdb/blob/master/bindings/flow/DirectoryLayer.actor.cpp#L186-L194
334
- let ( begin_range, _) = self . node_subspace . range ( ) ;
335
- let mut end_range = self . node_subspace . pack ( & key) ;
336
- // simulate keyAfter
337
- end_range. push ( 0 ) ;
338
335
339
- // checking range
340
- let result = trx
341
- . get_range ( & RangeOption :: from ( ( begin_range, end_range) ) , 1 , snapshot)
342
- . await ?;
336
+ let key = self . node_subspace . pack ( & key) ;
343
337
344
- if result. len ( ) > 0 {
345
- let previous_prefix: ( String ) =
346
- self . node_subspace . unpack ( result. get ( 0 ) . unwrap ( ) . key ( ) ) ?;
347
- if key. starts_with ( previous_prefix. as_bytes ( ) ) {
348
- return Ok ( Some ( self . node_with_prefix ( & ( previous_prefix) ) ) ) ;
338
+ // checking range
339
+ match trx. get ( & key, snapshot) . await ? {
340
+ None => Ok ( None ) ,
341
+ Some ( _) => {
342
+ let previous_prefix: Vec < u8 > = self . node_subspace . unpack ( key. as_slice ( ) ) ?;
343
+ if key. starts_with ( & previous_prefix) {
344
+ Ok ( Some ( self . node_with_prefix ( & previous_prefix) ) )
345
+ } else {
346
+ Ok ( None )
347
+ }
349
348
}
350
349
}
351
-
352
- Ok ( None )
353
350
}
354
351
355
352
async fn get_prefix (
@@ -390,7 +387,7 @@ impl DirectoryLayer {
390
387
if allow_creation {
391
388
self . initialize_directory ( trx) . await
392
389
} else {
393
- Err ( DirectoryError :: MissingDirectory )
390
+ return Ok ( ( ) ) ;
394
391
}
395
392
}
396
393
Some ( versions) => {
@@ -440,6 +437,7 @@ impl DirectoryLayer {
440
437
async fn get_version_value ( & self , trx : & Transaction ) -> FdbResult < Option < FdbSlice > > {
441
438
let version_subspace: & [ u8 ] = b"version" ;
442
439
let version_key = self . root_node . subspace ( & version_subspace) ;
440
+
443
441
trx. get ( version_key. bytes ( ) , false ) . await
444
442
}
445
443
@@ -450,19 +448,21 @@ impl DirectoryLayer {
450
448
) -> Result < bool , DirectoryError > {
451
449
self . check_version ( trx, false ) . await ?;
452
450
453
- if path. is_empty ( ) {
454
- return Err ( DirectoryError :: NoPathProvided ) ;
455
- }
456
-
457
451
let node = self . find ( trx, path. to_owned ( ) ) . await ?;
458
452
459
453
if !node. exists ( ) {
460
454
return Ok ( false ) ;
461
455
}
462
456
463
457
if node. is_in_partition ( false ) {
458
+ let subspace_node = match node. subspace {
459
+ // not reachable because `self.find` is creating a node with a subspace,
460
+ None => unreachable ! ( "node's subspace is not set" ) ,
461
+ Some ( ref s) => s. clone ( ) ,
462
+ } ;
463
+
464
464
let directory_partition = self . contents_of_node (
465
- node . clone ( ) . subspace . unwrap ( ) ,
465
+ subspace_node ,
466
466
node. current_path . to_owned ( ) ,
467
467
node. layer . to_owned ( ) ,
468
468
) ?;
@@ -485,9 +485,15 @@ impl DirectoryLayer {
485
485
if !node. exists ( ) {
486
486
return Err ( DirectoryError :: PathDoesNotExists ) ;
487
487
}
488
- if node. is_in_partition ( true ) {
488
+ if node. is_in_partition ( false ) {
489
+ let subspace_node = match node. subspace {
490
+ // not reachable because `self.find` is creating a node with a subspace.
491
+ None => unreachable ! ( "node's subspace is not set" ) ,
492
+ Some ( ref s) => s. clone ( ) ,
493
+ } ;
494
+
489
495
let directory_partition = self . contents_of_node (
490
- node . clone ( ) . subspace . unwrap ( ) ,
496
+ subspace_node ,
491
497
node. current_path . to_owned ( ) ,
492
498
node. layer . to_owned ( ) ,
493
499
) ?;
@@ -528,8 +534,14 @@ impl DirectoryLayer {
528
534
return Err ( DirectoryError :: CannotMoveBetweenPartition ) ;
529
535
}
530
536
537
+ let subspace_new_node = match new_node. subspace {
538
+ // not reachable because `self.find` is creating a node with a subspace,
539
+ None => unreachable ! ( "node's subspace is not set" ) ,
540
+ Some ( ref s) => s. clone ( ) ,
541
+ } ;
542
+
531
543
let directory_partition = self . contents_of_node (
532
- new_node . clone ( ) . subspace . unwrap ( ) ,
544
+ subspace_new_node ,
533
545
new_node. current_path . to_owned ( ) ,
534
546
new_node. layer . to_owned ( ) ,
535
547
) ?;
@@ -557,10 +569,14 @@ impl DirectoryLayer {
557
569
return Err ( DirectoryError :: ParentDirDoesNotExists ) ;
558
570
}
559
571
560
- let key = parent_node
561
- . subspace
562
- . unwrap ( )
563
- . subspace ( & ( DEFAULT_SUB_DIRS , new_path. to_owned ( ) . last ( ) . unwrap ( ) ) ) ;
572
+ let subspace_parent_node = match parent_node. subspace {
573
+ // not reachable because `self.find` is creating a node with a subspace,
574
+ None => unreachable ! ( "node's subspace is not set" ) ,
575
+ Some ( ref s) => s. clone ( ) ,
576
+ } ;
577
+
578
+ let key =
579
+ subspace_parent_node. subspace ( & ( DEFAULT_SUB_DIRS , new_path. to_owned ( ) . last ( ) . unwrap ( ) ) ) ;
564
580
let value: Vec < u8 > = self
565
581
. node_subspace
566
582
. unpack ( old_node. subspace . clone ( ) . unwrap ( ) . bytes ( ) ) ?;
@@ -589,8 +605,8 @@ impl DirectoryLayer {
589
605
match parent_node. subspace {
590
606
None => { }
591
607
Some ( subspace) => {
592
- let key = subspace. subspace ( & ( DEFAULT_SUB_DIRS , last_element) ) ;
593
- trx. clear ( & key. bytes ( ) ) ;
608
+ let key = subspace. pack ( & ( DEFAULT_SUB_DIRS , last_element) ) ;
609
+ trx. clear ( & key) ;
594
610
}
595
611
}
596
612
@@ -625,10 +641,9 @@ impl DirectoryLayer {
625
641
. await ;
626
642
}
627
643
628
- try_join ! (
629
- self . remove_recursive( trx, node. subspace. unwrap( ) . clone( ) ) ,
630
- self . remove_from_parent( trx, path. to_owned( ) )
631
- ) ;
644
+ self . remove_recursive ( trx, node. subspace . unwrap ( ) . clone ( ) )
645
+ . await ?;
646
+ self . remove_from_parent ( trx, path. to_owned ( ) ) . await ?;
632
647
633
648
Ok ( true )
634
649
}
@@ -645,18 +660,14 @@ impl DirectoryLayer {
645
660
loop {
646
661
let range_option = RangeOption :: from ( ( begin. as_slice ( ) , end. as_slice ( ) ) ) ;
647
662
648
- let range = trx. get_range ( & range_option, 1 , false ) . await ?;
663
+ let range = trx. get_range ( & range_option, 1024 , false ) . await ?;
649
664
let has_more = range. more ( ) ;
650
- let range: Arc < FairMutex < FdbValuesIter > > = Arc :: new ( FairMutex :: new ( range. into_iter ( ) ) ) ;
651
-
652
- loop {
653
- let value_row = match range. lock ( ) . next ( ) {
654
- None => break ,
655
- Some ( next_key_value) => next_key_value. value ( ) . to_vec ( ) ,
656
- } ;
657
665
658
- let sub_node = self . node_with_prefix ( & value_row) ;
666
+ for row_key in range {
667
+ let sub_node = self . node_with_prefix ( & row_key. value ( ) ) ;
659
668
self . remove_recursive ( trx, sub_node) . await ?;
669
+ begin = row_key. key ( ) . pack_to_vec ( ) ;
670
+ begin. push ( 0 ) ;
660
671
}
661
672
662
673
if !has_more {
@@ -667,7 +678,7 @@ impl DirectoryLayer {
667
678
let mut node_prefix: Vec < u8 > = self . node_subspace . unpack ( node_sub. bytes ( ) ) ?;
668
679
669
680
// equivalent of strinc?
670
- node_prefix. remove ( node_prefix. len ( ) ) ;
681
+ node_prefix. remove ( node_prefix. len ( ) - 1 ) ;
671
682
672
683
trx. clear_range ( node_prefix. as_slice ( ) , node_prefix. as_slice ( ) ) ;
673
684
trx. clear_subspace_range ( & node_sub) ;
0 commit comments