Skip to content
81 changes: 75 additions & 6 deletions beacon_node/beacon_chain/src/custody_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,7 @@ impl ValidatorRegistrations {
let effective_epoch =
(current_slot + effective_delay_slots).epoch(E::slots_per_epoch()) + 1;
self.epoch_validator_custody_requirements
.entry(effective_epoch)
.and_modify(|old_custody| *old_custody = validator_custody_requirement)
.or_insert(validator_custody_requirement);
.insert(effective_epoch, validator_custody_requirement);
Some((effective_epoch, validator_custody_requirement))
} else {
None
Expand Down Expand Up @@ -154,11 +152,25 @@ impl ValidatorRegistrations {
});

self.epoch_validator_custody_requirements
.entry(effective_epoch)
.and_modify(|old_custody| *old_custody = latest_validator_custody)
.or_insert(latest_validator_custody);
.insert(effective_epoch, latest_validator_custody);
}
}

/// Updates the `epoch -> cgc` map by pruning records before `effective_epoch`
/// while setting the `cgc` at `effective_epoch` to the latest validator custody requirement.
///
/// This is used to restart custody backfill sync at `effective_epoch`
pub fn reset_validator_custody_requirements(&mut self, effective_epoch: Epoch) {
if let Some(latest_validator_custody_requirements) =
self.latest_validator_custody_requirement()
{
self.epoch_validator_custody_requirements
.retain(|&epoch, _| epoch >= effective_epoch);

self.epoch_validator_custody_requirements
.insert(effective_epoch, latest_validator_custody_requirements);
};
}
}

/// Given the `validator_custody_units`, return the custody requirement based on
Expand Down Expand Up @@ -535,6 +547,14 @@ impl<E: EthSpec> CustodyContext<E> {
.write()
.backfill_validator_custody_requirements(effective_epoch, expected_cgc);
}

/// The node is attempting to restart custody backfill. Update the internal records so that
/// custody backfill can start backfilling at `effective_epoch`.
pub fn reset_validator_custody_requirements(&self, effective_epoch: Epoch) {
self.validator_registrations
.write()
.reset_validator_custody_requirements(effective_epoch);
}
}

/// Indicates that the custody group count (CGC) has increased.
Expand Down Expand Up @@ -1491,4 +1511,53 @@ mod tests {
);
}
}

#[test]
fn reset_validator_custody_requirements() {
let spec = E::default_spec();
let minimum_cgc = 4u64;
let initial_cgc = 8u64;
let mid_cgc = 16u64;
let final_cgc = 32u64;

// Setup: Node restart after multiple validator registrations causing CGC increases
let head_epoch = Epoch::new(20);
let epoch_and_cgc_tuples = vec![
(Epoch::new(0), initial_cgc),
(Epoch::new(10), mid_cgc),
(head_epoch, final_cgc),
];
let custody_context = setup_custody_context(&spec, head_epoch, epoch_and_cgc_tuples);

// Backfill from epoch 20 to 9
complete_backfill_for_epochs(&custody_context, Epoch::new(20), Epoch::new(9), final_cgc);

// Reset validator custody requirements to the latest cgc requirements at `head_epoch` up to the boundary epoch
custody_context.reset_validator_custody_requirements(head_epoch);

// Verify epochs 0 - 19 return the minimum cgc requirement because of the validator custody requirement reset
for epoch in 0..=19 {
assert_eq!(
custody_context.custody_group_count_at_epoch(Epoch::new(epoch), &spec),
minimum_cgc,
);
}

// Verify epoch 20 returns a CGC of 32
assert_eq!(
custody_context.custody_group_count_at_epoch(head_epoch, &spec),
final_cgc
);

// Rerun Backfill to epoch 20
complete_backfill_for_epochs(&custody_context, Epoch::new(20), Epoch::new(0), final_cgc);

// Verify epochs 0 - 20 return the final cgc requirements
for epoch in 0..=20 {
assert_eq!(
custody_context.custody_group_count_at_epoch(Epoch::new(epoch), &spec),
final_cgc,
);
}
}
}
5 changes: 0 additions & 5 deletions beacon_node/beacon_chain/src/historical_data_columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.get_data_column(&block_root, &data_column.index)?
.is_some()
{
debug!(
block_root = ?block_root,
column_index = data_column.index,
"Skipping data column import as identical data column exists"
);
continue;
}
if block_root != data_column.block_root() {
Expand Down
32 changes: 32 additions & 0 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4604,6 +4604,37 @@ pub fn serve<T: BeaconChainTypes>(
},
);

// POST lighthouse/custody/backfill
let post_lighthouse_custody_backfill = warp::path("lighthouse")
.and(warp::path("custody"))
.and(warp::path("backfill"))
.and(warp::path::end())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
// Calling this endpoint will trigger custody backfill once `effective_epoch``
// is finalized.
let effective_epoch = chain
.canonical_head
.cached_head()
.head_slot()
.epoch(T::EthSpec::slots_per_epoch())
+ 1;
let custody_context = chain.data_availability_checker.custody_context();
// Reset validator custody requirements to `effective_epoch` with the latest
// cgc requiremnets.
custody_context.reset_validator_custody_requirements(effective_epoch);
// Update `DataColumnCustodyInfo` to reflect the custody change.
chain.update_data_column_custody_info(Some(
effective_epoch.start_slot(T::EthSpec::slots_per_epoch()),
));
Ok(())
})
},
);

// GET lighthouse/analysis/block_rewards
let get_lighthouse_block_rewards = warp::path("lighthouse")
.and(warp::path("analysis"))
Expand Down Expand Up @@ -4963,6 +4994,7 @@ pub fn serve<T: BeaconChainTypes>(
.uor(post_lighthouse_compaction)
.uor(post_lighthouse_add_peer)
.uor(post_lighthouse_remove_peer)
.uor(post_lighthouse_custody_backfill)
.recover(warp_utils::reject::handle_rejection),
),
)
Expand Down
18 changes: 18 additions & 0 deletions beacon_node/http_api/src/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{Config, Context};
use beacon_chain::{
BeaconChain, BeaconChainTypes,
custody_context::NodeCustodyType,
test_utils::{BeaconChainHarness, BoxedMutator, Builder, EphemeralHarnessType},
};
use beacon_processor::{
Expand Down Expand Up @@ -67,6 +68,20 @@ impl<E: EthSpec> InteractiveTester<E> {
None,
Config::default(),
true,
NodeCustodyType::Fullnode,
)
.await
}

pub async fn new_supernode(spec: Option<ChainSpec>, validator_count: usize) -> Self {
Self::new_with_initializer_and_mutator(
spec,
validator_count,
None,
None,
Config::default(),
true,
NodeCustodyType::Supernode,
)
.await
}
Expand All @@ -78,6 +93,7 @@ impl<E: EthSpec> InteractiveTester<E> {
mutator: Option<Mutator<E>>,
config: Config,
use_mock_builder: bool,
node_custody_type: NodeCustodyType,
) -> Self {
let mut harness_builder = BeaconChainHarness::builder(E::default())
.spec_or_default(spec.map(Arc::new))
Expand All @@ -93,6 +109,8 @@ impl<E: EthSpec> InteractiveTester<E> {
.fresh_ephemeral_store()
};

harness_builder = harness_builder.node_custody_type(node_custody_type);

// Add a mutator for the beacon chain builder which will be called in
// `HarnessBuilder::build`.
if let Some(mutator) = mutator {
Expand Down
2 changes: 2 additions & 0 deletions beacon_node/http_api/tests/broadcast_validation_tests.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use beacon_chain::custody_context::NodeCustodyType;
use beacon_chain::test_utils::test_spec;
use beacon_chain::{
GossipVerifiedBlock, IntoGossipVerifiedBlock, WhenSlotSkipped,
Expand Down Expand Up @@ -1956,6 +1957,7 @@ pub async fn duplicate_block_status_code() {
..Config::default()
},
true,
NodeCustodyType::Fullnode,
)
.await;

Expand Down
2 changes: 2 additions & 0 deletions beacon_node/http_api/tests/fork_tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Tests for API behaviour across fork boundaries.
use beacon_chain::custody_context::NodeCustodyType;
use beacon_chain::{
StateSkipConfig,
test_utils::{DEFAULT_ETH1_BLOCK_HASH, HARNESS_GENESIS_TIME, RelativeSyncCommittee},
Expand Down Expand Up @@ -426,6 +427,7 @@ async fn bls_to_execution_changes_update_all_around_capella_fork() {
None,
Default::default(),
true,
NodeCustodyType::Fullnode,
)
.await;
let harness = &tester.harness;
Expand Down
72 changes: 72 additions & 0 deletions beacon_node/http_api/tests/interactive_tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Generic tests that make use of the (newer) `InteractiveApiTester`
use beacon_chain::custody_context::NodeCustodyType;
use beacon_chain::{
ChainConfig,
chain_config::{DisallowedReOrgOffsets, ReOrgThreshold},
Expand Down Expand Up @@ -76,6 +77,7 @@ async fn state_by_root_pruned_from_fork_choice() {
None,
Default::default(),
false,
NodeCustodyType::Fullnode,
)
.await;

Expand Down Expand Up @@ -433,6 +435,7 @@ pub async fn proposer_boost_re_org_test(
})),
Default::default(),
false,
NodeCustodyType::Fullnode,
)
.await;
let harness = &tester.harness;
Expand Down Expand Up @@ -1050,6 +1053,68 @@ async fn proposer_duties_with_gossip_tolerance() {
);
}

// Test that a request to `lighthouse/custody/backfill` succeeds by verifying that `CustodyContext` and `DataColumnCustodyInfo`
// have been updated with the correct values.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn lighthouse_restart_custody_backfill() {
let spec = test_spec::<E>();

// Skip pre-Fulu.
if !spec.is_fulu_scheduled() {
return;
}

let validator_count = 24;

let tester = InteractiveTester::<E>::new_supernode(Some(spec), validator_count).await;
let harness = &tester.harness;
let spec = &harness.spec;
let client = &tester.client;
let min_cgc = spec.custody_requirement;
let max_cgc = spec.number_of_custody_groups;

let num_blocks = 2 * E::slots_per_epoch();

let custody_context = harness.chain.data_availability_checker.custody_context();

harness.advance_slot();
harness
.extend_chain_with_sync(
num_blocks as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
SyncCommitteeStrategy::NoValidators,
LightClientStrategy::Disabled,
)
.await;

let cgc_at_head = custody_context.custody_group_count_at_head(spec);
let earliest_data_column_epoch = harness.chain.earliest_custodied_data_column_epoch();

assert_eq!(cgc_at_head, max_cgc);
assert_eq!(earliest_data_column_epoch, None);

custody_context
.update_and_backfill_custody_count_at_epoch(harness.chain.epoch().unwrap(), cgc_at_head);
client.post_lighthouse_custody_backfill().await.unwrap();

let cgc_at_head = custody_context.custody_group_count_at_head(spec);
let cgc_at_previous_epoch =
custody_context.custody_group_count_at_epoch(harness.chain.epoch().unwrap() - 1, spec);
let earliest_data_column_epoch = harness.chain.earliest_custodied_data_column_epoch();

// `DataColumnCustodyInfo` should have been updated to the head epoch
assert_eq!(
earliest_data_column_epoch,
Some(harness.chain.epoch().unwrap())
);
// Cgc requirements should have stayed the same at head
assert_eq!(cgc_at_head, max_cgc);
// Cgc requirements at the previous epoch should be `min_cgc`
// This allows for custody backfill to re-fetch columns for this epoch.
assert_eq!(cgc_at_previous_epoch, min_cgc);
}

// Test that a request for next epoch proposer duties suceeds when the current slot clock is within
// gossip clock disparity (500ms) of the new epoch.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
Expand All @@ -1066,6 +1131,13 @@ async fn lighthouse_custody_info() {
spec.min_epochs_for_blob_sidecars_requests = 2;
spec.min_epochs_for_data_column_sidecars_requests = 2;

spec.altair_fork_epoch = Some(Epoch::new(0));
spec.bellatrix_fork_epoch = Some(Epoch::new(0));
spec.capella_fork_epoch = Some(Epoch::new(0));
spec.deneb_fork_epoch = Some(Epoch::new(0));
spec.electra_fork_epoch = Some(Epoch::new(0));
spec.fulu_fork_epoch = Some(Epoch::new(0));

let validator_count = 24;

let tester = InteractiveTester::<E>::new(Some(spec), validator_count).await;
Expand Down
5 changes: 2 additions & 3 deletions beacon_node/network/src/sync/custody_backfill_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,11 +382,9 @@ impl<T: BeaconChainTypes> CustodyBackFillSync<T> {
return None;
};

let mut missing_columns = HashSet::new();

// Skip all batches (Epochs) that don't have missing columns.
for epoch in Epoch::range_inclusive_rev(self.to_be_downloaded, column_da_boundary) {
missing_columns = self.beacon_chain.get_missing_columns_for_epoch(epoch);
let missing_columns = self.beacon_chain.get_missing_columns_for_epoch(epoch);

if !missing_columns.is_empty() {
self.to_be_downloaded = epoch;
Expand Down Expand Up @@ -445,6 +443,7 @@ impl<T: BeaconChainTypes> CustodyBackFillSync<T> {
self.include_next_batch()
}
Entry::Vacant(entry) => {
let missing_columns = self.beacon_chain.get_missing_columns_for_epoch(batch_id);
entry.insert(BatchInfo::new(
&batch_id,
CUSTODY_BACKFILL_EPOCHS_PER_BATCH,
Expand Down
21 changes: 11 additions & 10 deletions beacon_node/network/src/sync/range_data_column_batch_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,17 @@ impl<T: BeaconChainTypes> RangeDataColumnBatchRequest<T> {
HashMap::new();
let mut column_to_peer_id: HashMap<u64, PeerId> = HashMap::new();

for column in self
.requests
.values()
.filter_map(|req| req.to_finished())
.flatten()
{
received_columns_for_slot
.entry(column.slot())
.or_default()
.push(column.clone());
for req in self.requests.values() {
let Some(columns) = req.to_finished() else {
return None;
};

for column in columns {
received_columns_for_slot
.entry(column.slot())
.or_default()
.push(column.clone());
}
}

// Note: this assumes that only 1 peer is responsible for a column
Expand Down
Loading
Loading