
Commit 083971a

Add --unified-scheduler-handler-threads
1 parent: 897adb2

10 files changed, +108 -21 lines

Cargo.lock

Lines changed: 1 addition & 0 deletions
(generated file; diff not rendered)

core/src/validator.rs

Lines changed: 3 additions & 0 deletions
@@ -262,6 +262,7 @@ pub struct ValidatorConfig {
     pub generator_config: Option<GeneratorConfig>,
     pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
     pub wen_restart_proto_path: Option<PathBuf>,
+    pub unified_scheduler_handler_threads: Option<usize>,
 }
 
 impl Default for ValidatorConfig {
@@ -329,6 +330,7 @@ impl Default for ValidatorConfig {
             generator_config: None,
             use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
             wen_restart_proto_path: None,
+            unified_scheduler_handler_threads: None,
         }
     }
 }
@@ -816,6 +818,7 @@ impl Validator {
             }
             BlockVerificationMethod::UnifiedScheduler => {
                 let scheduler_pool = DefaultSchedulerPool::new_dyn(
+                    config.unified_scheduler_handler_threads,
                     config.runtime_config.log_messages_bytes_limit,
                     transaction_status_sender.clone(),
                     Some(replay_vote_sender.clone()),

ledger-tool/src/ledger_utils.rs

Lines changed: 3 additions & 0 deletions
@@ -296,13 +296,16 @@ pub fn load_and_process_ledger(
                 info!("no scheduler pool is installed for block verification...");
             }
             BlockVerificationMethod::UnifiedScheduler => {
+                let unified_scheduler_handler_threads =
+                    value_t!(arg_matches, "unified_scheduler_handler_threads", usize).ok();
                 let no_transaction_status_sender = None;
                 let no_replay_vote_sender = None;
                 let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
                 bank_forks
                     .write()
                     .unwrap()
                     .install_scheduler_pool(DefaultSchedulerPool::new_dyn(
+                        unified_scheduler_handler_threads,
                         process_options.runtime_config.log_messages_bytes_limit,
                         no_transaction_status_sender,
                         no_replay_vote_sender,

ledger-tool/src/main.rs

Lines changed: 12 additions & 1 deletion
@@ -28,7 +28,7 @@ use {
         input_parsers::{cluster_type_of, pubkey_of, pubkeys_of},
         input_validators::{
             is_parsable, is_pow2, is_pubkey, is_pubkey_or_keypair, is_slot, is_valid_percentage,
-            validate_maximum_full_snapshot_archives_to_retain,
+            is_within_range, validate_maximum_full_snapshot_archives_to_retain,
             validate_maximum_incremental_snapshot_archives_to_retain,
         },
     },
@@ -72,6 +72,7 @@ use {
         transaction::{MessageHash, SanitizedTransaction, SimpleAddressLoader},
     },
     solana_stake_program::stake_state::{self, PointValue},
+    solana_unified_scheduler_pool::DefaultSchedulerPool,
     solana_vote_program::{
         self,
         vote_state::{self, VoteState},
@@ -847,6 +848,16 @@ fn main() {
                 .hidden(hidden_unless_forced())
                 .help(BlockVerificationMethod::cli_message()),
         )
+        .arg(
+            Arg::with_name("unified_scheduler_handler_threads")
+                .long("unified-scheduler-handler-threads")
+                .value_name("THREADS")
+                .takes_value(true)
+                .validator(|s| is_within_range(s, 1..))
+                .global(true)
+                .hidden(hidden_unless_forced())
+                .help(DefaultSchedulerPool::cli_message()),
+        )
         .arg(
             Arg::with_name("output_format")
                 .long("output")
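For context, the new ledger-tool flag follows the same optional-value pattern wired up in ledger_utils.rs above: if the flag is absent or fails to parse, value_t!(...).ok() yields None and the scheduler pool later falls back to its own default. Below is a minimal, self-contained sketch of that parsing flow in clap 2.x style; the "demo" app name and the inline validator closure are illustrative stand-ins (the commit uses solana-clap-utils' is_within_range), not the real helpers.

use clap::{value_t, App, Arg};

fn main() {
    let matches = App::new("demo")
        .arg(
            Arg::with_name("unified_scheduler_handler_threads")
                .long("unified-scheduler-handler-threads")
                .value_name("THREADS")
                .takes_value(true)
                // Stand-in for is_within_range(s, 1..): accept only integers >= 1.
                .validator(|s| match s.parse::<usize>() {
                    Ok(n) if n >= 1 => Ok(()),
                    _ => Err("expected an integer >= 1".to_string()),
                }),
        )
        .get_matches_from(vec!["demo", "--unified-scheduler-handler-threads", "2"]);

    // An absent or unparsable flag becomes None, deferring to the pool's default count.
    let handler_threads = value_t!(matches, "unified_scheduler_handler_threads", usize).ok();
    assert_eq!(handler_threads, Some(2));
}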

local-cluster/src/validator_configs.rs

Lines changed: 1 addition & 0 deletions
@@ -68,6 +68,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
         generator_config: config.generator_config.clone(),
         use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
         wen_restart_proto_path: config.wen_restart_proto_path.clone(),
+        unified_scheduler_handler_threads: config.unified_scheduler_handler_threads,
     }
 }
 

programs/sbf/Cargo.lock

Lines changed: 1 addition & 0 deletions
(generated file; diff not rendered)

unified-scheduler-pool/src/lib.rs

Lines changed: 74 additions & 20 deletions
@@ -34,7 +34,7 @@ use {
         marker::PhantomData,
         sync::{
             atomic::{AtomicU64, Ordering::Relaxed},
-            Arc, Mutex, Weak,
+            Arc, Mutex, OnceLock, Weak,
         },
         thread::{self, JoinHandle},
     },
@@ -48,6 +48,7 @@ type AtomicSchedulerId = AtomicU64;
 #[derive(Debug)]
 pub struct SchedulerPool<S: SpawnableScheduler<TH>, TH: TaskHandler> {
     scheduler_inners: Mutex<Vec<S::Inner>>,
+    handler_count: usize,
     handler_context: HandlerContext,
     // weak_self could be elided by changing InstalledScheduler::take_scheduler()'s receiver to
     // Arc<Self> from &Self, because SchedulerPool is used as in the form of Arc<SchedulerPool>
@@ -83,13 +84,20 @@ where
     // Some internal impl and test code want an actual concrete type, NOT the
     // `dyn InstalledSchedulerPool`. So don't merge this into `Self::new_dyn()`.
     fn new(
+        handler_count: Option<usize>,
         log_messages_bytes_limit: Option<usize>,
         transaction_status_sender: Option<TransactionStatusSender>,
         replay_vote_sender: Option<ReplayVoteSender>,
         prioritization_fee_cache: Arc<PrioritizationFeeCache>,
     ) -> Arc<Self> {
+        let handler_count = handler_count.unwrap_or(Self::default_handler_count());
+        // we're hard-coding the number of handler thread to 1, meaning this impl is currently
+        // single-threaded still.
+        assert_eq!(handler_count, 1); // replace this with assert!(handler_count >= 1) later
+
         Arc::new_cyclic(|weak_self| Self {
             scheduler_inners: Mutex::default(),
+            handler_count,
             handler_context: HandlerContext {
                 log_messages_bytes_limit,
                 transaction_status_sender,
@@ -105,12 +113,14 @@ where
     // This apparently-meaningless wrapper is handy, because some callers explicitly want
     // `dyn InstalledSchedulerPool` to be returned for type inference convenience.
     pub fn new_dyn(
+        handler_count: Option<usize>,
         log_messages_bytes_limit: Option<usize>,
         transaction_status_sender: Option<TransactionStatusSender>,
         replay_vote_sender: Option<ReplayVoteSender>,
         prioritization_fee_cache: Arc<PrioritizationFeeCache>,
     ) -> InstalledSchedulerPoolArc {
         Self::new(
+            handler_count,
             log_messages_bytes_limit,
             transaction_status_sender,
             replay_vote_sender,
@@ -145,6 +155,37 @@ where
             S::spawn(self.self_arc(), context)
         }
     }
+
+    pub fn default_handler_count() -> usize {
+        Self::calculate_default_handler_count(
+            thread::available_parallelism()
+                .ok()
+                .map(|non_zero| non_zero.get()),
+        )
+    }
+
+    pub fn calculate_default_handler_count(detected_cpu_core_count: Option<usize>) -> usize {
+        // Divide by 4 just not to consume all available CPUs just with handler threads, sparing for
+        // other active forks and other subsystems.
+        // Also, if available_parallelism fails (which should be very rare), use 4 threads,
+        // as a relatively conservatism assumption of modern multi-core systems ranging from
+        // engineers' laptops to production servers.
+        detected_cpu_core_count
+            .map(|core_count| (core_count / 4).max(1))
+            .unwrap_or(4)
+    }
+
+    pub fn cli_message() -> &'static str {
+        static MESSAGE: OnceLock<String> = OnceLock::new();
+
+        MESSAGE.get_or_init(|| {
+            format!(
+                "Change the number of the unified scheduler's transaction execution threads \
+                 dedicated to each block, otherwise calculated as cpu_cores/4 [default: {}]",
+                Self::default_handler_count()
+            )
+        })
+    }
 }
 
 impl<S, TH> InstalledSchedulerPool for SchedulerPool<S, TH>
@@ -372,7 +413,6 @@ pub struct PooledSchedulerInner<S: SpawnableScheduler<TH>, TH: TaskHandler> {
 struct ThreadManager<S: SpawnableScheduler<TH>, TH: TaskHandler> {
     scheduler_id: SchedulerId,
     pool: Arc<SchedulerPool<S, TH>>,
-    handler_count: usize,
     new_task_sender: Sender<NewTaskPayload>,
     new_task_receiver: Receiver<NewTaskPayload>,
     session_result_sender: Sender<Option<ResultWithTimings>>,
@@ -384,28 +424,24 @@ struct ThreadManager<S: SpawnableScheduler<TH>, TH: TaskHandler> {
 
 impl<TH: TaskHandler> PooledScheduler<TH> {
     fn do_spawn(pool: Arc<SchedulerPool<Self, TH>>, initial_context: SchedulingContext) -> Self {
-        // we're hard-coding the number of handler thread to 1, meaning this impl is currently
-        // single-threaded still.
-        let handler_count = 1;
-
         Self::from_inner(
             PooledSchedulerInner::<Self, TH> {
-                thread_manager: ThreadManager::new(pool, handler_count),
+                thread_manager: ThreadManager::new(pool),
             },
             initial_context,
         )
     }
 }
 
 impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
-    fn new(pool: Arc<SchedulerPool<S, TH>>, handler_count: usize) -> Self {
+    fn new(pool: Arc<SchedulerPool<S, TH>>) -> Self {
         let (new_task_sender, new_task_receiver) = unbounded();
         let (session_result_sender, session_result_receiver) = unbounded();
+        let handler_count = pool.handler_count;
 
         Self {
             scheduler_id: pool.new_scheduler_id(),
             pool,
-            handler_count,
             new_task_sender,
             new_task_receiver,
             session_result_sender,
@@ -477,7 +513,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
         // 5. the handler thread reply back to the scheduler thread as an executed task.
         // 6. the scheduler thread post-processes the executed task.
         let scheduler_main_loop = || {
-            let handler_count = self.handler_count;
+            let handler_count = self.pool.handler_count;
             let session_result_sender = self.session_result_sender.clone();
             let new_task_receiver = self.new_task_receiver.clone();
 
@@ -613,7 +649,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
                 .unwrap(),
         );
 
-        self.handler_threads = (0..self.handler_count)
+        self.handler_threads = (0..self.pool.handler_count)
             .map({
                 |thx| {
                     thread::Builder::new()
@@ -760,7 +796,7 @@ mod tests {
 
         let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
         let pool =
-            DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
+            DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache);
 
         // this indirectly proves that there should be circular link because there's only one Arc
         // at this moment now
@@ -775,7 +811,7 @@ mod tests {
 
         let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
         let pool =
-            DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
+            DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache);
         let bank = Arc::new(Bank::default_for_tests());
         let context = SchedulingContext::new(bank);
         let scheduler = pool.take_scheduler(context);
@@ -789,7 +825,8 @@ mod tests {
         solana_logger::setup();
 
         let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
-        let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache);
+        let pool =
+            DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache);
         let bank = Arc::new(Bank::default_for_tests());
         let context = &SchedulingContext::new(bank);
 
@@ -817,7 +854,8 @@ mod tests {
         solana_logger::setup();
 
         let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
-        let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache);
+        let pool =
+            DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache);
         let bank = Arc::new(Bank::default_for_tests());
         let context = &SchedulingContext::new(bank);
         let mut scheduler = pool.do_take_scheduler(context.clone());
@@ -835,7 +873,8 @@ mod tests {
         solana_logger::setup();
 
         let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
-        let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache);
+        let pool =
+            DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache);
         let old_bank = &Arc::new(Bank::default_for_tests());
         let new_bank = &Arc::new(Bank::default_for_tests());
         assert!(!Arc::ptr_eq(old_bank, new_bank));
@@ -861,7 +900,7 @@ mod tests {
         let mut bank_forks = bank_forks.write().unwrap();
         let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
         let pool =
-            DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
+            DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache);
         bank_forks.install_scheduler_pool(pool);
     }
 
@@ -875,7 +914,7 @@ mod tests {
 
         let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
         let pool =
-            DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
+            DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache);
 
         let bank = Bank::default_for_tests();
         let bank_forks = BankForks::new_rw_arc(bank);
@@ -928,7 +967,7 @@ mod tests {
         let bank = setup_dummy_fork_graph(bank);
         let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
         let pool =
-            DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
+            DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache);
         let context = SchedulingContext::new(bank.clone());
 
         assert_eq!(bank.transaction_count(), 0);
@@ -953,7 +992,7 @@ mod tests {
 
         let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
         let pool =
-            DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache);
+            DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache);
         let context = SchedulingContext::new(bank.clone());
         let mut scheduler = pool.take_scheduler(context);
 
@@ -1159,6 +1198,7 @@ mod tests {
             None,
             None,
             None,
+            None,
             ignored_prioritization_fee_cache,
         );
         let scheduler = pool.take_scheduler(context);
@@ -1193,4 +1233,18 @@ mod tests {
     fn test_scheduler_schedule_execution_recent_blockhash_edge_case_without_race() {
         do_test_scheduler_schedule_execution_recent_blockhash_edge_case::<false>();
     }
+
+    #[test]
+    fn test_default_handler_count() {
+        for (detected, expected) in [(32, 8), (4, 1), (2, 1)] {
+            assert_eq!(
+                DefaultSchedulerPool::calculate_default_handler_count(Some(detected)),
+                expected
+            );
+        }
+        assert_eq!(
+            DefaultSchedulerPool::calculate_default_handler_count(None),
+            4
+        );
+    }
 }
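The default-count heuristic added above is a pure function, which is why the new test_default_handler_count test can pin its behavior directly. Here is a standalone sketch mirroring the same logic; the free function is purely for illustration, since in the commit it is an associated function on SchedulerPool.

// cpu_cores / 4, floored at 1 handler thread, with a fallback of 4 when core
// detection (thread::available_parallelism) is unavailable.
fn calculate_default_handler_count(detected_cpu_core_count: Option<usize>) -> usize {
    detected_cpu_core_count
        .map(|core_count| (core_count / 4).max(1))
        .unwrap_or(4)
}

fn main() {
    assert_eq!(calculate_default_handler_count(Some(32)), 8); // large server
    assert_eq!(calculate_default_handler_count(Some(4)), 1);  // small machine, floor of 1
    assert_eq!(calculate_default_handler_count(None), 4);     // detection failed
}

Note that SchedulerPool::new() still asserts handler_count == 1 in this commit ("replace this with assert!(handler_count >= 1) later"), so values other than 1 only become usable once that restriction is lifted.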

validator/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -61,6 +61,7 @@ solana-streamer = { workspace = true }
 solana-svm = { workspace = true }
 solana-test-validator = { workspace = true }
 solana-tpu-client = { workspace = true }
+solana-unified-scheduler-pool = { workspace = true }
 solana-version = { workspace = true }
 solana-vote-program = { workspace = true }
 symlink = { workspace = true }

validator/src/cli.rs

Lines changed: 10 additions & 0 deletions
@@ -47,6 +47,7 @@ use {
         self, MAX_BATCH_SEND_RATE_MS, MAX_TRANSACTION_BATCH_SIZE,
     },
     solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE,
+    solana_unified_scheduler_pool::DefaultSchedulerPool,
     std::{path::PathBuf, str::FromStr},
 };
 
@@ -1389,6 +1390,15 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
                 .possible_values(BlockProductionMethod::cli_names())
                 .help(BlockProductionMethod::cli_message())
         )
+        .arg(
+            Arg::with_name("unified_scheduler_handler_threads")
+                .long("unified-scheduler-handler-threads")
+                .hidden(hidden_unless_forced())
+                .value_name("THREADS")
+                .takes_value(true)
+                .validator(|s| is_within_range(s, 1..))
+                .help(DefaultSchedulerPool::cli_message()),
+        )
         .arg(
             Arg::with_name("wen_restart")
                 .long("wen-restart")

validator/src/main.rs

Lines changed: 2 additions & 0 deletions
@@ -1652,6 +1652,8 @@ pub fn main() {
         BlockProductionMethod
     )
     .unwrap_or_default();
+    validator_config.unified_scheduler_handler_threads =
+        value_t!(matches, "unified_scheduler_handler_threads", usize).ok();
 
     validator_config.ledger_column_options = LedgerColumnOptions {
         compression_type: match matches.value_of("rocksdb_ledger_compression") {
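Taken together, the new flag flows from clap into ValidatorConfig::unified_scheduler_handler_threads and from there into DefaultSchedulerPool::new_dyn(). An operator would presumably pass something like --unified-scheduler-handler-threads 1 alongside --block-verification-method unified-scheduler (the exact invocation is an assumption; the argument is also hidden unless forced). Per the assert added in SchedulerPool::new(), 1 is the only value the pool accepts as of this commit.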
