Skip to content

Commit e2d951a

Browse files
committed
Refactor to avoid holding database transactions longer than necessary
1 parent 1e63fb0 commit e2d951a

2 files changed

Lines changed: 122 additions & 107 deletions

File tree

database/src/lib.rs

Lines changed: 60 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,39 @@ impl Database {
184184
Ok(())
185185
}
186186

187+
pub fn delete_batch(&self, keys: impl IntoIterator<Item = impl AsRef<[u8]>>) -> Result<()> {
188+
match self.kind() {
189+
#[cfg(not(target_os = "zkvm"))]
190+
DatabaseKind::Persistent {
191+
database_name,
192+
environment,
193+
restart_tx: _,
194+
} => {
195+
let transaction = environment.begin_rw_txn()?;
196+
let database = transaction.open_db(Some(database_name))?;
197+
198+
let mut cursor = transaction.cursor(database.dbi())?;
199+
200+
for key in keys {
201+
if cursor.set::<()>(key.as_ref())?.is_some() {
202+
cursor.del(WriteFlags::default())?;
203+
}
204+
}
205+
206+
transaction.commit()?;
207+
}
208+
DatabaseKind::InMemory { map } => {
209+
let mut map = map.lock().expect("in-memory database mutex is poisoned");
210+
211+
for key in keys {
212+
map.remove(key.as_ref());
213+
}
214+
}
215+
}
216+
217+
Ok(())
218+
}
219+
187220
pub fn delete_range(&self, range: Range<impl AsRef<[u8]>>) -> Result<()> {
188221
let start = range.start.as_ref();
189222
let end = range.end.as_ref();
@@ -436,7 +469,6 @@ impl Database {
436469
} => {
437470
let transaction = environment.begin_ro_txn()?;
438471
let database = transaction.open_db(Some(database_name))?;
439-
440472
let mut cursor = transaction.cursor(database.dbi())?;
441473

442474
let first = if let Some((is_next, key, value)) = cursor.set_lowerbound(end)? {
@@ -500,14 +532,22 @@ impl Database {
500532
environment,
501533
restart_tx,
502534
} => {
535+
let compressed_pairs = pairs
536+
.into_iter()
537+
.map(|(key, value)| Ok((key, compress(value.as_ref())?)))
538+
.collect::<Result<Vec<_>>>()?;
539+
503540
let transaction = environment.begin_rw_txn()?;
504541
let database = transaction.open_db(Some(database_name))?;
505542

506-
for (key, value) in pairs {
507-
let key = key.as_ref();
508-
let compressed = compress(value.as_ref())?;
543+
for (key, compressed) in &compressed_pairs {
509544
transaction
510-
.put(database.dbi(), key, compressed, WriteFlags::default())
545+
.put(
546+
database.dbi(),
547+
key.as_ref(),
548+
compressed,
549+
WriteFlags::default(),
550+
)
511551
.map_err(|error| {
512552
handle_write_error(database_name, error, restart_tx.as_ref())
513553
})?;
@@ -709,6 +749,21 @@ mod tests {
709749
Ok(())
710750
}
711751

752+
#[test_case(build_persistent_database)]
753+
#[test_case(build_in_memory_database)]
754+
fn test_delete_batch(constructor: Constructor) -> Result<()> {
755+
let database = constructor()?;
756+
757+
database.delete_batch(["A", "C", "D"])?;
758+
759+
assert_pairs_eq(
760+
database.iterator_ascending("A"..)?,
761+
[("B", "2"), ("E", "5")],
762+
)?;
763+
764+
Ok(())
765+
}
766+
712767
#[test_case(build_persistent_database)]
713768
#[test_case(build_in_memory_database)]
714769
fn test_delete_range_inclusive_exclusive(constructor: Constructor) -> Result<()> {

fork_choice_control/src/storage.rs

Lines changed: 62 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -409,72 +409,47 @@ impl<P: Preset> Storage<P> {
409409
}
410410

411411
pub(crate) fn prune_old_blob_sidecars(&self, up_to_slot: Slot) -> Result<()> {
412-
let mut blobs_to_remove: Vec<BlobIdentifier> = vec![];
413-
let mut keys_to_remove = vec![];
414-
415412
let results = self
416413
.database
417414
.iterator_descending(..=SlotBlobId(up_to_slot, H256::zero(), 0).to_string())?;
418415

419-
for result in results {
420-
let (key_bytes, value_bytes) = result?;
416+
let (mut keys_to_remove, blobs_to_remove): (Vec<_>, Vec<_>) =
417+
itertools::process_results(results, |iter| {
418+
iter.take_while(|(key_bytes, _)| SlotBlobId::has_prefix(key_bytes))
419+
.map(|(k, v)| (k.into_owned(), v))
420+
.unzip()
421+
})?;
421422

422-
if !SlotBlobId::has_prefix(&key_bytes) {
423-
break;
424-
}
423+
for blob_bytes in blobs_to_remove {
424+
let BlobIdentifier { block_root, index } =
425+
BlobIdentifier::from_ssz_default(blob_bytes)?;
425426

426-
// Deserialize-serialize BlobIdentifier as an additional measure
427-
// to prevent other types of data getting accidentally deleted.
428-
blobs_to_remove.push(BlobIdentifier::from_ssz_default(value_bytes)?);
429-
keys_to_remove.push(key_bytes.into_owned());
427+
keys_to_remove.push(BlobSidecarByBlobId(block_root, index).to_string().into());
430428
}
431429

432-
for blob_id in blobs_to_remove {
433-
let BlobIdentifier { block_root, index } = blob_id;
434-
let key = BlobSidecarByBlobId(block_root, index).to_string();
435-
436-
self.database.delete(key)?;
437-
}
438-
439-
for key in keys_to_remove {
440-
self.database.delete(key)?;
441-
}
442-
443-
Ok(())
430+
self.database.delete_batch(keys_to_remove)
444431
}
445432

446433
pub(crate) fn prune_old_blocks_and_states(&self, up_to_slot: Slot) -> Result<()> {
447-
let mut block_roots_to_remove = vec![];
448-
let mut keys_to_remove = vec![];
449-
450434
let results = self
451435
.database
452436
.iterator_descending(..=BlockRootBySlot(up_to_slot.saturating_sub(1)).to_string())?;
453437

454-
for result in results {
455-
let (key_bytes, value_bytes) = result?;
456-
457-
if !BlockRootBySlot::has_prefix(&key_bytes) {
458-
break;
459-
}
460-
461-
block_roots_to_remove.push(H256::from_ssz_default(value_bytes)?);
462-
keys_to_remove.push(key_bytes.into_owned());
463-
}
464-
465-
for block_root in block_roots_to_remove {
466-
let key = FinalizedBlockByRoot(block_root).to_string();
467-
self.database.delete(key)?;
438+
let (mut keys_to_remove, block_roots_to_remove): (Vec<_>, Vec<_>) =
439+
itertools::process_results(results, |iter| {
440+
iter.take_while(|(key_bytes, _)| BlockRootBySlot::has_prefix(key_bytes))
441+
.map(|(k, v)| (k.into_owned(), v))
442+
.unzip()
443+
})?;
468444

469-
let key = StateByBlockRoot(block_root).to_string();
470-
self.database.delete(key)?;
471-
}
445+
for block_root_bytes in block_roots_to_remove {
446+
let block_root = H256::from_ssz_default(block_root_bytes)?;
472447

473-
for key in keys_to_remove {
474-
self.database.delete(key)?;
448+
keys_to_remove.push(FinalizedBlockByRoot(block_root).to_string().into());
449+
keys_to_remove.push(StateByBlockRoot(block_root).to_string().into());
475450
}
476451

477-
Ok(())
452+
self.database.delete_batch(keys_to_remove)
478453
}
479454

480455
pub(crate) fn prune_old_state_roots(&self, up_to_slot: Slot) -> Result<()> {
@@ -484,25 +459,21 @@ impl<P: Preset> Storage<P> {
484459
.database
485460
.iterator_ascending(SlotByStateRoot(H256::zero()).to_string()..)?;
486461

487-
for result in results {
488-
let (key_bytes, value_bytes) = result?;
489-
490-
if !SlotByStateRoot::has_prefix(&key_bytes) {
491-
break;
492-
}
462+
let results = itertools::process_results(results, |iter| {
463+
iter.take_while(|(key_bytes, _)| SlotByStateRoot::has_prefix(key_bytes))
464+
.map(|(k, v)| (k.into_owned(), v))
465+
.collect::<Vec<_>>()
466+
})?;
493467

468+
for (key_bytes, value_bytes) in results {
494469
let slot = Slot::from_ssz_default(value_bytes)?;
495470

496471
if slot < up_to_slot {
497-
keys_to_remove.push(key_bytes.into_owned());
472+
keys_to_remove.push(key_bytes);
498473
}
499474
}
500475

501-
for key in keys_to_remove {
502-
self.database.delete(key)?;
503-
}
504-
505-
Ok(())
476+
self.database.delete_batch(keys_to_remove)
506477
}
507478

508479
pub(crate) fn prune_unfinalized_blocks(&self, last_finalized_slot: Slot) -> Result<Vec<Slot>> {
@@ -513,19 +484,19 @@ impl<P: Preset> Storage<P> {
513484
.database
514485
.iterator_ascending(serialize_key(UnfinalizedBlockByRoot(H256::zero()))..)?;
515486

516-
for result in results {
517-
let (key_bytes, value_bytes) = result?;
518-
519-
if !UnfinalizedBlockByRoot::has_prefix(&key_bytes) {
520-
break;
521-
}
487+
let results = itertools::process_results(results, |iter| {
488+
iter.take_while(|(key_bytes, _)| UnfinalizedBlockByRoot::has_prefix(key_bytes))
489+
.map(|(k, v)| (k.into_owned(), v))
490+
.collect::<Vec<_>>()
491+
})?;
522492

493+
for (key_bytes, value_bytes) in results {
523494
let unfinalized_block = SignedBeaconBlock::<P>::from_ssz(&self.config, value_bytes)?;
524495
let block_slot = unfinalized_block.message().slot();
525496

526497
if block_slot <= last_finalized_slot {
527498
slots.push(block_slot);
528-
keys_to_remove.push(key_bytes.into_owned());
499+
keys_to_remove.push(key_bytes);
529500
}
530501
}
531502

@@ -539,9 +510,7 @@ impl<P: Preset> Storage<P> {
539510
}
540511
}
541512

542-
for key in keys_to_remove {
543-
self.database.delete(key)?;
544-
}
513+
self.database.delete_batch(keys_to_remove)?;
545514

546515
Ok(slots)
547516
}
@@ -591,38 +560,29 @@ impl<P: Preset> Storage<P> {
591560
}
592561

593562
pub(crate) fn prune_old_data_column_sidecars(&self, up_to_slot: Slot) -> Result<()> {
594-
let mut columns_to_remove: Vec<DataColumnIdentifier> = vec![];
595-
let mut keys_to_remove = vec![];
596-
597563
let results = self
598564
.database
599565
.iterator_descending(..=SlotColumnId(up_to_slot, H256::zero(), 0).to_string())?;
600566

601-
for result in results {
602-
let (key_bytes, value_bytes) = result?;
603-
604-
if !SlotColumnId::has_prefix(&key_bytes) {
605-
break;
606-
}
607-
608-
// Deserialize-serialize DataColumnIdentifier as an additional measure
609-
// to prevent other types of data getting accidentally deleted.
610-
columns_to_remove.push(DataColumnIdentifier::from_ssz_default(value_bytes)?);
611-
keys_to_remove.push(key_bytes.into_owned());
567+
let (mut keys_to_remove, columns_to_remove): (Vec<_>, Vec<_>) =
568+
itertools::process_results(results, |iter| {
569+
iter.take_while(|(key_bytes, _)| SlotColumnId::has_prefix(key_bytes))
570+
.map(|(k, v)| (k.into_owned(), v))
571+
.unzip()
572+
})?;
573+
574+
for column_bytes in columns_to_remove {
575+
let DataColumnIdentifier { block_root, index } =
576+
DataColumnIdentifier::from_ssz_default(column_bytes)?;
577+
578+
keys_to_remove.push(
579+
DataColumnSidecarByColumnId(block_root, index)
580+
.to_string()
581+
.into(),
582+
)
612583
}
613584

614-
for column_id in columns_to_remove {
615-
let DataColumnIdentifier { block_root, index } = column_id;
616-
let key = DataColumnSidecarByColumnId(block_root, index).to_string();
617-
618-
self.database.delete(key)?;
619-
}
620-
621-
for key in keys_to_remove {
622-
self.database.delete(key)?;
623-
}
624-
625-
Ok(())
585+
self.database.delete_batch(keys_to_remove)
626586
}
627587

628588
pub(crate) fn checkpoint_state_slot(&self) -> Result<Option<Slot>> {
@@ -874,15 +834,15 @@ impl<P: Preset> Storage<P> {
874834
.database
875835
.iterator_descending(..=BlockRootBySlot(start_from_slot).to_string())?;
876836

877-
let mut block_roots = vec![];
878-
879-
for result in results {
880-
let (key_bytes, value_bytes) = result?;
837+
let results = itertools::process_results(results, |iter| {
838+
iter.take_while(|(key_bytes, _)| BlockRootBySlot::has_prefix(key_bytes))
839+
.map(|(_, v)| v)
840+
.collect::<Vec<_>>()
841+
})?;
881842

882-
if !BlockRootBySlot::has_prefix(&key_bytes) {
883-
break;
884-
}
843+
let mut block_roots = vec![];
885844

845+
for value_bytes in results {
886846
let block_root = H256::from_ssz_default(value_bytes)?;
887847

888848
if self.contains_key(StateByBlockRoot(block_root))? {

0 commit comments

Comments
 (0)