Skip to content

Commit 2e55a0a

Browse files
New design for blob/column pruning (#8266)
We are seeing some crazy IO utilisation on Holesky now that data columns have started to expire. Our previous approach of _iterating the entire blobs DB_ doesn't seem to be scaling. New blob pruning algorithm that uses a backwards block iterator from the epoch we want to prune, stopping early if an already-pruned slot is encountered. Co-Authored-By: Michael Sproul <[email protected]>
1 parent c668cb7 commit 2e55a0a

File tree

1 file changed

+100
-46
lines changed

1 file changed

+100
-46
lines changed

beacon_node/store/src/hot_cold_store.rs

Lines changed: 100 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,13 @@ impl<E: EthSpec> BlockCache<E> {
146146
pub fn delete_blobs(&mut self, block_root: &Hash256) {
147147
let _ = self.blob_cache.pop(block_root);
148148
}
149+
pub fn delete_data_columns(&mut self, block_root: &Hash256) {
150+
let _ = self.data_column_cache.pop(block_root);
151+
}
149152
pub fn delete(&mut self, block_root: &Hash256) {
150-
let _ = self.block_cache.pop(block_root);
151-
let _ = self.blob_cache.pop(block_root);
153+
self.delete_block(block_root);
154+
self.delete_blobs(block_root);
155+
self.delete_data_columns(block_root);
152156
}
153157
}
154158

@@ -2553,6 +2557,16 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
25532557
.collect()
25542558
}
25552559

2560+
/// Fetch all possible data column keys for a given `block_root`.
2561+
///
2562+
/// Unlike `get_data_column_keys`, these keys are not necessarily all present in the database,
2563+
/// due to the node's custody requirements many just store a subset.
2564+
pub fn get_all_data_column_keys(&self, block_root: Hash256) -> Vec<Vec<u8>> {
2565+
(0..E::number_of_columns() as u64)
2566+
.map(|column_index| get_data_column_key(&block_root, &column_index))
2567+
.collect()
2568+
}
2569+
25562570
/// Fetch a single data_column for a given block from the store.
25572571
pub fn get_data_column(
25582572
&self,
@@ -3228,13 +3242,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
32283242
return Err(HotColdDBError::BlobPruneLogicError.into());
32293243
};
32303244

3231-
// Start pruning from the epoch of the oldest blob stored.
3232-
// The start epoch is inclusive (blobs in this epoch will be pruned).
3245+
// The start epoch is not necessarily iterated back to, but is used for deciding whether we
3246+
// should attempt pruning. We could probably refactor it out eventually (while reducing our
3247+
// dependence on BlobInfo).
32333248
let start_epoch = oldest_blob_slot.epoch(E::slots_per_epoch());
32343249

32353250
// Prune blobs up until the `data_availability_boundary - margin` or the split
32363251
// slot's epoch, whichever is older. We can't prune blobs newer than the split.
3237-
// The end epoch is also inclusive (blobs in this epoch will be pruned).
3252+
// The end epoch is inclusive (blobs in this epoch will be pruned).
32383253
let split = self.get_split_info();
32393254
let end_epoch = std::cmp::min(
32403255
data_availability_boundary - margin_epochs - 1,
@@ -3257,20 +3272,30 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
32573272
return Ok(());
32583273
}
32593274

3260-
// Sanity checks.
3261-
let anchor = self.get_anchor_info();
3262-
if oldest_blob_slot < anchor.oldest_block_slot {
3263-
error!(
3264-
%oldest_blob_slot,
3265-
oldest_block_slot = %anchor.oldest_block_slot,
3266-
"Oldest blob is older than oldest block"
3275+
// Iterate blocks backwards from the `end_epoch` (usually the data availability boundary).
3276+
let Some((end_block_root, _)) = self
3277+
.forwards_block_roots_iterator_until(end_slot, end_slot, || {
3278+
self.get_hot_state(&split.state_root, true)?
3279+
.ok_or(HotColdDBError::MissingSplitState(
3280+
split.state_root,
3281+
split.slot,
3282+
))
3283+
.map(|state| (state, split.state_root))
3284+
.map_err(Into::into)
3285+
})?
3286+
.next()
3287+
.transpose()?
3288+
else {
3289+
// Can't prune blobs if we don't know the block at `end_slot`. This is expected if we
3290+
// have checkpoint synced and haven't backfilled to the DA boundary yet.
3291+
debug!(
3292+
%end_epoch,
3293+
%data_availability_boundary,
3294+
"No blobs to prune"
32673295
);
3268-
return Err(HotColdDBError::BlobPruneLogicError.into());
3269-
}
3270-
3271-
// Iterate block roots forwards from the oldest blob slot.
3296+
return Ok(());
3297+
};
32723298
debug!(
3273-
%start_epoch,
32743299
%end_epoch,
32753300
%data_availability_boundary,
32763301
"Pruning blobs"
@@ -3279,48 +3304,77 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
32793304
// We collect block roots of deleted blobs in memory. Even for 10y of blob history this
32803305
// vec won't go beyond 1GB. We can probably optimise this out eventually.
32813306
let mut removed_block_roots = vec![];
3307+
let mut blobs_db_ops = vec![];
3308+
3309+
// Iterate blocks backwards until we reach a block for which we've already pruned
3310+
// blobs/columns.
3311+
for tuple in ParentRootBlockIterator::new(self, end_block_root) {
3312+
let (block_root, blinded_block) = tuple?;
3313+
let slot = blinded_block.slot();
3314+
3315+
// If the block has no blobs we can't tell if they've been pruned, and there is nothing
3316+
// to prune, so we just skip.
3317+
if !blinded_block.message().body().has_blobs() {
3318+
continue;
3319+
}
32823320

3283-
let remove_blob_if = |blobs_bytes: &[u8]| {
3284-
let blobs = Vec::from_ssz_bytes(blobs_bytes)?;
3285-
let Some(blob): Option<&Arc<BlobSidecar<E>>> = blobs.first() else {
3286-
return Ok(false);
3287-
};
3288-
3289-
if blob.slot() <= end_slot {
3290-
// Store the block root so we can delete from the blob cache
3291-
removed_block_roots.push(blob.block_root());
3292-
// Delete from the on-disk db
3293-
return Ok(true);
3321+
// Check if we have blobs or columns stored. If not, we assume pruning has already
3322+
// reached this point.
3323+
let (db_column, db_keys) = if blinded_block.fork_name_unchecked().fulu_enabled() {
3324+
(
3325+
DBColumn::BeaconDataColumn,
3326+
self.get_all_data_column_keys(block_root),
3327+
)
3328+
} else {
3329+
(DBColumn::BeaconBlob, vec![block_root.as_slice().to_vec()])
32943330
};
3295-
Ok(false)
3296-
};
32973331

3298-
self.blobs_db
3299-
.delete_if(DBColumn::BeaconBlob, remove_blob_if)?;
3300-
3301-
if self.spec.is_peer_das_enabled_for_epoch(start_epoch) {
3302-
let remove_data_column_if = |blobs_bytes: &[u8]| {
3303-
let data_column: DataColumnSidecar<E> =
3304-
DataColumnSidecar::from_ssz_bytes(blobs_bytes)?;
3305-
3306-
if data_column.slot() <= end_slot {
3307-
return Ok(true);
3308-
};
3309-
3310-
Ok(false)
3311-
};
3332+
// For data columns, consider a block pruned if ALL column indices are absent.
3333+
// In future we might want to refactor this to read the data column indices that *exist*
3334+
// from the DB, which could be slightly more efficient than checking existence for every
3335+
// possible column.
3336+
let mut data_stored_for_block = false;
3337+
for db_key in db_keys {
3338+
if self.blobs_db.key_exists(db_column, &db_key)? {
3339+
data_stored_for_block = true;
3340+
blobs_db_ops.push(KeyValueStoreOp::DeleteKey(db_column, db_key));
3341+
}
3342+
}
33123343

3313-
self.blobs_db
3314-
.delete_if(DBColumn::BeaconDataColumn, remove_data_column_if)?;
3344+
if data_stored_for_block {
3345+
debug!(
3346+
?block_root,
3347+
%slot,
3348+
"Pruning blobs or columns for block"
3349+
);
3350+
removed_block_roots.push(block_root);
3351+
} else {
3352+
debug!(
3353+
%slot,
3354+
?block_root,
3355+
"Reached slot with blobs or columns already pruned"
3356+
);
3357+
break;
3358+
}
33153359
}
33163360

33173361
// Remove deleted blobs from the cache.
33183362
if let Some(mut block_cache) = self.block_cache.as_ref().map(|cache| cache.lock()) {
33193363
for block_root in removed_block_roots {
33203364
block_cache.delete_blobs(&block_root);
3365+
block_cache.delete_data_columns(&block_root);
33213366
}
33223367
}
33233368

3369+
// Remove from disk.
3370+
if !blobs_db_ops.is_empty() {
3371+
debug!(
3372+
num_deleted = blobs_db_ops.len(),
3373+
"Deleting blobs and data columns from disk"
3374+
);
3375+
self.blobs_db.do_atomically(blobs_db_ops)?;
3376+
}
3377+
33243378
self.update_blob_or_data_column_info(start_epoch, end_slot, blob_info, data_column_info)?;
33253379

33263380
debug!("Blob pruning complete");

0 commit comments

Comments
 (0)