Skip to content

Commit cbd27a6

Browse files
akhi3030 and qkniep authored
feat: Updates timers to send Crashed leader and timeout events (#344)
Part of #243. Renames SkipTimer to TimerManager, since the timers are no longer used only for skip events but also for detecting crashed leaders. Moves the core state machine that sends out the different types of events out of the timers' main loop. Sends out additional crashed-leader events on top of the skip events. Co-authored-by: Quentin Kniep <[email protected]>
1 parent 095df8c commit cbd27a6

File tree

13 files changed

+392
-196
lines changed

13 files changed

+392
-196
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/tvu.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ impl Tvu {
232232
max_slots.clone(),
233233
Some(rpc_subscriptions.clone()),
234234
slot_status_notifier.clone(),
235+
votor_event_sender.clone(),
235236
);
236237

237238
let (ancestor_duplicate_slots_sender, ancestor_duplicate_slots_receiver) = unbounded();

turbine/benches/retransmit_stage.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@ fn bench_retransmitter(bencher: &mut Bencher) {
120120

121121
let num_packets = data_shreds.len();
122122

123+
let (sender, _) = unbounded();
124+
123125
let retransmit_stage = RetransmitStage::new(
124126
bank_forks,
125127
leader_schedule_cache,
@@ -130,6 +132,7 @@ fn bench_retransmitter(bencher: &mut Bencher) {
130132
Arc::new(solana_rpc::max_slots::MaxSlots::default()),
131133
None,
132134
None,
135+
sender,
133136
);
134137

135138
let mut index = 0;

turbine/src/retransmit_stage.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use {
66
cluster_nodes::{self, ClusterNodes, ClusterNodesCache, Error, MAX_NUM_TURBINE_HOPS},
77
},
88
bytes::Bytes,
9-
crossbeam_channel::{Receiver, RecvError, TryRecvError},
9+
crossbeam_channel::{Receiver, RecvError, Sender, TryRecvError},
1010
lru::LruCache,
1111
rand::Rng,
1212
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
@@ -32,6 +32,7 @@ use {
3232
sendmmsg::{multi_target_send, SendPktsError},
3333
socket::SocketAddrSpace,
3434
},
35+
solana_votor::event::VotorEvent,
3536
static_assertions::const_assert_eq,
3637
std::{
3738
borrow::Cow,
@@ -224,6 +225,7 @@ fn retransmit(
224225
max_slots: &MaxSlots,
225226
rpc_subscriptions: Option<&RpcSubscriptions>,
226227
slot_status_notifier: Option<&SlotStatusNotifier>,
228+
votor_event_sender: &Sender<VotorEvent>,
227229
) -> Result<(), RecvError> {
228230
// Try to receive shreds from the channel without blocking. If the channel
229231
// is empty precompute turbine trees speculatively. If no cache updates are
@@ -340,6 +342,7 @@ fn retransmit(
340342
addr_cache,
341343
rpc_subscriptions,
342344
slot_status_notifier,
345+
votor_event_sender,
343346
);
344347
timer_start.stop();
345348
stats.total_time += timer_start.as_us();
@@ -522,6 +525,7 @@ impl RetransmitStage {
522525
/// * `leader_schedule_cache` - The leader schedule to verify shreds
523526
/// * `cluster_info` - This structure needs to be updated and populated by the bank and via gossip.
524527
/// * `retransmit_receiver` - Receive channel for batches of shreds to be retransmitted.
528+
#[allow(clippy::too_many_arguments)]
525529
pub fn new(
526530
bank_forks: Arc<RwLock<BankForks>>,
527531
leader_schedule_cache: Arc<LeaderScheduleCache>,
@@ -532,6 +536,7 @@ impl RetransmitStage {
532536
max_slots: Arc<MaxSlots>,
533537
rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
534538
slot_status_notifier: Option<SlotStatusNotifier>,
539+
votor_event_sender: Sender<VotorEvent>,
535540
) -> Self {
536541
let cluster_nodes_cache = ClusterNodesCache::<RetransmitStage>::new(
537542
CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
@@ -571,6 +576,7 @@ impl RetransmitStage {
571576
&max_slots,
572577
rpc_subscriptions.as_deref(),
573578
slot_status_notifier.as_ref(),
579+
&votor_event_sender,
574580
)
575581
.is_ok()
576582
{}
@@ -652,6 +658,7 @@ impl RetransmitStats {
652658
addr_cache: &mut AddrCache,
653659
rpc_subscriptions: Option<&RpcSubscriptions>,
654660
slot_status_notifier: Option<&SlotStatusNotifier>,
661+
votor_event_sender: &Sender<VotorEvent>,
655662
) {
656663
for (slot, mut slot_stats) in feed {
657664
addr_cache.record(slot, &mut slot_stats);
@@ -663,6 +670,7 @@ impl RetransmitStats {
663670
slot_stats.outset,
664671
rpc_subscriptions,
665672
slot_status_notifier,
673+
votor_event_sender,
666674
);
667675
}
668676
self.slot_stats.put(slot, slot_stats);
@@ -756,6 +764,7 @@ fn notify_subscribers(
756764
timestamp: u64, // When the first shred in the slot was received.
757765
rpc_subscriptions: Option<&RpcSubscriptions>,
758766
slot_status_notifier: Option<&SlotStatusNotifier>,
767+
votor_event_sender: &Sender<VotorEvent>,
759768
) {
760769
if let Some(rpc_subscriptions) = rpc_subscriptions {
761770
let slot_update = SlotUpdate::FirstShredReceived { slot, timestamp };
@@ -768,6 +777,9 @@ fn notify_subscribers(
768777
.unwrap()
769778
.notify_first_shred_received(slot);
770779
}
780+
votor_event_sender
781+
.send(VotorEvent::FirstShred(slot))
782+
.unwrap();
771783
}
772784

773785
#[cfg(test)]

votor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ dashmap = { workspace = true, features = ["rayon", "raw-api"] }
3434
etcd-client = { workspace = true, features = ["tls"] }
3535
itertools = { workspace = true }
3636
log = { workspace = true }
37+
parking_lot = { workspace = true }
3738
qualifier_attr = { workspace = true }
3839
rayon = { workspace = true }
3940
serde = { workspace = true }

votor/src/event.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,17 @@ pub enum VotorEvent {
3535
/// The block has received a notarization certificate
3636
BlockNotarized(Block),
3737

38+
/// Received the first shred for the slot.
39+
FirstShred(Slot),
40+
3841
/// The pool has marked the given block as a ready parent for `slot`
3942
ParentReady { slot: Slot, parent_block: Block },
4043

41-
/// The skip timer has fired for the given slot
44+
/// Timeout to detect early that an honest leader has crashed and
45+
/// whether the leader window should be skipped.
46+
TimeoutCrashedLeader(Slot),
47+
48+
/// Timeout to inspect whether the remaining leader window should be skipped.
4249
Timeout(Slot),
4350

4451
/// The given block has reached the safe to notar status
@@ -67,16 +74,18 @@ impl VotorEvent {
6774
pub(crate) fn should_ignore(&self, root: Slot) -> bool {
6875
match self {
6976
VotorEvent::Block(completed_block) => completed_block.slot <= root,
70-
VotorEvent::BlockNotarized((s, _)) => *s <= root,
71-
VotorEvent::ParentReady {
72-
slot,
77+
VotorEvent::Timeout(s)
78+
| VotorEvent::SafeToSkip(s)
79+
| VotorEvent::TimeoutCrashedLeader(s)
80+
| VotorEvent::FirstShred(s)
81+
| VotorEvent::SafeToNotar((s, _))
82+
| VotorEvent::Finalized((s, _))
83+
| VotorEvent::BlockNotarized((s, _))
84+
| VotorEvent::ParentReady {
85+
slot: s,
7386
parent_block: _,
74-
} => *slot <= root,
75-
VotorEvent::Timeout(s) => *s <= root,
76-
VotorEvent::SafeToNotar((s, _)) => *s <= root,
77-
VotorEvent::SafeToSkip(s) => *s <= root,
87+
} => s <= &root,
7888
VotorEvent::ProduceWindow(_) => false,
79-
VotorEvent::Finalized((s, _)) => *s <= root,
8089
VotorEvent::Standstill(_) => false,
8190
VotorEvent::SetIdentity => false,
8291
}

votor/src/event_handler.rs

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@ use {
66
commitment::{alpenglow_update_commitment_cache, AlpenglowCommitmentType},
77
event::{CompletedBlock, VotorEvent, VotorEventReceiver},
88
root_utils::{self, RootContext},
9-
skip_timer::SkipTimerManager,
9+
timer_manager::TimerManager,
1010
vote_history::{VoteHistory, VoteHistoryError},
1111
voting_utils::{self, BLSOp, VoteError, VotingContext},
1212
votor::{SharedContext, Votor},
1313
},
1414
crossbeam_channel::{select, RecvError, SendError},
15+
parking_lot::RwLock,
1516
solana_ledger::leader_schedule_utils::{
1617
first_of_consecutive_leader_slots, last_of_consecutive_leader_slots, leader_slot_index,
1718
},
@@ -23,7 +24,7 @@ use {
2324
collections::{BTreeMap, BTreeSet},
2425
sync::{
2526
atomic::{AtomicBool, Ordering},
26-
Arc, Condvar, Mutex, RwLock,
27+
Arc, Condvar, Mutex,
2728
},
2829
thread::{self, Builder, JoinHandle},
2930
time::Duration,
@@ -41,7 +42,7 @@ pub(crate) struct EventHandlerContext {
4142
pub(crate) start: Arc<(Mutex<bool>, Condvar)>,
4243

4344
pub(crate) event_receiver: VotorEventReceiver,
44-
pub(crate) skip_timer: Arc<RwLock<SkipTimerManager>>,
45+
pub(crate) timer_manager: Arc<RwLock<TimerManager>>,
4546

4647
// Contexts
4748
pub(crate) shared_context: SharedContext,
@@ -92,14 +93,15 @@ impl EventHandler {
9293
exit,
9394
start,
9495
event_receiver,
95-
skip_timer,
96+
timer_manager,
9697
shared_context: ctx,
9798
voting_context: mut vctx,
9899
root_context: rctx,
99100
} = context;
100101
let mut my_pubkey = vctx.identity_keypair.pubkey();
101102
let mut pending_blocks = PendingBlocks::default();
102103
let mut finalized_blocks = BTreeSet::default();
104+
let mut received_shred = BTreeSet::default();
103105

104106
// Wait until migration has completed
105107
info!("{my_pubkey}: Event loop initialized");
@@ -137,12 +139,13 @@ impl EventHandler {
137139
let votes = Self::handle_event(
138140
&mut my_pubkey,
139141
event,
140-
&skip_timer,
142+
&timer_manager,
141143
&ctx,
142144
&mut vctx,
143145
&rctx,
144146
&mut pending_blocks,
145147
&mut finalized_blocks,
148+
&mut received_shred,
146149
)?;
147150

148151
// TODO: properly bubble up error handling here and in call graph
@@ -157,12 +160,13 @@ impl EventHandler {
157160
fn handle_event(
158161
my_pubkey: &mut Pubkey,
159162
event: VotorEvent,
160-
skip_timer: &RwLock<SkipTimerManager>,
163+
timer_manager: &RwLock<TimerManager>,
161164
ctx: &SharedContext,
162165
vctx: &mut VotingContext,
163166
rctx: &RootContext,
164167
pending_blocks: &mut PendingBlocks,
165168
finalized_blocks: &mut BTreeSet<Block>,
169+
received_shred: &mut BTreeSet<Slot>,
166170
) -> Result<Vec<Result<BLSOp, VoteError>>, EventLoopError> {
167171
let mut votes = vec![];
168172
match event {
@@ -193,6 +197,7 @@ impl EventHandler {
193197
rctx,
194198
pending_blocks,
195199
finalized_blocks,
200+
received_shred,
196201
)?;
197202
}
198203

@@ -203,16 +208,29 @@ impl EventHandler {
203208
Self::try_final(my_pubkey, block, vctx, &mut votes);
204209
}
205210

211+
VotorEvent::FirstShred(slot) => {
212+
info!("{my_pubkey}: First shred {slot}");
213+
received_shred.insert(slot);
214+
}
215+
206216
// Received a parent ready notification for `slot`
207217
VotorEvent::ParentReady { slot, parent_block } => {
208218
info!("{my_pubkey}: Parent ready {slot} {parent_block:?}");
209219
let should_set_timeouts = vctx.vote_history.add_parent_ready(slot, parent_block);
210220
Self::check_pending_blocks(my_pubkey, pending_blocks, vctx, &mut votes);
211221
if should_set_timeouts {
212-
skip_timer.write().unwrap().set_timeouts(slot);
222+
timer_manager.write().set_timeouts(slot);
213223
}
214224
}
215225

226+
VotorEvent::TimeoutCrashedLeader(slot) => {
227+
info!("{my_pubkey}: TimeoutCrashedLeader {slot}");
228+
if vctx.vote_history.voted(slot) || received_shred.contains(&slot) {
229+
return Ok(votes);
230+
}
231+
Self::try_skip_window(my_pubkey, slot, vctx, &mut votes);
232+
}
233+
216234
// Skip timer for the slot has fired
217235
VotorEvent::Timeout(slot) => {
218236
info!("{my_pubkey}: Timeout {slot}");
@@ -287,6 +305,7 @@ impl EventHandler {
287305
rctx,
288306
pending_blocks,
289307
finalized_blocks,
308+
received_shred,
290309
)?;
291310
}
292311

@@ -536,6 +555,7 @@ impl EventHandler {
536555
rctx: &RootContext,
537556
pending_blocks: &mut PendingBlocks,
538557
finalized_blocks: &mut BTreeSet<Block>,
558+
received_shred: &mut BTreeSet<Slot>,
539559
) -> Result<(), SetRootError> {
540560
let bank_forks_r = ctx.bank_forks.read().unwrap();
541561
let old_root = bank_forks_r.root();
@@ -563,6 +583,7 @@ impl EventHandler {
563583
rctx,
564584
pending_blocks,
565585
finalized_blocks,
586+
received_shred,
566587
)
567588
}
568589

votor/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ pub mod commitment;
1010
pub mod event;
1111
mod event_handler;
1212
pub mod root_utils;
13-
mod skip_timer;
13+
mod timer_manager;
1414
pub mod vote_history;
1515
pub mod vote_history_storage;
1616
pub mod voting_utils;

votor/src/root_utils.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,13 @@ pub(crate) fn set_root(
4040
rctx: &RootContext,
4141
pending_blocks: &mut PendingBlocks,
4242
finalized_blocks: &mut BTreeSet<Block>,
43+
received_shred: &mut BTreeSet<Slot>,
4344
) -> Result<(), SetRootError> {
4445
info!("{my_pubkey}: setting root {new_root}");
4546
vctx.vote_history.set_root(new_root);
4647
*pending_blocks = pending_blocks.split_off(&new_root);
4748
*finalized_blocks = finalized_blocks.split_off(&(new_root, Hash::default()));
49+
*received_shred = received_shred.split_off(&new_root);
4850

4951
check_and_handle_new_root(
5052
new_root,

0 commit comments

Comments
 (0)