Skip to content

Commit bf0a368

Browse files
authored
Make ReplayStage create the parallel fork replay threadpool (solana-labs#137)
ReplayStage owning the pool allows for subsequent work to configure the size of the pool; configuring the size of the pool inside of the lazy_static would have been a little messy
1 parent 1ac523c commit bf0a368

File tree

1 file changed

+73
-63
lines changed

1 file changed

+73
-63
lines changed

core/src/replay_stage.rs

+73-63
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ use {
3333
window_service::DuplicateSlotReceiver,
3434
},
3535
crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
36-
lazy_static::lazy_static,
3736
rayon::{prelude::*, ThreadPool},
3837
solana_entry::entry::VerifyRecyclers,
3938
solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierArc,
@@ -102,14 +101,6 @@ const MAX_VOTE_REFRESH_INTERVAL_MILLIS: usize = 5000;
102101
const MAX_CONCURRENT_FORKS_TO_REPLAY: usize = 4;
103102
const MAX_REPAIR_RETRY_LOOP_ATTEMPTS: usize = 10;
104103

105-
lazy_static! {
106-
static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
107-
.num_threads(MAX_CONCURRENT_FORKS_TO_REPLAY)
108-
.thread_name(|i| format!("solReplay{i:02}"))
109-
.build()
110-
.unwrap();
111-
}
112-
113104
#[derive(PartialEq, Eq, Debug)]
114105
pub enum HeaviestForkFailures {
115106
LockedOut(u64),
@@ -131,6 +122,11 @@ pub enum HeaviestForkFailures {
131122
),
132123
}
133124

125+
enum ForkReplayMode { // Selects how active banks are replayed
126+
Serial, // Replay banks one after another via a plain iterator
127+
Parallel(ThreadPool), // Replay banks in parallel on the rayon pool held by this variant
128+
}
129+
134130
#[derive(PartialEq, Eq, Debug)]
135131
enum ConfirmationType {
136132
SupermajorityVoted,
@@ -656,6 +652,16 @@ impl ReplayStage {
656652
r_bank_forks.get_vote_only_mode_signal(),
657653
)
658654
};
655+
let replay_mode = if replay_slots_concurrently { // concurrent replay requested: build the pool
656+
let pool = rayon::ThreadPoolBuilder::new()
657+
.num_threads(MAX_CONCURRENT_FORKS_TO_REPLAY)
658+
.thread_name(|i| format!("solReplay{i:02}"))
659+
.build()
660+
.expect("new rayon threadpool");
661+
ForkReplayMode::Parallel(pool)
662+
} else { // flag not set: keep the previous serial behaviour and create no pool
663+
ForkReplayMode::Serial
664+
};
659665

660666
Self::reset_poh_recorder(
661667
&my_pubkey,
@@ -717,7 +723,7 @@ impl ReplayStage {
717723
block_metadata_notifier.clone(),
718724
&mut replay_timing,
719725
log_messages_bytes_limit,
720-
replay_slots_concurrently,
726+
&replay_mode,
721727
&prioritization_fee_cache,
722728
&mut purge_repair_slot_counter,
723729
);
@@ -2706,6 +2712,7 @@ impl ReplayStage {
27062712
fn replay_active_banks_concurrently(
27072713
blockstore: &Blockstore,
27082714
bank_forks: &RwLock<BankForks>,
2715+
thread_pool: &ThreadPool,
27092716
my_pubkey: &Pubkey,
27102717
vote_account: &Pubkey,
27112718
progress: &mut ProgressMap,
@@ -2723,7 +2730,7 @@ impl ReplayStage {
27232730
let longest_replay_time_us = AtomicU64::new(0);
27242731

27252732
// Allow for concurrent replaying of slots from different forks.
2726-
let replay_result_vec: Vec<ReplaySlotFromBlockstore> = PAR_THREAD_POOL.install(|| {
2733+
let replay_result_vec: Vec<ReplaySlotFromBlockstore> = thread_pool.install(|| {
27272734
active_bank_slots
27282735
.into_par_iter()
27292736
.map(|bank_slot| {
@@ -2737,7 +2744,7 @@ impl ReplayStage {
27372744
trace!(
27382745
"Replay active bank: slot {}, thread_idx {}",
27392746
bank_slot,
2740-
PAR_THREAD_POOL.current_thread_index().unwrap_or_default()
2747+
thread_pool.current_thread_index().unwrap_or_default()
27412748
);
27422749
let mut progress_lock = progress.write().unwrap();
27432750
if progress_lock
@@ -3175,7 +3182,7 @@ impl ReplayStage {
31753182
block_metadata_notifier: Option<BlockMetadataNotifierArc>,
31763183
replay_timing: &mut ReplayLoopTiming,
31773184
log_messages_bytes_limit: Option<usize>,
3178-
replay_slots_concurrently: bool,
3185+
replay_mode: &ForkReplayMode,
31793186
prioritization_fee_cache: &PrioritizationFeeCache,
31803187
purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
31813188
) -> bool /* completed a bank */ {
@@ -3186,11 +3193,17 @@ impl ReplayStage {
31863193
num_active_banks,
31873194
active_bank_slots
31883195
);
3189-
if num_active_banks > 0 {
3190-
let replay_result_vec = if num_active_banks > 1 && replay_slots_concurrently {
3196+
if active_bank_slots.is_empty() {
3197+
return false;
3198+
}
3199+
3200+
let replay_result_vec = match replay_mode {
3201+
// Skip the overhead of the threadpool if there is only one bank to play
3202+
ForkReplayMode::Parallel(thread_pool) if num_active_banks > 1 => {
31913203
Self::replay_active_banks_concurrently(
31923204
blockstore,
31933205
bank_forks,
3206+
thread_pool,
31943207
my_pubkey,
31953208
vote_account,
31963209
progress,
@@ -3203,55 +3216,52 @@ impl ReplayStage {
32033216
&active_bank_slots,
32043217
prioritization_fee_cache,
32053218
)
3206-
} else {
3207-
active_bank_slots
3208-
.iter()
3209-
.map(|bank_slot| {
3210-
Self::replay_active_bank(
3211-
blockstore,
3212-
bank_forks,
3213-
my_pubkey,
3214-
vote_account,
3215-
progress,
3216-
transaction_status_sender,
3217-
entry_notification_sender,
3218-
verify_recyclers,
3219-
replay_vote_sender,
3220-
replay_timing,
3221-
log_messages_bytes_limit,
3222-
*bank_slot,
3223-
prioritization_fee_cache,
3224-
)
3225-
})
3226-
.collect()
3227-
};
3219+
}
3220+
ForkReplayMode::Serial | ForkReplayMode::Parallel(_) => active_bank_slots
3221+
.iter()
3222+
.map(|bank_slot| {
3223+
Self::replay_active_bank(
3224+
blockstore,
3225+
bank_forks,
3226+
my_pubkey,
3227+
vote_account,
3228+
progress,
3229+
transaction_status_sender,
3230+
entry_notification_sender,
3231+
verify_recyclers,
3232+
replay_vote_sender,
3233+
replay_timing,
3234+
log_messages_bytes_limit,
3235+
*bank_slot,
3236+
prioritization_fee_cache,
3237+
)
3238+
})
3239+
.collect(),
3240+
};
32283241

3229-
Self::process_replay_results(
3230-
blockstore,
3231-
bank_forks,
3232-
progress,
3233-
transaction_status_sender,
3234-
cache_block_meta_sender,
3235-
heaviest_subtree_fork_choice,
3236-
bank_notification_sender,
3237-
rewards_recorder_sender,
3238-
rpc_subscriptions,
3239-
duplicate_slots_tracker,
3240-
duplicate_confirmed_slots,
3241-
epoch_slots_frozen_slots,
3242-
unfrozen_gossip_verified_vote_hashes,
3243-
latest_validator_votes_for_frozen_banks,
3244-
cluster_slots_update_sender,
3245-
cost_update_sender,
3246-
duplicate_slots_to_repair,
3247-
ancestor_hashes_replay_update_sender,
3248-
block_metadata_notifier,
3249-
&replay_result_vec,
3250-
purge_repair_slot_counter,
3251-
)
3252-
} else {
3253-
false
3254-
}
3242+
Self::process_replay_results(
3243+
blockstore,
3244+
bank_forks,
3245+
progress,
3246+
transaction_status_sender,
3247+
cache_block_meta_sender,
3248+
heaviest_subtree_fork_choice,
3249+
bank_notification_sender,
3250+
rewards_recorder_sender,
3251+
rpc_subscriptions,
3252+
duplicate_slots_tracker,
3253+
duplicate_confirmed_slots,
3254+
epoch_slots_frozen_slots,
3255+
unfrozen_gossip_verified_vote_hashes,
3256+
latest_validator_votes_for_frozen_banks,
3257+
cluster_slots_update_sender,
3258+
cost_update_sender,
3259+
duplicate_slots_to_repair,
3260+
ancestor_hashes_replay_update_sender,
3261+
block_metadata_notifier,
3262+
&replay_result_vec,
3263+
purge_repair_slot_counter,
3264+
)
32553265
}
32563266

32573267
#[allow(clippy::too_many_arguments)]

0 commit comments

Comments
 (0)