@@ -21,6 +21,7 @@ use crate::block_verification_types::{
2121} ;
2222pub use crate :: canonical_head:: CanonicalHead ;
2323use crate :: chain_config:: ChainConfig ;
24+ use crate :: custody_context:: CustodyContextSsz ;
2425use crate :: data_availability_checker:: {
2526 Availability , AvailabilityCheckError , AvailableBlock , AvailableBlockData ,
2627 DataAvailabilityChecker , DataColumnReconstructionResult ,
@@ -64,7 +65,6 @@ use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache};
6465use crate :: sync_committee_verification:: {
6566 Error as SyncCommitteeError , VerifiedSyncCommitteeMessage , VerifiedSyncContribution ,
6667} ;
67- use crate :: validator_custody:: CustodyContextSsz ;
6868use crate :: validator_monitor:: {
6969 HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS , ValidatorMonitor , get_slot_delay_ms,
7070 timestamp_now,
@@ -3564,7 +3564,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
35643564 . await
35653565 }
35663566
3567- fn check_blobs_for_slashability < ' a > (
3567+ fn check_blob_header_signature_and_slashability < ' a > (
35683568 self : & Arc < Self > ,
35693569 block_root : Hash256 ,
35703570 blobs : impl IntoIterator < Item = & ' a BlobSidecar < T :: EthSpec > > ,
@@ -3575,17 +3575,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
35753575 . map ( |b| b. signed_block_header . clone ( ) )
35763576 . unique ( )
35773577 {
3578- if verify_header_signature :: < T , BlockError > ( self , & header) . is_ok ( ) {
3579- slashable_cache
3580- . observe_slashable (
3581- header. message . slot ,
3582- header. message . proposer_index ,
3583- block_root,
3584- )
3585- . map_err ( |e| BlockError :: BeaconChainError ( Box :: new ( e. into ( ) ) ) ) ?;
3586- if let Some ( slasher) = self . slasher . as_ref ( ) {
3587- slasher. accept_block_header ( header) ;
3588- }
3578+ // Return an error if *any* header signature is invalid, we do not want to import this
3579+ // list of blobs into the DA checker. However, we will process any valid headers prior
3580+ // to the first invalid header in the slashable cache & slasher.
3581+ verify_header_signature :: < T , BlockError > ( self , & header) ?;
3582+
3583+ slashable_cache
3584+ . observe_slashable (
3585+ header. message . slot ,
3586+ header. message . proposer_index ,
3587+ block_root,
3588+ )
3589+ . map_err ( |e| BlockError :: BeaconChainError ( Box :: new ( e. into ( ) ) ) ) ?;
3590+ if let Some ( slasher) = self . slasher . as_ref ( ) {
3591+ slasher. accept_block_header ( header) ;
35893592 }
35903593 }
35913594 Ok ( ( ) )
@@ -3599,7 +3602,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
35993602 block_root : Hash256 ,
36003603 blobs : FixedBlobSidecarList < T :: EthSpec > ,
36013604 ) -> Result < AvailabilityProcessingStatus , BlockError > {
3602- self . check_blobs_for_slashability ( block_root, blobs. iter ( ) . flatten ( ) . map ( Arc :: as_ref) ) ?;
3605+ self . check_blob_header_signature_and_slashability (
3606+ block_root,
3607+ blobs. iter ( ) . flatten ( ) . map ( Arc :: as_ref) ,
3608+ ) ?;
36033609 let availability = self
36043610 . data_availability_checker
36053611 . put_rpc_blobs ( block_root, blobs) ?;
@@ -3616,12 +3622,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
36163622 ) -> Result < AvailabilityProcessingStatus , BlockError > {
36173623 let availability = match engine_get_blobs_output {
36183624 EngineGetBlobsOutput :: Blobs ( blobs) => {
3619- self . check_blobs_for_slashability ( block_root, blobs. iter ( ) . map ( |b| b. as_blob ( ) ) ) ?;
3625+ self . check_blob_header_signature_and_slashability (
3626+ block_root,
3627+ blobs. iter ( ) . map ( |b| b. as_blob ( ) ) ,
3628+ ) ?;
36203629 self . data_availability_checker
36213630 . put_kzg_verified_blobs ( block_root, blobs) ?
36223631 }
36233632 EngineGetBlobsOutput :: CustodyColumns ( data_columns) => {
3624- self . check_columns_for_slashability (
3633+ self . check_data_column_sidecar_header_signature_and_slashability (
36253634 block_root,
36263635 data_columns. iter ( ) . map ( |c| c. as_data_column ( ) ) ,
36273636 ) ?;
@@ -3642,7 +3651,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
36423651 block_root : Hash256 ,
36433652 custody_columns : DataColumnSidecarList < T :: EthSpec > ,
36443653 ) -> Result < AvailabilityProcessingStatus , BlockError > {
3645- self . check_columns_for_slashability (
3654+ self . check_data_column_sidecar_header_signature_and_slashability (
36463655 block_root,
36473656 custody_columns. iter ( ) . map ( |c| c. as_ref ( ) ) ,
36483657 ) ?;
@@ -3659,7 +3668,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
36593668 . await
36603669 }
36613670
3662- fn check_columns_for_slashability < ' a > (
3671+ fn check_data_column_sidecar_header_signature_and_slashability < ' a > (
36633672 self : & Arc < Self > ,
36643673 block_root : Hash256 ,
36653674 custody_columns : impl IntoIterator < Item = & ' a DataColumnSidecar < T :: EthSpec > > ,
@@ -3673,17 +3682,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
36733682 . map ( |c| c. signed_block_header . clone ( ) )
36743683 . unique ( )
36753684 {
3676- if verify_header_signature :: < T , BlockError > ( self , & header) . is_ok ( ) {
3677- slashable_cache
3678- . observe_slashable (
3679- header. message . slot ,
3680- header. message . proposer_index ,
3681- block_root,
3682- )
3683- . map_err ( |e| BlockError :: BeaconChainError ( Box :: new ( e. into ( ) ) ) ) ?;
3684- if let Some ( slasher) = self . slasher . as_ref ( ) {
3685- slasher. accept_block_header ( header) ;
3686- }
3685+ // Return an error if *any* header signature is invalid, we do not want to import this
3686+ // list of blobs into the DA checker. However, we will process any valid headers prior
3687+ // to the first invalid header in the slashable cache & slasher.
3688+ verify_header_signature :: < T , BlockError > ( self , & header) ?;
3689+
3690+ slashable_cache
3691+ . observe_slashable (
3692+ header. message . slot ,
3693+ header. message . proposer_index ,
3694+ block_root,
3695+ )
3696+ . map_err ( |e| BlockError :: BeaconChainError ( Box :: new ( e. into ( ) ) ) ) ?;
3697+ if let Some ( slasher) = self . slasher . as_ref ( ) {
3698+ slasher. accept_block_header ( header) ;
36873699 }
36883700 }
36893701 Ok ( ( ) )
@@ -6934,9 +6946,138 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
69346946 pub fn update_data_column_custody_info ( & self , slot : Option < Slot > ) {
69356947 self . store
69366948 . put_data_column_custody_info ( slot)
6937- . unwrap_or_else (
6938- |e| tracing:: error!( error = ?e, "Failed to update data column custody info" ) ,
6949+ . unwrap_or_else ( |e| error ! ( error = ?e, "Failed to update data column custody info" ) ) ;
6950+ }
6951+
6952+ /// Get the earliest epoch in which the node has met its custody requirements.
6953+ /// A `None` response indicates that we've met our custody requirements up to the
6954+ /// column data availability window
6955+ pub fn earliest_custodied_data_column_epoch ( & self ) -> Option < Epoch > {
6956+ self . store
6957+ . get_data_column_custody_info ( )
6958+ . inspect_err (
6959+ |e| error ! ( error=?e, "Failed to get data column custody info from the store" ) ,
6960+ )
6961+ . ok ( )
6962+ . flatten ( )
6963+ . and_then ( |info| info. earliest_data_column_slot )
6964+ . map ( |slot| {
6965+ let mut epoch = slot. epoch ( T :: EthSpec :: slots_per_epoch ( ) ) ;
6966+ // If the earliest custodied slot isn't the first slot in the epoch
6967+ // The node has only met its custody requirements for the next epoch.
6968+ if slot > epoch. start_slot ( T :: EthSpec :: slots_per_epoch ( ) ) {
6969+ epoch += 1 ;
6970+ }
6971+ epoch
6972+ } )
6973+ }
6974+
6975+ /// The data availability boundary for custodying columns. It will just be the
6976+ /// regular data availability boundary unless we are near the Fulu fork epoch.
6977+ pub fn column_data_availability_boundary ( & self ) -> Option < Epoch > {
6978+ match self . data_availability_boundary ( ) {
6979+ Some ( da_boundary_epoch) => {
6980+ if let Some ( fulu_fork_epoch) = self . spec . fulu_fork_epoch {
6981+ if da_boundary_epoch < fulu_fork_epoch {
6982+ Some ( fulu_fork_epoch)
6983+ } else {
6984+ Some ( da_boundary_epoch)
6985+ }
6986+ } else {
6987+ None // Fulu hasn't been enabled
6988+ }
6989+ }
6990+ None => None , // Deneb hasn't been enabled
6991+ }
6992+ }
6993+
6994+ /// Safely update data column custody info by ensuring that:
6995+ /// - cgc values at the updated epoch and the earliest custodied column epoch are equal
6996+ /// - we are only decrementing the earliest custodied data column epoch by one epoch
6997+ /// - the new earliest data column slot is set to the first slot in `effective_epoch`.
6998+ pub fn safely_backfill_data_column_custody_info (
6999+ & self ,
7000+ effective_epoch : Epoch ,
7001+ ) -> Result < ( ) , Error > {
7002+ let Some ( earliest_data_column_epoch) = self . earliest_custodied_data_column_epoch ( ) else {
7003+ return Ok ( ( ) ) ;
7004+ } ;
7005+
7006+ if effective_epoch >= earliest_data_column_epoch {
7007+ return Ok ( ( ) ) ;
7008+ }
7009+
7010+ let cgc_at_effective_epoch = self
7011+ . data_availability_checker
7012+ . custody_context ( )
7013+ . custody_group_count_at_epoch ( effective_epoch, & self . spec ) ;
7014+
7015+ let cgc_at_earliest_data_colum_epoch = self
7016+ . data_availability_checker
7017+ . custody_context ( )
7018+ . custody_group_count_at_epoch ( earliest_data_column_epoch, & self . spec ) ;
7019+
7020+ let can_update_data_column_custody_info = cgc_at_effective_epoch
7021+ == cgc_at_earliest_data_colum_epoch
7022+ && effective_epoch == earliest_data_column_epoch - 1 ;
7023+
7024+ if can_update_data_column_custody_info {
7025+ self . store . put_data_column_custody_info ( Some (
7026+ effective_epoch. start_slot ( T :: EthSpec :: slots_per_epoch ( ) ) ,
7027+ ) ) ?;
7028+ } else {
7029+ error ! (
7030+ ?cgc_at_effective_epoch,
7031+ ?cgc_at_earliest_data_colum_epoch,
7032+ ?effective_epoch,
7033+ ?earliest_data_column_epoch,
7034+ "Couldn't update data column custody info"
69397035 ) ;
7036+ return Err ( Error :: FailedColumnCustodyInfoUpdate ) ;
7037+ }
7038+
7039+ Ok ( ( ) )
7040+ }
7041+
7042+ /// Compare columns custodied for `epoch` versus columns custodied for the head of the chain
7043+ /// and return any column indices that are missing.
7044+ pub fn get_missing_columns_for_epoch ( & self , epoch : Epoch ) -> HashSet < ColumnIndex > {
7045+ let custody_context = self . data_availability_checker . custody_context ( ) ;
7046+
7047+ let columns_required = custody_context
7048+ . custody_columns_for_epoch ( None , & self . spec )
7049+ . iter ( )
7050+ . cloned ( )
7051+ . collect :: < HashSet < _ > > ( ) ;
7052+
7053+ let current_columns_at_epoch = custody_context
7054+ . custody_columns_for_epoch ( Some ( epoch) , & self . spec )
7055+ . iter ( )
7056+ . cloned ( )
7057+ . collect :: < HashSet < _ > > ( ) ;
7058+
7059+ columns_required
7060+ . difference ( & current_columns_at_epoch)
7061+ . cloned ( )
7062+ . collect :: < HashSet < _ > > ( )
7063+ }
7064+
7065+ /// The da boundary for custodying columns. It will just be the DA boundary unless we are near the Fulu fork epoch.
7066+ pub fn get_column_da_boundary ( & self ) -> Option < Epoch > {
7067+ match self . data_availability_boundary ( ) {
7068+ Some ( da_boundary_epoch) => {
7069+ if let Some ( fulu_fork_epoch) = self . spec . fulu_fork_epoch {
7070+ if da_boundary_epoch < fulu_fork_epoch {
7071+ Some ( fulu_fork_epoch)
7072+ } else {
7073+ Some ( da_boundary_epoch)
7074+ }
7075+ } else {
7076+ None
7077+ }
7078+ }
7079+ None => None , // If no DA boundary set, dont try to custody backfill
7080+ }
69407081 }
69417082
69427083 /// This method serves to get a sense of the current chain health. It is used in block proposal
0 commit comments