@@ -31,7 +31,7 @@ use super::{
31
31
use crate :: {
32
32
api:: blobs:: Bitfield ,
33
33
store:: {
34
- fs:: { meta:: raw_outboard_size, TaskContext } ,
34
+ fs:: { meta:: raw_outboard_size, HashContext , TaskContext } ,
35
35
util:: {
36
36
read_checksummed_and_truncate, write_checksummed, FixedSize , MemOrFile ,
37
37
PartialMemStorage , DD ,
@@ -401,21 +401,20 @@ impl BaoFileStorage {
401
401
self ,
402
402
batch : & [ BaoContentItem ] ,
403
403
bitfield : & Bitfield ,
404
- ctx : & TaskContext ,
405
- hash : & Hash ,
404
+ ctx : & HashContext ,
406
405
) -> io:: Result < ( Self , Option < EntryState < bytes:: Bytes > > ) > {
407
406
Ok ( match self {
408
407
BaoFileStorage :: PartialMem ( mut ms) => {
409
408
// check if we need to switch to file mode, otherwise write to memory
410
- if max_offset ( batch) <= ctx. options . inline . max_data_inlined {
409
+ if max_offset ( batch) <= ctx. global . options . inline . max_data_inlined {
411
410
ms. write_batch ( bitfield. size ( ) , batch) ?;
412
411
let changes = ms. bitfield . update ( bitfield) ;
413
412
let new = changes. new_state ( ) ;
414
413
if new. complete {
415
- let ( cs, update) = ms. into_complete ( hash , ctx) ?;
414
+ let ( cs, update) = ms. into_complete ( & ctx . id , & ctx. global ) ?;
416
415
( cs. into ( ) , Some ( update) )
417
416
} else {
418
- let fs = ms. persist ( ctx, hash ) ?;
417
+ let fs = ms. persist ( & ctx. global , & ctx . id ) ?;
419
418
let update = EntryState :: Partial {
420
419
size : new. validated_size ,
421
420
} ;
@@ -428,13 +427,13 @@ impl BaoFileStorage {
428
427
// a write at the end of a very large file.
429
428
//
430
429
// opt: we should check if we become complete to avoid going from mem to partial to complete
431
- let mut fs = ms. persist ( ctx, hash ) ?;
430
+ let mut fs = ms. persist ( & ctx. global , & ctx . id ) ?;
432
431
fs. write_batch ( bitfield. size ( ) , batch) ?;
433
432
let changes = fs. bitfield . update ( bitfield) ;
434
433
let new = changes. new_state ( ) ;
435
434
if new. complete {
436
435
let size = new. validated_size . unwrap ( ) ;
437
- let ( cs, update) = fs. into_complete ( size, & ctx. options ) ?;
436
+ let ( cs, update) = fs. into_complete ( size, & ctx. global . options ) ?;
438
437
( cs. into ( ) , Some ( update) )
439
438
} else {
440
439
let update = EntryState :: Partial {
@@ -450,7 +449,7 @@ impl BaoFileStorage {
450
449
let new = changes. new_state ( ) ;
451
450
if new. complete {
452
451
let size = new. validated_size . unwrap ( ) ;
453
- let ( cs, update) = fs. into_complete ( size, & ctx. options ) ?;
452
+ let ( cs, update) = fs. into_complete ( size, & ctx. global . options ) ?;
454
453
( cs. into ( ) , Some ( update) )
455
454
} else if changes. was_validated ( ) {
456
455
// we are still partial, but now we know the size
@@ -503,46 +502,29 @@ impl BaoFileStorage {
503
502
}
504
503
}
505
504
506
- /// The inner part of a bao file handle.
507
- pub struct BaoFileHandleInner {
508
- pub ( crate ) storage : watch:: Sender < BaoFileStorage > ,
509
- hash : Hash ,
510
- }
511
-
512
- impl fmt:: Debug for BaoFileHandleInner {
513
- fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
514
- let guard = self . storage . borrow ( ) ;
515
- let storage = guard. deref ( ) ;
516
- f. debug_struct ( "BaoFileHandleInner" )
517
- . field ( "hash" , & DD ( self . hash ) )
518
- . field ( "storage" , & storage)
519
- . finish_non_exhaustive ( )
520
- }
521
- }
522
-
523
505
/// A cheaply cloneable handle to a bao file, including the hash and the configuration.
524
506
#[ derive( Debug , Clone , derive_more:: Deref ) ]
525
- pub struct BaoFileHandle ( Arc < BaoFileHandleInner > ) ;
507
+ pub ( crate ) struct BaoFileHandle ( Arc < watch :: Sender < BaoFileStorage > > ) ;
526
508
527
509
impl BaoFileHandle {
528
- pub fn persist ( & mut self , options : & Options ) {
529
- self . 0 . storage . send_if_modified ( |guard| {
510
+ pub fn persist ( & mut self , hash : & Hash , options : & Options ) {
511
+ self . send_if_modified ( |guard| {
530
512
if Arc :: strong_count ( & self . 0 ) > 1 {
531
513
return false ;
532
514
}
533
515
let BaoFileStorage :: Partial ( fs) = guard. take ( ) else {
534
516
return false ;
535
517
} ;
536
- let path = options. path . bitfield_path ( & self . hash ) ;
518
+ let path = options. path . bitfield_path ( hash) ;
537
519
trace ! (
538
520
"writing bitfield for hash {} to {}" ,
539
- self . hash,
521
+ hash,
540
522
path. display( )
541
523
) ;
542
524
if let Err ( cause) = fs. sync_all ( & path) {
543
525
error ! (
544
526
"failed to write bitfield for {} at {}: {:?}" ,
545
- self . hash,
527
+ hash,
546
528
path. display( ) ,
547
529
cause
548
530
) ;
@@ -558,7 +540,7 @@ pub struct DataReader(BaoFileHandle);
558
540
559
541
impl ReadBytesAt for DataReader {
560
542
fn read_bytes_at ( & self , offset : u64 , size : usize ) -> std:: io:: Result < Bytes > {
561
- let guard = self . 0 . storage . borrow ( ) ;
543
+ let guard = self . 0 . borrow ( ) ;
562
544
match guard. deref ( ) {
563
545
BaoFileStorage :: PartialMem ( x) => x. data . read_bytes_at ( offset, size) ,
564
546
BaoFileStorage :: Partial ( x) => x. data . read_bytes_at ( offset, size) ,
@@ -574,7 +556,7 @@ pub struct OutboardReader(BaoFileHandle);
574
556
575
557
impl ReadAt for OutboardReader {
576
558
fn read_at ( & self , offset : u64 , buf : & mut [ u8 ] ) -> io:: Result < usize > {
577
- let guard = self . 0 . storage . borrow ( ) ;
559
+ let guard = self . 0 . borrow ( ) ;
578
560
match guard. deref ( ) {
579
561
BaoFileStorage :: Complete ( x) => x. outboard . read_at ( offset, buf) ,
580
562
BaoFileStorage :: PartialMem ( x) => x. outboard . read_at ( offset, buf) ,
@@ -593,12 +575,9 @@ impl BaoFileHandle {
593
575
/// Create a new bao file handle.
594
576
///
595
577
/// This will create a new file handle with an empty memory storage.
596
- pub fn new_partial_mem ( hash : Hash ) -> Self {
578
+ pub fn new_partial_mem ( ) -> Self {
597
579
let storage = BaoFileStorage :: partial_mem ( ) ;
598
- Self ( Arc :: new ( BaoFileHandleInner {
599
- storage : watch:: Sender :: new ( storage) ,
600
- hash,
601
- } ) )
580
+ Self ( Arc :: new ( watch:: Sender :: new ( storage) ) )
602
581
}
603
582
604
583
/// Create a new bao file handle with a partial file.
@@ -614,23 +593,16 @@ impl BaoFileHandle {
614
593
} else {
615
594
storage. into ( )
616
595
} ;
617
- Ok ( Self ( Arc :: new ( BaoFileHandleInner {
618
- storage : watch:: Sender :: new ( storage) ,
619
- hash,
620
- } ) ) )
596
+ Ok ( Self ( Arc :: new ( watch:: Sender :: new ( storage) ) ) )
621
597
}
622
598
623
599
/// Create a new complete bao file handle.
624
600
pub fn new_complete (
625
- hash : Hash ,
626
601
data : MemOrFile < Bytes , FixedSize < File > > ,
627
602
outboard : MemOrFile < Bytes , File > ,
628
603
) -> Self {
629
604
let storage = CompleteStorage { data, outboard } . into ( ) ;
630
- Self ( Arc :: new ( BaoFileHandleInner {
631
- storage : watch:: Sender :: new ( storage) ,
632
- hash,
633
- } ) )
605
+ Self ( Arc :: new ( watch:: Sender :: new ( storage) ) )
634
606
}
635
607
636
608
/// Complete the handle
@@ -639,7 +611,7 @@ impl BaoFileHandle {
639
611
data : MemOrFile < Bytes , FixedSize < File > > ,
640
612
outboard : MemOrFile < Bytes , File > ,
641
613
) {
642
- self . storage . send_if_modified ( |guard| {
614
+ self . send_if_modified ( |guard| {
643
615
let res = match guard {
644
616
BaoFileStorage :: Complete ( _) => None ,
645
617
BaoFileStorage :: PartialMem ( entry) => Some ( & mut entry. bitfield ) ,
@@ -657,13 +629,13 @@ impl BaoFileHandle {
657
629
}
658
630
659
631
pub fn subscribe ( & self ) -> BaoFileStorageSubscriber {
660
- BaoFileStorageSubscriber :: new ( self . 0 . storage . subscribe ( ) )
632
+ BaoFileStorageSubscriber :: new ( self . 0 . subscribe ( ) )
661
633
}
662
634
663
635
/// True if the file is complete.
664
636
#[ allow( dead_code) ]
665
637
pub fn is_complete ( & self ) -> bool {
666
- matches ! ( self . storage . borrow( ) . deref( ) , BaoFileStorage :: Complete ( _) )
638
+ matches ! ( self . borrow( ) . deref( ) , BaoFileStorage :: Complete ( _) )
667
639
}
668
640
669
641
/// An AsyncSliceReader for the data file.
@@ -684,7 +656,7 @@ impl BaoFileHandle {
684
656
685
657
/// The most precise known total size of the data file.
686
658
pub fn current_size ( & self ) -> io:: Result < u64 > {
687
- match self . storage . borrow ( ) . deref ( ) {
659
+ match self . borrow ( ) . deref ( ) {
688
660
BaoFileStorage :: Complete ( mem) => Ok ( mem. size ( ) ) ,
689
661
BaoFileStorage :: PartialMem ( mem) => Ok ( mem. current_size ( ) ) ,
690
662
BaoFileStorage :: Partial ( file) => file. current_size ( ) ,
@@ -694,7 +666,7 @@ impl BaoFileHandle {
694
666
695
667
/// The most precise known total size of the data file.
696
668
pub fn bitfield ( & self ) -> io:: Result < Bitfield > {
697
- match self . storage . borrow ( ) . deref ( ) {
669
+ match self . borrow ( ) . deref ( ) {
698
670
BaoFileStorage :: Complete ( mem) => Ok ( mem. bitfield ( ) ) ,
699
671
BaoFileStorage :: PartialMem ( mem) => Ok ( mem. bitfield ( ) . clone ( ) ) ,
700
672
BaoFileStorage :: Partial ( file) => Ok ( file. bitfield ( ) . clone ( ) ) ,
@@ -703,33 +675,27 @@ impl BaoFileHandle {
703
675
}
704
676
705
677
/// The outboard for the file.
706
- pub fn outboard ( & self ) -> io:: Result < PreOrderOutboard < OutboardReader > > {
707
- let root = self . hash . into ( ) ;
678
+ pub fn outboard ( & self , hash : & Hash ) -> io:: Result < PreOrderOutboard < OutboardReader > > {
708
679
let tree = BaoTree :: new ( self . current_size ( ) ?, IROH_BLOCK_SIZE ) ;
709
680
let outboard = self . outboard_reader ( ) ;
710
681
Ok ( PreOrderOutboard {
711
- root,
682
+ root : blake3 :: Hash :: from ( * hash ) ,
712
683
tree,
713
684
data : outboard,
714
685
} )
715
686
}
716
687
717
- /// The hash of the file.
718
- pub fn hash ( & self ) -> Hash {
719
- self . hash
720
- }
721
-
722
688
/// Write a batch and notify the db
723
689
pub ( super ) async fn write_batch (
724
690
& self ,
725
691
batch : & [ BaoContentItem ] ,
726
692
bitfield : & Bitfield ,
727
- ctx : & TaskContext ,
693
+ ctx : & HashContext ,
728
694
) -> io:: Result < ( ) > {
729
695
trace ! ( "write_batch bitfield={:?} batch={}" , bitfield, batch. len( ) ) ;
730
696
let mut res = Ok ( None ) ;
731
- self . storage . send_if_modified ( |state| {
732
- let Ok ( ( state1, update) ) = state. take ( ) . write_batch ( batch, bitfield, ctx, & self . hash )
697
+ self . send_if_modified ( |state| {
698
+ let Ok ( ( state1, update) ) = state. take ( ) . write_batch ( batch, bitfield, ctx)
733
699
else {
734
700
res = Err ( io:: Error :: other ( "write batch failed" ) ) ;
735
701
return false ;
@@ -739,7 +705,7 @@ impl BaoFileHandle {
739
705
true
740
706
} ) ;
741
707
if let Some ( update) = res? {
742
- ctx. db . update ( self . hash , update) . await ?;
708
+ ctx. global . db . update ( ctx . id , update) . await ?;
743
709
}
744
710
Ok ( ( ) )
745
711
}
0 commit comments