Introduce SchedulingStateMachine for unified scheduler [NO-MERGE&REVIEW-ONLY-MODE] #35286
@@ -1,3 +1,8 @@
+//! NOTE: While the unified scheduler is fully functional and moderately performant even with
+//! mainnet-beta, it has known resource-exhaustion related security issues for replaying
+//! specially-crafted blocks produced by malicious leaders. Thus, this experimental and
+//! nondefault functionality is exempt from the bug bounty program for now.
+//!
 //! Transaction scheduling code.
 //!
 //! This crate implements 3 solana-runtime traits (`InstalledScheduler`, `UninstalledScheduler` and
@@ -10,7 +15,8 @@
 use {
     assert_matches::assert_matches,
-    crossbeam_channel::{select, unbounded, Receiver, SendError, Sender},
+    crossbeam_channel::{never, select, unbounded, Receiver, RecvError, SendError, Sender},
+    dashmap::DashMap,
     derivative::Derivative,
     log::*,
     solana_ledger::blockstore_processor::{
@@ -26,8 +32,11 @@ use {
         },
         prioritization_fee_cache::PrioritizationFeeCache,
     },
-    solana_sdk::transaction::{Result, SanitizedTransaction},
-    solana_unified_scheduler_logic::Task,
+    solana_sdk::{
+        pubkey::Pubkey,
+        transaction::{Result, SanitizedTransaction},
+    },
+    solana_unified_scheduler_logic::{SchedulingStateMachine, Task, UsageQueue},
    solana_vote::vote_sender_types::ReplayVoteSender,
    std::{
        fmt::Debug,
@@ -90,10 +99,8 @@ where
         replay_vote_sender: Option<ReplayVoteSender>,
         prioritization_fee_cache: Arc<PrioritizationFeeCache>,
     ) -> Arc<Self> {
-        let handler_count = handler_count.unwrap_or(1);
-        // we're hard-coding the number of handler thread to 1, meaning this impl is currently
-        // single-threaded still.
-        assert_eq!(handler_count, 1); // replace this with assert!(handler_count >= 1) later
+        let handler_count = handler_count.unwrap_or(Self::default_handler_count());
+        assert!(handler_count >= 1);

         Arc::new_cyclic(|weak_self| Self {
             scheduler_inners: Mutex::default(),
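
The hard-coded single handler is replaced here by `Self::default_handler_count()`, whose body is not part of this hunk. Purely as a hypothetical sketch of what a CPU-derived default could look like (this is an assumption for illustration, not the PR's actual implementation):

    use std::thread;

    // Hypothetical stand-in for Self::default_handler_count(); the real
    // derivation is not shown in this diff.
    fn default_handler_count() -> usize {
        // Derive a handler count from detected CPU parallelism, always >= 1
        // so the assert!(handler_count >= 1) above holds.
        thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1)
    }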
@@ -386,13 +393,35 @@ mod chained_channel {
     }
 }

+/// The primary owner of all [`UsageQueue`]s used for a particular [`PooledScheduler`].
+///
+/// Currently, this is the simplest implementation: it grows memory usage in an unbounded way;
+/// cleaning will be added later. This struct is kept outside `solana-unified-scheduler-logic`,
+/// in line with that crate's original intent (separation of logic from this crate). Some
+/// practical and mundane pruning will be implemented in this type.
+#[derive(Default, Debug)]
+pub struct UsageQueueLoader {
+    usage_queues: DashMap<Pubkey, UsageQueue>,
+}
+
+impl UsageQueueLoader {
+    pub fn load(&self, address: Pubkey) -> UsageQueue {
+        self.usage_queues.entry(address).or_default().clone()
+    }
+}
+
+// (this is slow, needing atomic mem reads. However, this can be turned into a much faster,
+// optimizer-friendly version, as shown in this crossbeam pr:
+// https://github.com/crossbeam-rs/crossbeam/pull/1047)
+fn disconnected<T>() -> Receiver<T> {
+    // drop the sender residing at .0, returning an always-disconnected receiver.
+    unbounded().1
+}
+
 fn initialized_result_with_timings() -> ResultWithTimings {
     (Ok(()), ExecuteTimings::default())
 }

-// Currently, simplest possible implementation (i.e. single-threaded)
-// this will be replaced with more proper implementation...
-// not usable at all, especially for mainnet-beta
 #[derive(Debug)]
 pub struct PooledScheduler<TH: TaskHandler> {
     inner: PooledSchedulerInner<Self, TH>,
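
The loader's contract is easiest to see in isolation: `load()` hands out shared handles, so repeated loads of the same address refer to one `UsageQueue`, and the map only ever grows until pruning lands. A minimal sketch of that dedup behavior, assuming the dashmap, solana-sdk, and solana-unified-scheduler-logic crates are available as the diff's imports suggest:

    use {dashmap::DashMap, solana_sdk::pubkey::Pubkey, solana_unified_scheduler_logic::UsageQueue};

    #[derive(Default, Debug)]
    struct UsageQueueLoader {
        usage_queues: DashMap<Pubkey, UsageQueue>,
    }

    impl UsageQueueLoader {
        fn load(&self, address: Pubkey) -> UsageQueue {
            // or_default() inserts a fresh UsageQueue on first sight of an
            // address; clone() hands out a shared handle, not a deep copy.
            self.usage_queues.entry(address).or_default().clone()
        }
    }

    fn main() {
        let loader = UsageQueueLoader::default();
        let addr = Pubkey::new_unique();
        // Two loads of the same address share one queue; a different address
        // gets its own entry, and entries are never removed (pruning is TODO).
        let (_q1, _q2) = (loader.load(addr), loader.load(addr));
        let _other = loader.load(Pubkey::new_unique());
        assert_eq!(loader.usage_queues.len(), 2);
    }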
@@ -402,6 +431,7 @@ pub struct PooledScheduler<TH: TaskHandler> {
 #[derive(Debug)]
 pub struct PooledSchedulerInner<S: SpawnableScheduler<TH>, TH: TaskHandler> {
     thread_manager: ThreadManager<S, TH>,
+    usage_queue_loader: UsageQueueLoader,
 }

 // This type manages the OS threads for scheduling and executing transactions. The term
@@ -427,6 +457,7 @@ impl<TH: TaskHandler> PooledScheduler<TH> {
         Self::from_inner(
             PooledSchedulerInner::<Self, TH> {
                 thread_manager: ThreadManager::new(pool),
+                usage_queue_loader: UsageQueueLoader::default(),
             },
             initial_context,
         )
@@ -518,7 +549,6 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
         let new_task_receiver = self.new_task_receiver.clone();

         let mut session_ending = false;
-        let mut active_task_count: usize = 0;

         // Now, this is the main loop for the scheduler thread, which is a special beast.
         //
@@ -558,61 +588,97 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
             // cycles out of the scheduler thread. Thus, any kinds of unessential overhead sources
             // like syscalls, VDSO, and even memory (de)allocation should be avoided at all costs
             // by design or by means of offloading at the last resort.
-            move || loop {
-                let mut is_finished = false;
-                while !is_finished {
-                    select! {
-                        recv(finished_task_receiver) -> executed_task => {
-                            let executed_task = executed_task.unwrap();
-
-                            active_task_count = active_task_count.checked_sub(1).unwrap();
-                            let result_with_timings = result_with_timings.as_mut().unwrap();
-                            Self::accumulate_result_with_timings(result_with_timings, executed_task);
-                        },
-                        recv(new_task_receiver) -> message => {
-                            assert!(!session_ending);
-
-                            match message.unwrap() {
-                                NewTaskPayload::Payload(task) => {
-                                    // so, we're NOT scheduling at all here; rather, just execute
-                                    // tx straight off. the inter-tx locking deps aren't needed to
-                                    // be resolved in the case of single-threaded FIFO like this.
-                                    runnable_task_sender
-                                        .send_payload(task)
-                                        .unwrap();
-                                    active_task_count = active_task_count.checked_add(1).unwrap();
-                                }
-                                NewTaskPayload::OpenSubchannel(context) => {
-                                    // signal about new SchedulingContext to handler threads
-                                    runnable_task_sender
-                                        .send_chained_channel(context, handler_count)
-                                        .unwrap();
-                                    assert_matches!(
-                                        result_with_timings.replace(initialized_result_with_timings()),
-                                        None
-                                    );
-                                }
-                                NewTaskPayload::CloseSubchannel => {
-                                    session_ending = true;
-                                }
-                            }
-                        },
-                    };
+            move || {
+                let (do_now, dont_now) = (&disconnected::<()>(), &never::<()>());
+                let dummy_receiver = |trigger| {
+                    if trigger {
+                        do_now
+                    } else {
+                        dont_now
+                    }
+                };

-                    // a really simplistic termination condition, which only works under the
-                    // assumption of single handler thread...
-                    is_finished = session_ending && active_task_count == 0;
-                }
+                let mut state_machine = unsafe {
+                    SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling()
+                };

-                if session_ending {
-                    session_result_sender
-                        .send(Some(
-                            result_with_timings
-                                .take()
-                                .unwrap_or_else(initialized_result_with_timings),
-                        ))
-                        .unwrap();
-                    session_ending = false;
-                }
-            }
+                loop {

> Reviewer: We never exit this loop except for panic? I think we may have already talked about
> this in a previous PR, and you're just planning on error-handling in a follow-up. Please correct
> me if I'm wrong.
>
> Author: This understanding completely aligns with what I said previously. The proper scheduler
> management code is complicated and flavored to my own taste (expecting a back-and-forth review
> session), i.e. yet another beast by itself. So, I wanted to focus on the logic with this PR.

+                    let mut is_finished = false;
+                    while !is_finished {
+                        // ALL recv selectors are eager-evaluated ALWAYS by the current crossbeam
+                        // impl, which isn't great and is inconsistent with `if` guards in Rust's
+                        // match arms. So, eagerly binding the result to a variable unconditionally
+                        // here makes no perf difference...
+                        let dummy_unblocked_task_receiver =
+                            dummy_receiver(state_machine.has_unblocked_task());
+
+                        // (Assume this is biased; i.e. select_biased! in this crossbeam pr:
+                        // https://github.com/rust-lang/futures-rs/pull/1976)
+                        //
+                        // There's something special called dummy_unblocked_task_receiver here.
+                        // This odd pattern was needed to react to newly unblocked tasks from
+                        // _not-crossbeam-channel_ event sources, precisely at the specified
+                        // precedence among other selectors, while delegating the control flow to
+                        // select_biased!.
+                        //
+                        // In this way, hot looping is avoided and the overall control flow is much
+                        // more consistent. Note that the unified scheduler will eventually go into
+                        // busy looping to seek the lowest latency. However, not now, so that the
+                        // _actual_ cpu usage can be measured easily with the select approach.
+                        select! {
+                            recv(finished_task_receiver) -> executed_task => {
+                                let executed_task = executed_task.unwrap();
+
+                                state_machine.deschedule_task(&executed_task.task);
+                                let result_with_timings = result_with_timings.as_mut().unwrap();
+                                Self::accumulate_result_with_timings(result_with_timings, executed_task);
+                            },
+                            recv(dummy_unblocked_task_receiver) -> dummy => {
+                                assert_matches!(dummy, Err(RecvError));
+
+                                let task = state_machine.schedule_unblocked_task().expect("unblocked task");
+                                runnable_task_sender.send_payload(task).unwrap();
+                            },
+                            recv(new_task_receiver) -> message => {
+                                assert!(!session_ending);
+
+                                match message.unwrap() {
+                                    NewTaskPayload::Payload(task) => {
+                                        if let Some(task) = state_machine.schedule_task(task) {
+                                            runnable_task_sender.send_payload(task).unwrap();
+                                        }
+                                    }
+                                    NewTaskPayload::OpenSubchannel(context) => {
+                                        // signal about new SchedulingContext to handler threads
+                                        runnable_task_sender
+                                            .send_chained_channel(context, handler_count)
+                                            .unwrap();
+                                        assert_matches!(
+                                            result_with_timings.replace(initialized_result_with_timings()),
+                                            None
+                                        );
+                                    }
+                                    NewTaskPayload::CloseSubchannel => {
+                                        session_ending = true;
+                                    }
+                                }
+                            },
+                        };
+
+                        is_finished = session_ending && state_machine.has_no_active_task();
+                    }
+
+                    if session_ending {
+                        state_machine.reinitialize();
+                        session_result_sender
+                            .send(Some(
+                                result_with_timings
+                                    .take()
+                                    .unwrap_or_else(initialized_result_with_timings),
+                            ))
+                            .unwrap();
+                        session_ending = false;
+                    }
+                }
+            }
         };
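
The `dummy_receiver` pattern above deserves a standalone illustration: a receiver whose sender was dropped is always ready (yielding `Err(RecvError)`), while `never()` is never ready, so a plain boolean can enable or silence a `select!` arm at a fixed position without hot looping. A minimal sketch using only crossbeam-channel (the `default` arm is added here just to keep the demo non-blocking; the scheduler loop instead has other always-live arms):

    use crossbeam_channel::{never, select, unbounded, Receiver, RecvError};

    // An always-disconnected receiver: dropping the sender half makes every
    // recv() on the receiver return Err(RecvError) immediately.
    fn disconnected<T>() -> Receiver<T> {
        unbounded().1
    }

    fn main() {
        for trigger in [true, false] {
            // Gate a select! arm on a plain boolean: "fire now" vs. "never fire".
            let dummy: Receiver<()> = if trigger { disconnected() } else { never() };
            select! {
                recv(dummy) -> msg => {
                    // Only reachable when trigger == true; the channel is
                    // disconnected, so the payload is always Err(RecvError).
                    assert_eq!(msg, Err(RecvError));
                    println!("trigger={trigger}: dummy arm fired");
                },
                default => {
                    println!("trigger={trigger}: dummy arm stayed silent");
                }
            }
        }
    }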
@@ -741,7 +807,9 @@ impl<TH: TaskHandler> InstalledScheduler for PooledScheduler<TH> {
     }

     fn schedule_execution(&self, &(transaction, index): &(&SanitizedTransaction, usize)) {
-        let task = Task::create_task(transaction.clone(), index);
+        let task = SchedulingStateMachine::create_task(transaction.clone(), index, &mut |pubkey| {
+            self.inner.usage_queue_loader.load(pubkey)
+        });
         self.inner.thread_manager.send_task(task);
     }

@@ -1023,7 +1091,7 @@ mod tests { | |
.result, | ||
Ok(_) | ||
); | ||
scheduler.schedule_execution(&(good_tx_after_bad_tx, 0)); | ||
scheduler.schedule_execution(&(good_tx_after_bad_tx, 1)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is due to task_index is started to be |
||
scheduler.pause_for_recent_blockhash(); | ||
// transaction_count should remain same as scheduler should be bailing out. | ||
// That's because we're testing the serialized failing execution case in this test. | ||
|
@@ -1247,4 +1315,42 @@ mod tests {
             4
         );
     }

+    // See comment in SchedulingStateMachine::create_task() for the justification of this test
+    #[test]
+    fn test_enforced_get_account_locks_validation() {
+        solana_logger::setup();
+
+        let GenesisConfigInfo {
+            genesis_config,
+            ref mint_keypair,
+            ..
+        } = create_genesis_config(10_000);
+        let bank = Bank::new_for_tests(&genesis_config);
+        let bank = &setup_dummy_fork_graph(bank);
+
+        let mut tx = system_transaction::transfer(
+            mint_keypair,
+            &solana_sdk::pubkey::new_rand(),
+            2,
+            genesis_config.hash(),
+        );
+        // mangle the transfer tx to try to lock fee_payer (= mint_keypair) address twice!
+        tx.message.account_keys.push(tx.message.account_keys[0]);
+        let tx = &SanitizedTransaction::from_transaction_for_tests(tx);
+
+        // this internally should call SanitizedTransaction::get_account_locks().
+        let result = &mut Ok(());
+        let timings = &mut ExecuteTimings::default();
+        let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
+        let handler_context = &HandlerContext {
+            log_messages_bytes_limit: None,
+            transaction_status_sender: None,
+            replay_vote_sender: None,
+            prioritization_fee_cache,
+        };
+
+        DefaultTaskHandler::handle(result, timings, bank, tx, 0, handler_context);
+        assert_matches!(result, Err(TransactionError::AccountLoadedTwice));
+    }
 }
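
For reference, the validation this test exercises can be reproduced directly: `SanitizedTransaction::get_account_locks()` fails with `AccountLoadedTwice` when the account keys contain a duplicate. A minimal sketch of just that check (the lock limit of 64 is a placeholder for illustration, not the value the handler actually uses):

    use solana_sdk::{
        signature::Keypair,
        system_transaction,
        transaction::{SanitizedTransaction, TransactionError},
    };

    fn main() {
        let payer = Keypair::new();
        let mut tx = system_transaction::transfer(
            &payer,
            &solana_sdk::pubkey::new_rand(),
            2,
            solana_sdk::hash::Hash::default(),
        );
        // Duplicate the fee-payer key, just as the test above does.
        tx.message.account_keys.push(tx.message.account_keys[0]);
        let tx = SanitizedTransaction::from_transaction_for_tests(tx);

        // 64 is a placeholder account-lock limit for this sketch.
        let result = tx.get_account_locks(64).map(|_| ());
        assert_eq!(result, Err(TransactionError::AccountLoadedTwice));
    }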