Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.

Commit 950ca5e

Browse files
authored
Add InstalledScheduler for blockstore_processor (#33875)
* Add InstalledScheduler for blockstore_processor * Reverse if clauses * Add more comments for process_batches() * Elaborate comment * Simplify schedule_transaction_executions type
1 parent d04ad65 commit 950ca5e

File tree

6 files changed

+329
-41
lines changed

6 files changed

+329
-41
lines changed

Cargo.lock

Lines changed: 40 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ memmap2 = "0.5.10"
243243
memoffset = "0.9"
244244
merlin = "3"
245245
min-max-heap = "1.3.0"
246+
mockall = "0.11.4"
246247
modular-bitfield = "0.11.2"
247248
nix = "0.26.4"
248249
num-bigint = "0.4.4"

ledger/src/blockstore_processor.rs

Lines changed: 146 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,70 @@ fn execute_batches_internal(
294294
})
295295
}
296296

297+
// This fn diverts the code-path into two variants. Both must provide exactly the same set of
298+
// validations. For this reason, this fn is deliberately inserted into the code path to be called
299+
// inside process_entries(), so that Bank::prepare_sanitized_batch() has been called on all of
300+
// batches already, while minimizing code duplication (thus divergent behavior risk) at the cost of
301+
// acceptable overhead of meaningless buffering of batches for the scheduler variant.
302+
//
303+
// Also note that the scheduler variant can't implement the batch-level sanitization naively, due
304+
// to the nature of individual tx processing. That's another reason of this particular placement of
305+
// divergent point in the code-path (i.e. not one layer up with its own prepare_sanitized_batch()
306+
// invocation).
307+
fn process_batches(
308+
bank: &BankWithScheduler,
309+
batches: &[TransactionBatchWithIndexes],
310+
transaction_status_sender: Option<&TransactionStatusSender>,
311+
replay_vote_sender: Option<&ReplayVoteSender>,
312+
batch_execution_timing: &mut BatchExecutionTiming,
313+
log_messages_bytes_limit: Option<usize>,
314+
prioritization_fee_cache: &PrioritizationFeeCache,
315+
) -> Result<()> {
316+
if bank.has_installed_scheduler() {
317+
debug!(
318+
"process_batches()/schedule_batches_for_execution({} batches)",
319+
batches.len()
320+
);
321+
// scheduling always succeeds here without being blocked on actual transaction executions.
322+
// The transaction execution errors will be collected via the blocking fn called
323+
// BankWithScheduler::wait_for_completed_scheduler(), if any.
324+
schedule_batches_for_execution(bank, batches);
325+
Ok(())
326+
} else {
327+
debug!(
328+
"process_batches()/rebatch_and_execute_batches({} batches)",
329+
batches.len()
330+
);
331+
rebatch_and_execute_batches(
332+
bank,
333+
batches,
334+
transaction_status_sender,
335+
replay_vote_sender,
336+
batch_execution_timing,
337+
log_messages_bytes_limit,
338+
prioritization_fee_cache,
339+
)
340+
}
341+
}
342+
343+
fn schedule_batches_for_execution(
344+
bank: &BankWithScheduler,
345+
batches: &[TransactionBatchWithIndexes],
346+
) {
347+
for TransactionBatchWithIndexes {
348+
batch,
349+
transaction_indexes,
350+
} in batches
351+
{
352+
bank.schedule_transaction_executions(
353+
batch
354+
.sanitized_transactions()
355+
.iter()
356+
.zip(transaction_indexes.iter()),
357+
);
358+
}
359+
}
360+
297361
fn rebatch_transactions<'a>(
298362
lock_results: &'a [Result<()>],
299363
bank: &'a Arc<Bank>,
@@ -314,7 +378,7 @@ fn rebatch_transactions<'a>(
314378
}
315379
}
316380

317-
fn execute_batches(
381+
fn rebatch_and_execute_batches(
318382
bank: &Arc<Bank>,
319383
batches: &[TransactionBatchWithIndexes],
320384
transaction_status_sender: Option<&TransactionStatusSender>,
@@ -488,7 +552,7 @@ fn process_entries(
488552
if bank.is_block_boundary(bank.tick_height() + tick_hashes.len() as u64) {
489553
// If it's a tick that will cause a new blockhash to be created,
490554
// execute the group and register the tick
491-
execute_batches(
555+
process_batches(
492556
bank,
493557
&batches,
494558
transaction_status_sender,
@@ -541,7 +605,7 @@ fn process_entries(
541605
} else {
542606
// else we have an entry that conflicts with a prior entry
543607
// execute the current queue and try to process this entry again
544-
execute_batches(
608+
process_batches(
545609
bank,
546610
&batches,
547611
transaction_status_sender,
@@ -556,7 +620,7 @@ fn process_entries(
556620
}
557621
}
558622
}
559-
execute_batches(
623+
process_batches(
560624
bank,
561625
&batches,
562626
transaction_status_sender,
@@ -1856,8 +1920,11 @@ pub mod tests {
18561920
rand::{thread_rng, Rng},
18571921
solana_entry::entry::{create_ticks, next_entry, next_entry_mut},
18581922
solana_program_runtime::declare_process_instruction,
1859-
solana_runtime::genesis_utils::{
1860-
self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
1923+
solana_runtime::{
1924+
genesis_utils::{
1925+
self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
1926+
},
1927+
installed_scheduler_pool::MockInstalledScheduler,
18611928
},
18621929
solana_sdk::{
18631930
account::{AccountSharedData, WritableAccount},
@@ -4245,6 +4312,38 @@ pub mod tests {
42454312
)
42464313
}
42474314

4315+
fn create_test_transactions(
4316+
mint_keypair: &Keypair,
4317+
genesis_hash: &Hash,
4318+
) -> Vec<SanitizedTransaction> {
4319+
let pubkey = solana_sdk::pubkey::new_rand();
4320+
let keypair2 = Keypair::new();
4321+
let pubkey2 = solana_sdk::pubkey::new_rand();
4322+
let keypair3 = Keypair::new();
4323+
let pubkey3 = solana_sdk::pubkey::new_rand();
4324+
4325+
vec![
4326+
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
4327+
mint_keypair,
4328+
&pubkey,
4329+
1,
4330+
*genesis_hash,
4331+
)),
4332+
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
4333+
&keypair2,
4334+
&pubkey2,
4335+
1,
4336+
*genesis_hash,
4337+
)),
4338+
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
4339+
&keypair3,
4340+
&pubkey3,
4341+
1,
4342+
*genesis_hash,
4343+
)),
4344+
]
4345+
}
4346+
42484347
#[test]
42494348
fn test_confirm_slot_entries_progress_num_txs_indexes() {
42504349
let GenesisConfigInfo {
@@ -4368,34 +4467,7 @@ pub mod tests {
43684467
..
43694468
} = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100);
43704469
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
4371-
4372-
let pubkey = solana_sdk::pubkey::new_rand();
4373-
let keypair2 = Keypair::new();
4374-
let pubkey2 = solana_sdk::pubkey::new_rand();
4375-
let keypair3 = Keypair::new();
4376-
let pubkey3 = solana_sdk::pubkey::new_rand();
4377-
4378-
let txs = vec![
4379-
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
4380-
&mint_keypair,
4381-
&pubkey,
4382-
1,
4383-
genesis_config.hash(),
4384-
)),
4385-
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
4386-
&keypair2,
4387-
&pubkey2,
4388-
1,
4389-
genesis_config.hash(),
4390-
)),
4391-
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
4392-
&keypair3,
4393-
&pubkey3,
4394-
1,
4395-
genesis_config.hash(),
4396-
)),
4397-
];
4398-
4470+
let txs = create_test_transactions(&mint_keypair, &genesis_config.hash());
43994471
let batch = bank.prepare_sanitized_batch(&txs);
44004472
assert!(batch.needs_unlock());
44014473
let transaction_indexes = vec![42, 43, 44];
@@ -4424,6 +4496,46 @@ pub mod tests {
44244496
assert_eq!(batch3.transaction_indexes, vec![43, 44]);
44254497
}
44264498

4499+
#[test]
4500+
fn test_schedule_batches_for_execution() {
4501+
solana_logger::setup();
4502+
let dummy_leader_pubkey = solana_sdk::pubkey::new_rand();
4503+
let GenesisConfigInfo {
4504+
genesis_config,
4505+
mint_keypair,
4506+
..
4507+
} = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100);
4508+
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
4509+
4510+
let txs = create_test_transactions(&mint_keypair, &genesis_config.hash());
4511+
4512+
let mut mocked_scheduler = MockInstalledScheduler::new();
4513+
mocked_scheduler
4514+
.expect_schedule_execution()
4515+
.times(txs.len())
4516+
.returning(|_| ());
4517+
let bank = BankWithScheduler::new(bank, Some(Box::new(mocked_scheduler)));
4518+
4519+
let batch = bank.prepare_sanitized_batch(&txs);
4520+
let batch_with_indexes = TransactionBatchWithIndexes {
4521+
batch,
4522+
transaction_indexes: (0..txs.len()).collect(),
4523+
};
4524+
4525+
let mut batch_execution_timing = BatchExecutionTiming::default();
4526+
let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64);
4527+
assert!(process_batches(
4528+
&bank,
4529+
&[batch_with_indexes],
4530+
None,
4531+
None,
4532+
&mut batch_execution_timing,
4533+
None,
4534+
&ignored_prioritization_fee_cache
4535+
)
4536+
.is_ok());
4537+
}
4538+
44274539
#[test]
44284540
fn test_confirm_slot_entries_with_fix() {
44294541
const HASHES_PER_TICK: u64 = 10;

0 commit comments

Comments
 (0)