This repository was archived by the owner on Jan 22, 2025. It is now read-only.

Avoid to miss to root for local slots before the hard fork #19912

Merged 7 commits on Jun 26, 2022
91 changes: 70 additions & 21 deletions core/src/consensus.rs
@@ -1072,7 +1072,7 @@ impl Tower {
if let Some(last_voted_slot) = self.last_voted_slot() {
if tower_root <= replayed_root {
// Normally, we go into this clause with the possible help of
// reconcile_blockstore_roots_with_tower()
// reconcile_blockstore_roots_with_external_source()
if slot_history.check(last_voted_slot) == Check::TooOld {
// We could try hard to anchor with other older votes, but opt to simplify the
// following logic
@@ -1320,45 +1320,77 @@ impl TowerError {
}
}

#[derive(Debug)]
pub enum ExternalRootSource {
Tower(Slot),
HardFork(Slot),
}

impl ExternalRootSource {
fn root(&self) -> Slot {
match self {
ExternalRootSource::Tower(slot) => *slot,
ExternalRootSource::HardFork(slot) => *slot,
}
}
}

// Given an untimely crash, the tower may have roots that are not reflected in the
// blockstore, or vice versa.
// That's because we don't impose any ordering guarantee or any kind of write barrier
// between the tower (plain old POSIX fs calls) and the blockstore (through RocksDB) when
// `ReplayStage::handle_votable_bank()` saves the tower before setting blockstore roots.
pub fn reconcile_blockstore_roots_with_tower(
tower: &Tower,
pub fn reconcile_blockstore_roots_with_external_source(
external_source: ExternalRootSource,
blockstore: &Blockstore,
// blockstore.last_root() might have been updated already.
// so take a &mut param both to input (and output iff we update root)
last_blockstore_root: &mut Slot,
Comment on lines +1346 to +1348 (Contributor Author): there
) -> blockstore_db::Result<()> {
let tower_root = tower.root();
let last_blockstore_root = blockstore.last_root();
if last_blockstore_root < tower_root {
// Ensure tower_root itself to exist and be marked as rooted in the blockstore
let external_root = external_source.root();
if *last_blockstore_root < external_root {
// Ensure that external_root itself exists and is marked as rooted in the blockstore
// in addition to its ancestors.
let new_roots: Vec<_> = AncestorIterator::new_inclusive(tower_root, blockstore)
.take_while(|current| match current.cmp(&last_blockstore_root) {
let new_roots: Vec<_> = AncestorIterator::new_inclusive(external_root, blockstore)
.take_while(|current| match current.cmp(last_blockstore_root) {
Ordering::Greater => true,
Ordering::Equal => false,
Ordering::Less => panic!(
"couldn't find a last_blockstore_root upwards from: {}!?",
tower_root
"last_blockstore_root({}) is skipped while traversing \
blockstore (currently at {}) from external root ({:?})!?",
last_blockstore_root, current, external_source,
),
})
.collect();
if !new_roots.is_empty() {
info!(
"Reconciling slots as root based on tower root: {:?} ({}..{}) ",
new_roots, tower_root, last_blockstore_root
"Reconciling slots as root based on external root: {:?} (external: {:?}, blockstore: {})",
new_roots, external_source, last_blockstore_root
);
blockstore.set_roots(new_roots.iter())?;

// Unfortunately, we can't supply duplicate-confirmed hashes,
// because we can't guarantee that these slots can be replayed
// under this code path's limited conditions (i.e. those shreds
// might not be available, etc...); correctly overcoming this
// limitation is also hard...
blockstore.mark_slots_as_if_rooted_normally_at_startup(
new_roots.into_iter().map(|root| (root, None)).collect(),
false,
)?;
Comment on lines +1371 to +1379 (Contributor Author): here

Contributor: hmmm yeah, this is yet another tricky edge case to reason about, but I think it should be ok.

Not marking these slots as duplicate confirmed just means:

  1. we can't serve requests from people asking for the correct version of these slots via AncestorHashesRepairType
  2. it should be safe as long as we're not freezing the slot again during replay (not happening b/c rooted)

// Update the caller-managed state of last root in blockstore.
// Repeated calls of this function should result in a no-op for
// the range of `new_roots`.
*last_blockstore_root = blockstore.last_root();
} else {
// This indicates we're in a bad state; but still don't panic here.
// That's because we might have a chance of recovering properly with a
// newer snapshot.
warn!(
"Couldn't find any ancestor slots from tower root ({}) \
"Couldn't find any ancestor slots from external source ({:?}) \
towards blockstore root ({}); blockstore pruned or only \
tower moved into new ledger?",
tower_root, last_blockstore_root,
tower moved into new ledger or just hard fork?",
external_source, last_blockstore_root,
);
}
}
@@ -2814,7 +2846,12 @@ pub mod test {

let mut tower = Tower::default();
tower.vote_state.root_slot = Some(4);
reconcile_blockstore_roots_with_tower(&tower, &blockstore).unwrap();
reconcile_blockstore_roots_with_external_source(
ExternalRootSource::Tower(tower.root()),
&blockstore,
&mut blockstore.last_root(),
)
.unwrap();

assert!(!blockstore.is_root(0));
assert!(blockstore.is_root(1));
@@ -2825,7 +2862,9 @@
}

#[test]
#[should_panic(expected = "couldn't find a last_blockstore_root upwards from: 4!?")]
#[should_panic(expected = "last_blockstore_root(3) is skipped while \
traversing blockstore (currently at 1) from \
external root (Tower(4))!?")]
fn test_reconcile_blockstore_roots_with_tower_panic_no_common_root() {
solana_logger::setup();
let blockstore_path = get_tmp_ledger_path!();
@@ -2846,7 +2885,12 @@

let mut tower = Tower::default();
tower.vote_state.root_slot = Some(4);
reconcile_blockstore_roots_with_tower(&tower, &blockstore).unwrap();
reconcile_blockstore_roots_with_external_source(
ExternalRootSource::Tower(tower.root()),
&blockstore,
&mut blockstore.last_root(),
)
.unwrap();
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
@@ -2869,7 +2913,12 @@
let mut tower = Tower::default();
tower.vote_state.root_slot = Some(4);
assert_eq!(blockstore.last_root(), 0);
reconcile_blockstore_roots_with_tower(&tower, &blockstore).unwrap();
reconcile_blockstore_roots_with_external_source(
ExternalRootSource::Tower(tower.root()),
&blockstore,
&mut blockstore.last_root(),
)
.unwrap();
assert_eq!(blockstore.last_root(), 0);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
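As a rough illustration of how the reworked API composes (a minimal sketch, not taken from this PR's actual call sites: the `reconcile_at_startup` wrapper, the `hard_fork_slot` parameter, and the exact import paths are assumptions), startup code could reconcile first against the restored tower's root and then against a hard-fork slot, threading the same mutable `last_blockstore_root` cursor through both calls so the second call only roots slots the first one did not already cover:

```rust
// Hypothetical startup sketch; only the function/enum signatures come from this diff.
use solana_core::consensus::{
    reconcile_blockstore_roots_with_external_source, ExternalRootSource,
};
use solana_ledger::{blockstore::Blockstore, blockstore_db};
use solana_sdk::clock::Slot;

fn reconcile_at_startup(
    tower_root: Slot,
    hard_fork_slot: Option<Slot>,
    blockstore: &Blockstore,
) -> blockstore_db::Result<()> {
    // Seed the caller-managed cursor from the blockstore's current last root.
    let mut last_blockstore_root = blockstore.last_root();

    // Catch the blockstore up to the tower's root (the pre-existing behavior).
    reconcile_blockstore_roots_with_external_source(
        ExternalRootSource::Tower(tower_root),
        blockstore,
        &mut last_blockstore_root,
    )?;

    // If the validator is being restarted across a hard fork, also mark the
    // local slots leading up to it as rooted.
    if let Some(slot) = hard_fork_slot {
        reconcile_blockstore_roots_with_external_source(
            ExternalRootSource::HardFork(slot),
            blockstore,
            &mut last_blockstore_root,
        )?;
    }
    Ok(())
}
```

The `&mut Slot` cursor is what makes repeated calls safe: each successful reconciliation advances it to `blockstore.last_root()`, so a later call whose external root does not exceed the cursor falls through the `if *last_blockstore_root < external_root` guard as a no-op.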
15 changes: 11 additions & 4 deletions core/src/replay_stage.rs
@@ -23,6 +23,7 @@ use {
rewards_recorder_service::RewardsRecorderSender,
tower_storage::{SavedTower, SavedTowerVersions, TowerStorage},
unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes,
validator::ProcessBlockStore,
voting_service::VoteOp,
window_service::DuplicateSlotReceiver,
},
@@ -352,15 +353,15 @@ pub struct ReplayStage {

impl ReplayStage {
#[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
pub fn new<T: Into<Tower> + Sized>(
pub fn new(
config: ReplayStageConfig,
blockstore: Arc<Blockstore>,
bank_forks: Arc<RwLock<BankForks>>,
cluster_info: Arc<ClusterInfo>,
ledger_signal_receiver: Receiver<bool>,
duplicate_slots_receiver: DuplicateSlotReceiver,
poh_recorder: Arc<Mutex<PohRecorder>>,
tower: T,
maybe_process_blockstore: Option<ProcessBlockStore>,
vote_tracker: Arc<VoteTracker>,
cluster_slots: Arc<ClusterSlots>,
retransmit_slots_sender: RetransmitSlotsSender,
@@ -375,8 +376,14 @@ impl ReplayStage {
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
transaction_cost_metrics_sender: Option<TransactionCostMetricsSender>,
) -> Self {
let mut tower = tower.into();
info!("Tower state: {:?}", tower);
let mut tower = if let Some(process_blockstore) = maybe_process_blockstore {
let tower = process_blockstore.process_to_create_tower();
info!("Tower state: {:?}", tower);
tower
} else {
warn!("creating default tower....");
Comment (Contributor Author): here

Tower::default()
};

let ReplayStageConfig {
vote_account,
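The `ReplayStage::new` change above swaps the eagerly supplied `tower: T` argument for `maybe_process_blockstore: Option<ProcessBlockStore>`, so the blockstore processing that produces the tower runs only when the replay stage is actually constructed, and callers without a ledger to process fall back to `Tower::default()`. A stripped-down sketch of that pattern (hypothetical types and names, not the real `ProcessBlockStore` API):

```rust
// Simplified illustration of deferring expensive blockstore processing until
// the consumer that needs its result (the tower) starts up.
#[derive(Debug, Default)]
struct Tower;

struct DeferredBlockstoreProcessing {
    // Stand-in for the state the real ProcessBlockStore would carry.
    ledger_path: String,
}

impl DeferredBlockstoreProcessing {
    fn process_to_create_tower(self) -> Tower {
        // The expensive ledger replay would happen here, exactly once.
        println!("processing ledger at {} ...", self.ledger_path);
        Tower
    }
}

fn create_tower(maybe_processing: Option<DeferredBlockstoreProcessing>) -> Tower {
    match maybe_processing {
        Some(processing) => {
            let tower = processing.process_to_create_tower();
            println!("Tower state: {:?}", tower);
            tower
        }
        None => {
            eprintln!("creating default tower....");
            Tower::default()
        }
    }
}

fn main() {
    // Validator-like path: the work is deferred until replay starts.
    let _tower = create_tower(Some(DeferredBlockstoreProcessing {
        ledger_path: "/tmp/ledger".to_string(),
    }));
    // Test-like path, mirroring the `None` passed to `Tvu::new` in the test diff below.
    let _tower = create_tower(None);
}
```

Callers that never wire up ledger processing, such as the `Tvu` test below, simply pass `None` and get a default tower.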
11 changes: 5 additions & 6 deletions core/src/tvu.rs
@@ -11,7 +11,6 @@ use {
},
cluster_slots::ClusterSlots,
completed_data_sets_service::CompletedDataSetsSender,
consensus::Tower,
cost_update_service::CostUpdateService,
drop_bank_service::DropBankService,
ledger_cleanup_service::LedgerCleanupService,
@@ -22,6 +21,7 @@
sigverify_shreds::ShredSigVerifier,
sigverify_stage::SigVerifyStage,
tower_storage::TowerStorage,
validator::ProcessBlockStore,
voting_service::VotingService,
warm_quic_cache_service::WarmQuicCacheService,
},
@@ -100,7 +100,7 @@ impl Tvu {
/// * `sockets` - fetch, repair, and retransmit sockets
/// * `blockstore` - the ledger itself
#[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
pub fn new<T: Into<Tower> + Sized>(
pub fn new(
vote_account: &Pubkey,
authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
bank_forks: &Arc<RwLock<BankForks>>,
@@ -110,7 +110,7 @@
ledger_signal_receiver: Receiver<bool>,
rpc_subscriptions: &Arc<RpcSubscriptions>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
tower: T,
maybe_process_block_store: Option<ProcessBlockStore>,
tower_storage: Arc<dyn TowerStorage>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
exit: &Arc<AtomicBool>,
@@ -264,7 +264,7 @@ impl Tvu {
ledger_signal_receiver,
duplicate_slots_receiver,
poh_recorder.clone(),
tower,
maybe_process_block_store,
vote_tracker,
cluster_slots,
retransmit_slots_sender,
@@ -404,7 +404,6 @@ pub mod tests {
let (completed_data_sets_sender, _completed_data_sets_receiver) = unbounded();
let (_, gossip_confirmed_slots_receiver) = unbounded();
let bank_forks = Arc::new(RwLock::new(bank_forks));
let tower = Tower::default();
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let tvu = Tvu::new(
&vote_keypair.pubkey(),
@@ -430,7 +429,7 @@
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
)),
&poh_recorder,
tower,
None,
Comment (Contributor Author): here

Arc::new(crate::tower_storage::FileTowerStorage::default()),
&leader_schedule_cache,
&exit,