
Commit 8057ac2

Merge of #8267
2 parents 55588f7 + 7e79085 commit 8057ac2

File tree

6 files changed: +103 −91 lines

beacon_node/beacon_chain/src/custody_context.rs
beacon_node/beacon_chain/src/historical_data_columns.rs
beacon_node/beacon_chain/tests/store_tests.rs
beacon_node/network/src/network_beacon_processor/mod.rs
beacon_node/network/src/network_beacon_processor/sync_methods.rs
beacon_node/network/src/sync/custody_backfill_sync/mod.rs

beacon_node/beacon_chain/src/custody_context.rs

Lines changed: 73 additions & 6 deletions
@@ -134,8 +134,17 @@ impl ValidatorRegistrations {
     ///
     /// This is done by pruning all values on/after `effective_epoch` and updating the map to store
     /// the latest validator custody requirements for the `effective_epoch`.
-    pub fn backfill_validator_custody_requirements(&mut self, effective_epoch: Epoch) {
+    pub fn backfill_validator_custody_requirements(
+        &mut self,
+        effective_epoch: Epoch,
+        expected_cgc: u64,
+    ) {
         if let Some(latest_validator_custody) = self.latest_validator_custody_requirement() {
+            // If the expected cgc isn't equal to the latest validator custody, a very recent
+            // cgc change may have occurred. We should not update the mapping.
+            if expected_cgc != latest_validator_custody {
+                return;
+            }
             // Delete records if
             // 1. The epoch is greater than or equal to `effective_epoch`
             // 2. the cgc requirements match the latest validator custody requirements
@@ -517,10 +526,14 @@ impl<E: EthSpec> CustodyContext<E> {
 
     /// The node has completed backfill for this epoch. Update the internal records so the function
     /// [`Self::custody_columns_for_epoch()`] returns up-to-date results.
-    pub fn update_and_backfill_custody_count_at_epoch(&self, effective_epoch: Epoch) {
+    pub fn update_and_backfill_custody_count_at_epoch(
+        &self,
+        effective_epoch: Epoch,
+        expected_cgc: u64,
+    ) {
         self.validator_registrations
             .write()
-            .backfill_validator_custody_requirements(effective_epoch);
+            .backfill_validator_custody_requirements(effective_epoch, expected_cgc);
     }
 }
 
@@ -604,11 +617,13 @@ mod tests {
         custody_context: &CustodyContext<E>,
         start_epoch: Epoch,
         end_epoch: Epoch,
+        expected_cgc: u64,
     ) {
         assert!(start_epoch >= end_epoch);
         // Call from end_epoch down to start_epoch (inclusive), simulating backfill
         for epoch in (end_epoch.as_u64()..=start_epoch.as_u64()).rev() {
-            custody_context.update_and_backfill_custody_count_at_epoch(Epoch::new(epoch));
+            custody_context
+                .update_and_backfill_custody_count_at_epoch(Epoch::new(epoch), expected_cgc);
         }
     }
 

@@ -1368,7 +1383,7 @@ mod tests {
         );
 
         // Backfill from epoch 20 down to 15 (simulating backfill)
-        complete_backfill_for_epochs(&custody_context, head_epoch, Epoch::new(15));
+        complete_backfill_for_epochs(&custody_context, head_epoch, Epoch::new(15), final_cgc);
 
         // After backfilling to epoch 15, it should use latest CGC (32)
         assert_eq!(
@@ -1406,7 +1421,43 @@
         let custody_context = setup_custody_context(&spec, head_epoch, epoch_and_cgc_tuples);
 
         // Backfill to epoch 15 (between the two CGC increases)
-        complete_backfill_for_epochs(&custody_context, Epoch::new(20), Epoch::new(15));
+        complete_backfill_for_epochs(&custody_context, Epoch::new(20), Epoch::new(15), final_cgc);
+
+        // Verify epochs 15 - 20 return latest CGC (32)
+        for epoch in 15..=20 {
+            assert_eq!(
+                custody_context.custody_group_count_at_epoch(Epoch::new(epoch), &spec),
+                final_cgc,
+            );
+        }
+
+        // Verify epochs 10-14 still return mid_cgc (16)
+        for epoch in 10..14 {
+            assert_eq!(
+                custody_context.custody_group_count_at_epoch(Epoch::new(epoch), &spec),
+                mid_cgc,
+            );
+        }
+    }
+
+    #[test]
+    fn attempt_backfill_with_invalid_cgc() {
+        let spec = E::default_spec();
+        let initial_cgc = 8u64;
+        let mid_cgc = 16u64;
+        let final_cgc = 32u64;
+
+        // Setup: Node restart after multiple validator registrations causing CGC increases
+        let head_epoch = Epoch::new(20);
+        let epoch_and_cgc_tuples = vec![
+            (Epoch::new(0), initial_cgc),
+            (Epoch::new(10), mid_cgc),
+            (head_epoch, final_cgc),
+        ];
+        let custody_context = setup_custody_context(&spec, head_epoch, epoch_and_cgc_tuples);
+
+        // Backfill to epoch 15 (between the two CGC increases)
+        complete_backfill_for_epochs(&custody_context, Epoch::new(20), Epoch::new(15), final_cgc);
 
         // Verify epochs 15 - 20 return latest CGC (32)
         for epoch in 15..=20 {
@@ -1416,6 +1467,22 @@
             );
         }
 
+        // Attempt backfill with an incorrect cgc value
+        complete_backfill_for_epochs(
+            &custody_context,
+            Epoch::new(20),
+            Epoch::new(15),
+            initial_cgc,
+        );
+
+        // Verify epochs 15 - 20 still return latest CGC (32)
+        for epoch in 15..=20 {
+            assert_eq!(
+                custody_context.custody_group_count_at_epoch(Epoch::new(epoch), &spec),
+                final_cgc,
+            );
+        }
+
         // Verify epochs 10-14 still return mid_cgc (16)
         for epoch in 10..14 {
             assert_eq!(
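
A note on the guard introduced above: `expected_cgc` is the custody group count (CGC) the backfill run started with. If the node's latest validator custody requirement has changed since the batch was scheduled, the update is skipped rather than clobbering the newer requirement. A minimal sketch of the idea, assuming a simplified `BTreeMap`-backed stand-in for `ValidatorRegistrations` (the field name and pruning rule follow the doc comments in the hunk above, not the exact Lighthouse definitions):

use std::collections::BTreeMap;

type Epoch = u64;

struct Registrations {
    // epoch -> custody group count (CGC) in effect from that epoch onwards
    // (simplified stand-in for the Lighthouse type)
    epoch_validator_custody_requirements: BTreeMap<Epoch, u64>,
}

impl Registrations {
    fn latest_validator_custody_requirement(&self) -> Option<u64> {
        self.epoch_validator_custody_requirements.values().last().copied()
    }

    fn backfill_validator_custody_requirements(&mut self, effective_epoch: Epoch, expected_cgc: u64) {
        if let Some(latest) = self.latest_validator_custody_requirement() {
            // A batch scheduled under an older CGC must not rewrite the map.
            if expected_cgc != latest {
                return;
            }
            // Prune entries on/after `effective_epoch` whose CGC matches the latest
            // requirement, then record the latest CGC as effective from `effective_epoch`.
            self.epoch_validator_custody_requirements
                .retain(|epoch, cgc| *epoch < effective_epoch || *cgc != latest);
            self.epoch_validator_custody_requirements
                .insert(effective_epoch, latest);
        }
    }
}

The two tests above exercise exactly this: a backfill completed with the current CGC updates the custody records, while one carrying a stale CGC leaves them untouched.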

beacon_node/beacon_chain/src/historical_data_columns.rs

Lines changed: 2 additions & 1 deletion
@@ -54,6 +54,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         &self,
         epoch: Epoch,
         historical_data_column_sidecar_list: DataColumnSidecarList<T::EthSpec>,
+        expected_cgc: u64,
     ) -> Result<usize, HistoricalDataColumnError> {
         let mut total_imported = 0;
         let mut ops = vec![];
@@ -136,7 +137,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
 
         self.data_availability_checker
             .custody_context()
-            .update_and_backfill_custody_count_at_epoch(epoch);
+            .update_and_backfill_custody_count_at_epoch(epoch, expected_cgc);
 
         self.safely_backfill_data_column_custody_info(epoch)
             .map_err(|e| HistoricalDataColumnError::BeaconChainError(Box::new(e)))?;
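
Callers of `import_historical_data_column_batch` now supply the CGC their sync run was started with. A hedged usage sketch (the `chain`, `epoch`, `columns`, and `cgc` bindings are illustrative, mirroring the tests in store_tests.rs below):

// Import a batch of historical data columns for `epoch`. Custody records are
// only backfilled if `cgc` still matches the node's latest custody requirement.
let imported_count = chain.import_historical_data_column_batch(epoch, columns, cgc)?;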

beacon_node/beacon_chain/tests/store_tests.rs

Lines changed: 18 additions & 79 deletions
@@ -3182,13 +3182,16 @@ async fn weak_subjectivity_sync_test(
     assert_eq!(store.get_anchor_info().state_upper_limit, Slot::new(0));
 }
 
+// This test prunes data columns from epoch 0 and then tries to re-import them via
+// the same code paths that custody backfill sync uses to import data columns.
 #[tokio::test]
 async fn test_import_historical_data_columns_batch() {
     let spec = ForkName::Fulu.make_genesis_spec(E::default_spec());
     let db_path = tempdir().unwrap();
     let store = get_store_generic(&db_path, StoreConfig::default(), spec);
     let start_slot = Epoch::new(0).start_slot(E::slots_per_epoch()) + 1;
     let end_slot = Epoch::new(0).end_slot(E::slots_per_epoch());
+    let cgc = 128;
 
     let harness = get_harness_import_all_data_columns(store.clone(), LOW_VALIDATOR_COUNT);
 
@@ -3208,6 +3211,7 @@ async fn test_import_historical_data_columns_batch() {
 
     let mut data_columns_list = vec![];
 
+    // Get all data columns for epoch 0
     for block in block_root_iter {
         let (block_root, _) = block.unwrap();
         let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap();
@@ -3227,6 +3231,7 @@ async fn test_import_historical_data_columns_batch() {
 
     harness.advance_slot();
 
+    // Prune data columns
     harness
         .chain
         .store
@@ -3238,21 +3243,25 @@ async fn test_import_historical_data_columns_batch() {
         .forwards_iter_block_roots_until(start_slot, end_slot)
         .unwrap();
 
+    // Assert that data columns no longer exist for epoch 0
     for block in block_root_iter {
         let (block_root, _) = block.unwrap();
         let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap();
         assert!(data_columns.is_none())
     }
 
+    // Re-import deleted data columns
     harness
         .chain
-        .import_historical_data_column_batch(Epoch::new(0), data_columns_list)
+        .import_historical_data_column_batch(Epoch::new(0), data_columns_list, cgc)
         .unwrap();
+
     let block_root_iter = harness
         .chain
         .forwards_iter_block_roots_until(start_slot, end_slot)
         .unwrap();
 
+    // Assert that data columns now exist for epoch 0
     for block in block_root_iter {
         let (block_root, _) = block.unwrap();
         let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap();
@@ -3261,13 +3270,15 @@
     }
 }
 
 // This should verify that a data column sidecar containing mismatched block roots should fail to be imported.
+// This also covers any test cases related to data columns with incorrect/invalid/mismatched block roots.
 #[tokio::test]
 async fn test_import_historical_data_columns_batch_mismatched_block_root() {
     let spec = ForkName::Fulu.make_genesis_spec(E::default_spec());
     let db_path = tempdir().unwrap();
     let store = get_store_generic(&db_path, StoreConfig::default(), spec);
     let start_slot = Slot::new(1);
     let end_slot = Slot::new(E::slots_per_epoch() * 2 - 1);
+    let cgc = 128;
 
     let harness = get_harness_import_all_data_columns(store.clone(), LOW_VALIDATOR_COUNT);
 
@@ -3287,6 +3298,8 @@ async fn test_import_historical_data_columns_batch_mismatched_block_root() {
 
     let mut data_columns_list = vec![];
 
+    // Get all data columns from start_slot to end_slot
+    // and mutate the data columns with an invalid block root
     for block in block_root_iter {
         let (block_root, _) = block.unwrap();
         let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap();
@@ -3312,6 +3325,7 @@ async fn test_import_historical_data_columns_batch_mismatched_block_root() {
 
     harness.advance_slot();
 
+    // Prune blobs
     harness
         .chain
         .store
@@ -3323,17 +3337,20 @@
         .forwards_iter_block_roots_until(start_slot, end_slot)
         .unwrap();
 
+    // Assert there are no columns between start_slot and end_slot
     for block in block_root_iter {
         let (block_root, _) = block.unwrap();
         let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap();
         assert!(data_columns.is_none())
     }
 
+    // Attempt to import data columns with invalid block roots and expect a failure
     let error = harness
         .chain
         .import_historical_data_column_batch(
             start_slot.epoch(E::slots_per_epoch()),
             data_columns_list,
+            cgc,
         )
         .unwrap_err();
 
@@ -3343,84 +3360,6 @@
     ));
 }
 
-// This should verify that a data column sidecar associated to a block root that doesn't exist in the store cannot
-// be imported.
-#[tokio::test]
-async fn test_import_historical_data_columns_batch_no_block_found() {
-    let spec = ForkName::Fulu.make_genesis_spec(E::default_spec());
-    let db_path = tempdir().unwrap();
-    let store = get_store_generic(&db_path, StoreConfig::default(), spec);
-    let start_slot = Slot::new(1);
-    let end_slot = Slot::new(E::slots_per_epoch() * 2 - 1);
-
-    let harness = get_harness_import_all_data_columns(store.clone(), LOW_VALIDATOR_COUNT);
-
-    harness
-        .extend_chain(
-            (E::slots_per_epoch() * 2) as usize,
-            BlockStrategy::OnCanonicalHead,
-            AttestationStrategy::AllValidators,
-        )
-        .await;
-    harness.advance_slot();
-
-    let block_root_iter = harness
-        .chain
-        .forwards_iter_block_roots_until(start_slot, end_slot)
-        .unwrap();
-
-    let mut data_columns_list = vec![];
-
-    for block in block_root_iter {
-        let (block_root, _) = block.unwrap();
-        let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap();
-        assert!(data_columns.is_some());
-
-        for data_column in data_columns.unwrap() {
-            let mut data_column = (*data_column).clone();
-            data_column.signed_block_header.message.body_root = Hash256::ZERO;
-            data_columns_list.push(Arc::new(data_column));
-        }
-    }
-
-    harness
-        .extend_chain(
-            (E::slots_per_epoch() * 4) as usize,
-            BlockStrategy::OnCanonicalHead,
-            AttestationStrategy::AllValidators,
-        )
-        .await;
-
-    harness.advance_slot();
-
-    harness
-        .chain
-        .store
-        .try_prune_blobs(true, Epoch::new(2))
-        .unwrap();
-
-    let block_root_iter = harness
-        .chain
-        .forwards_iter_block_roots_until(start_slot, end_slot)
-        .unwrap();
-
-    for block in block_root_iter {
-        let (block_root, _) = block.unwrap();
-        let data_columns = harness.chain.store.get_data_columns(&block_root).unwrap();
-        assert!(data_columns.is_none())
-    }
-
-    let error = harness
-        .chain
-        .import_historical_data_column_batch(Epoch::new(0), data_columns_list)
-        .unwrap_err();
-
-    assert!(matches!(
-        error,
-        HistoricalDataColumnError::NoBlockFound { .. }
-    ));
-}
-
 /// Test that blocks and attestations that refer to states around an unaligned split state are
 /// processed correctly.
 #[tokio::test]

beacon_node/network/src/network_beacon_processor/mod.rs

Lines changed: 3 additions & 1 deletion
@@ -497,9 +497,11 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         self: &Arc<Self>,
         batch_id: CustodyBackfillBatchId,
         data_columns: DataColumnSidecarList<T::EthSpec>,
+        expected_cgc: u64,
     ) -> Result<(), Error<T::EthSpec>> {
         let processor = self.clone();
-        let process_fn = move || processor.process_historic_data_columns(batch_id, data_columns);
+        let process_fn =
+            move || processor.process_historic_data_columns(batch_id, data_columns, expected_cgc);
 
         let work = Work::ChainSegmentBackfill(Box::new(process_fn));
 
beacon_node/network/src/network_beacon_processor/sync_methods.rs

Lines changed: 6 additions & 4 deletions
@@ -426,6 +426,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         &self,
         batch_id: CustodyBackfillBatchId,
         downloaded_columns: DataColumnSidecarList<T::EthSpec>,
+        expected_cgc: u64,
     ) {
         let _guard = debug_span!(
             SPAN_CUSTODY_BACKFILL_SYNC_IMPORT_COLUMNS,
@@ -435,10 +436,11 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         .entered();
 
         let sent_columns = downloaded_columns.len();
-        let result = match self
-            .chain
-            .import_historical_data_column_batch(batch_id.epoch, downloaded_columns)
-        {
+        let result = match self.chain.import_historical_data_column_batch(
+            batch_id.epoch,
+            downloaded_columns,
+            expected_cgc,
+        ) {
             Ok(imported_columns) => {
                 metrics::inc_counter_by(
                     &metrics::BEACON_PROCESSOR_CUSTODY_BACKFILL_COLUMN_IMPORT_SUCCESS_TOTAL,

beacon_node/network/src/sync/custody_backfill_sync/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -504,6 +504,7 @@ impl<T: BeaconChainTypes> CustodyBackFillSync<T> {
                 run_id: self.run_id,
             },
             data_columns,
+            self.cgc,
         ) {
             crit!(
                 msg = "process_batch",
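
Taken together, the merge threads the CGC of a custody backfill run from the sync layer down to the custody context, so a run started under an older CGC cannot overwrite newer custody requirements. A sketch of the call chain after this change (the dispatching method in network_beacon_processor/mod.rs is not named in this diff, so it is shown generically):

// CustodyBackFillSync (sync/custody_backfill_sync/mod.rs) passes `self.cgc`
// when dispatching a completed batch:
//   -> NetworkBeaconProcessor dispatch fn (mod.rs): (batch_id, data_columns, expected_cgc)
//     -> process_historic_data_columns(batch_id, data_columns, expected_cgc)
//       -> BeaconChain::import_historical_data_column_batch(epoch, columns, expected_cgc)
//         -> CustodyContext::update_and_backfill_custody_count_at_epoch(epoch, expected_cgc)
//           -> ValidatorRegistrations::backfill_validator_custody_requirements(epoch, expected_cgc)
//              (no-op when expected_cgc != latest requirement, i.e. a stale run)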
