Skip to content

Commit 8e31adf

Browse files
akhi3030qkniep
authored andcommitted
feat: Updates timers to send Crashed leader and timeout events (#344)
Part of #243. Rename SkipTimer to TimerManager as now the timers are not just for skip but also for crashed leaders. Refactors the core state machine of sending out different types of events out of the main loop of the timers. Sends out additional crashed leader events on top of skip events. Co-authored-by: Quentin Kniep <[email protected]>
1 parent e395a4a commit 8e31adf

File tree

11 files changed

+410
-49
lines changed

11 files changed

+410
-49
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/tvu.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ impl Tvu {
247247
rpc_subscriptions.clone(),
248248
slot_status_notifier.clone(),
249249
tvu_config.xdp_sender,
250+
votor_event_sender.clone(),
250251
);
251252

252253
let (ancestor_duplicate_slots_sender, ancestor_duplicate_slots_receiver) = unbounded();

turbine/src/retransmit_stage.rs

Lines changed: 34 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use {
77
xdp::{XdpSender, XdpShredPayload},
88
},
99
bytes::Bytes,
10-
crossbeam_channel::{Receiver, RecvError, TryRecvError},
10+
crossbeam_channel::{Receiver, RecvError, Sender, TryRecvError},
1111
lru::LruCache,
1212
rand::Rng,
1313
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
@@ -34,6 +34,7 @@ use {
3434
socket::SocketAddrSpace,
3535
},
3636
solana_time_utils::timestamp,
37+
solana_votor::event::VotorEvent,
3738
static_assertions::const_assert_eq,
3839
std::{
3940
borrow::Cow,
@@ -242,6 +243,7 @@ fn retransmit(
242243
rpc_subscriptions: Option<&RpcSubscriptions>,
243244
slot_status_notifier: Option<&SlotStatusNotifier>,
244245
shred_buf: &mut Vec<Vec<shred::Payload>>,
246+
votor_event_sender: &Sender<VotorEvent>,
245247
) -> Result<(), RecvError> {
246248
// Try to receive shreds from the channel without blocking. If the channel
247249
// is empty precompute turbine trees speculatively. If no cache updates are
@@ -380,6 +382,7 @@ fn retransmit(
380382
addr_cache,
381383
rpc_subscriptions,
382384
slot_status_notifier,
385+
votor_event_sender,
383386
);
384387
timer_start.stop();
385388
stats.total_time += timer_start.as_us();
@@ -593,6 +596,7 @@ impl RetransmitStage {
593596
rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
594597
slot_status_notifier: Option<SlotStatusNotifier>,
595598
xdp_sender: Option<XdpSender>,
599+
votor_event_sender: Sender<VotorEvent>,
596600
) -> Self {
597601
let cluster_nodes_cache = ClusterNodesCache::<RetransmitStage>::new(
598602
CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
@@ -614,30 +618,29 @@ impl RetransmitStage {
614618

615619
let retransmit_thread_handle = Builder::new()
616620
.name("solRetransmittr".to_string())
617-
.spawn({
618-
move || {
619-
let mut shred_buf = Vec::with_capacity(RETRANSMIT_BATCH_SIZE);
620-
while retransmit(
621-
&thread_pool,
622-
&bank_forks,
623-
&leader_schedule_cache,
624-
&cluster_info,
625-
&retransmit_receiver,
626-
&retransmit_sockets,
627-
&quic_endpoint_sender,
628-
xdp_sender.as_ref(),
629-
&mut stats,
630-
&cluster_nodes_cache,
631-
&mut addr_cache,
632-
&mut shred_deduper,
633-
&max_slots,
634-
rpc_subscriptions.as_deref(),
635-
slot_status_notifier.as_ref(),
636-
&mut shred_buf,
637-
)
638-
.is_ok()
639-
{}
640-
}
621+
.spawn(move || {
622+
let mut shred_buf = Vec::with_capacity(RETRANSMIT_BATCH_SIZE);
623+
while retransmit(
624+
&thread_pool,
625+
&bank_forks,
626+
&leader_schedule_cache,
627+
&cluster_info,
628+
&retransmit_receiver,
629+
&retransmit_sockets,
630+
&quic_endpoint_sender,
631+
xdp_sender.as_ref(),
632+
&mut stats,
633+
&cluster_nodes_cache,
634+
&mut addr_cache,
635+
&mut shred_deduper,
636+
&max_slots,
637+
rpc_subscriptions.as_deref(),
638+
slot_status_notifier.as_ref(),
639+
&mut shred_buf,
640+
&votor_event_sender,
641+
)
642+
.is_ok()
643+
{}
641644
})
642645
.unwrap();
643646

@@ -717,6 +720,7 @@ impl RetransmitStats {
717720
addr_cache: &mut AddrCache,
718721
rpc_subscriptions: Option<&RpcSubscriptions>,
719722
slot_status_notifier: Option<&SlotStatusNotifier>,
723+
votor_event_sender: &Sender<VotorEvent>,
720724
) {
721725
for (slot, mut slot_stats) in feed {
722726
addr_cache.record(slot, &mut slot_stats);
@@ -728,6 +732,7 @@ impl RetransmitStats {
728732
slot_stats.outset,
729733
rpc_subscriptions,
730734
slot_status_notifier,
735+
votor_event_sender,
731736
);
732737
}
733738
self.slot_stats.put(slot, slot_stats);
@@ -821,6 +826,7 @@ fn notify_subscribers(
821826
timestamp: u64, // When the first shred in the slot was received.
822827
rpc_subscriptions: Option<&RpcSubscriptions>,
823828
slot_status_notifier: Option<&SlotStatusNotifier>,
829+
votor_event_sender: &Sender<VotorEvent>,
824830
) {
825831
if let Some(rpc_subscriptions) = rpc_subscriptions {
826832
let slot_update = SlotUpdate::FirstShredReceived { slot, timestamp };
@@ -833,6 +839,9 @@ fn notify_subscribers(
833839
.unwrap()
834840
.notify_first_shred_received(slot);
835841
}
842+
votor_event_sender
843+
.send(VotorEvent::FirstShred(slot))
844+
.unwrap();
836845
}
837846

838847
#[cfg(test)]

votor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ dashmap = { workspace = true, features = ["rayon", "raw-api"] }
3333
etcd-client = { workspace = true, features = ["tls"] }
3434
itertools = { workspace = true }
3535
log = { workspace = true }
36+
parking_lot = { workspace = true }
3637
qualifier_attr = { workspace = true }
3738
rayon = { workspace = true }
3839
serde = { workspace = true }

votor/src/event.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,17 @@ pub enum VotorEvent {
3535
/// The block has received a notarization certificate
3636
BlockNotarized(Block),
3737

38+
/// Received the first shred for the slot.
39+
FirstShred(Slot),
40+
3841
/// The pool has marked the given block as a ready parent for `slot`
3942
ParentReady { slot: Slot, parent_block: Block },
4043

41-
/// The skip timer has fired for the given slot
44+
//// Timeout to early detect that a honest that has crashed and
45+
/// if the leader window should be skipped.
46+
TimeoutCrashedLeader(Slot),
47+
48+
/// Timeout to inspect whether the remaining leader window should be skipped.
4249
Timeout(Slot),
4350

4451
/// The given block has reached the safe to notar status
@@ -67,16 +74,18 @@ impl VotorEvent {
6774
pub(crate) fn should_ignore(&self, root: Slot) -> bool {
6875
match self {
6976
VotorEvent::Block(completed_block) => completed_block.slot <= root,
70-
VotorEvent::BlockNotarized((s, _)) => *s <= root,
71-
VotorEvent::ParentReady {
72-
slot,
77+
VotorEvent::Timeout(s)
78+
| VotorEvent::SafeToSkip(s)
79+
| VotorEvent::TimeoutCrashedLeader(s)
80+
| VotorEvent::FirstShred(s)
81+
| VotorEvent::SafeToNotar((s, _))
82+
| VotorEvent::Finalized((s, _))
83+
| VotorEvent::BlockNotarized((s, _))
84+
| VotorEvent::ParentReady {
85+
slot: s,
7386
parent_block: _,
74-
} => *slot <= root,
75-
VotorEvent::Timeout(s) => *s <= root,
76-
VotorEvent::SafeToNotar((s, _)) => *s <= root,
77-
VotorEvent::SafeToSkip(s) => *s <= root,
87+
} => s <= &root,
7888
VotorEvent::ProduceWindow(_) => false,
79-
VotorEvent::Finalized((s, _)) => *s <= root,
8089
VotorEvent::Standstill(_) => false,
8190
VotorEvent::SetIdentity => false,
8291
}

votor/src/event_handler.rs

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@ use {
66
commitment::{alpenglow_update_commitment_cache, AlpenglowCommitmentType},
77
event::{CompletedBlock, VotorEvent, VotorEventReceiver},
88
root_utils::{self, RootContext},
9-
skip_timer::SkipTimerManager,
9+
timer_manager::TimerManager,
1010
vote_history::{VoteHistory, VoteHistoryError},
1111
voting_utils::{self, BLSOp, VoteError, VotingContext},
1212
votor::{SharedContext, Votor},
1313
},
1414
crossbeam_channel::{select, RecvError, SendError},
15+
parking_lot::RwLock,
1516
solana_clock::Slot,
1617
solana_hash::Hash,
1718
solana_ledger::leader_schedule_utils::{
@@ -25,7 +26,7 @@ use {
2526
collections::{BTreeMap, BTreeSet},
2627
sync::{
2728
atomic::{AtomicBool, Ordering},
28-
Arc, Condvar, Mutex, RwLock,
29+
Arc, Condvar, Mutex,
2930
},
3031
thread::{self, Builder, JoinHandle},
3132
time::Duration,
@@ -43,7 +44,7 @@ pub(crate) struct EventHandlerContext {
4344
pub(crate) start: Arc<(Mutex<bool>, Condvar)>,
4445

4546
pub(crate) event_receiver: VotorEventReceiver,
46-
pub(crate) skip_timer: Arc<RwLock<SkipTimerManager>>,
47+
pub(crate) timer_manager: Arc<RwLock<TimerManager>>,
4748

4849
// Contexts
4950
pub(crate) shared_context: SharedContext,
@@ -94,14 +95,15 @@ impl EventHandler {
9495
exit,
9596
start,
9697
event_receiver,
97-
skip_timer,
98+
timer_manager,
9899
shared_context: ctx,
99100
voting_context: mut vctx,
100101
root_context: rctx,
101102
} = context;
102103
let mut my_pubkey = vctx.identity_keypair.pubkey();
103104
let mut pending_blocks = PendingBlocks::default();
104105
let mut finalized_blocks = BTreeSet::default();
106+
let mut received_shred = BTreeSet::default();
105107

106108
// Wait until migration has completed
107109
info!("{my_pubkey}: Event loop initialized");
@@ -139,12 +141,13 @@ impl EventHandler {
139141
let votes = Self::handle_event(
140142
&mut my_pubkey,
141143
event,
142-
&skip_timer,
144+
&timer_manager,
143145
&ctx,
144146
&mut vctx,
145147
&rctx,
146148
&mut pending_blocks,
147149
&mut finalized_blocks,
150+
&mut received_shred,
148151
)?;
149152

150153
// TODO: properly bubble up error handling here and in call graph
@@ -159,12 +162,13 @@ impl EventHandler {
159162
fn handle_event(
160163
my_pubkey: &mut Pubkey,
161164
event: VotorEvent,
162-
skip_timer: &RwLock<SkipTimerManager>,
165+
timer_manager: &RwLock<TimerManager>,
163166
ctx: &SharedContext,
164167
vctx: &mut VotingContext,
165168
rctx: &RootContext,
166169
pending_blocks: &mut PendingBlocks,
167170
finalized_blocks: &mut BTreeSet<Block>,
171+
received_shred: &mut BTreeSet<Slot>,
168172
) -> Result<Vec<Result<BLSOp, VoteError>>, EventLoopError> {
169173
let mut votes = vec![];
170174
match event {
@@ -195,6 +199,7 @@ impl EventHandler {
195199
rctx,
196200
pending_blocks,
197201
finalized_blocks,
202+
received_shred,
198203
)?;
199204
}
200205

@@ -205,16 +210,29 @@ impl EventHandler {
205210
Self::try_final(my_pubkey, block, vctx, &mut votes);
206211
}
207212

213+
VotorEvent::FirstShred(slot) => {
214+
info!("{my_pubkey}: First shred {slot}");
215+
received_shred.insert(slot);
216+
}
217+
208218
// Received a parent ready notification for `slot`
209219
VotorEvent::ParentReady { slot, parent_block } => {
210220
info!("{my_pubkey}: Parent ready {slot} {parent_block:?}");
211221
let should_set_timeouts = vctx.vote_history.add_parent_ready(slot, parent_block);
212222
Self::check_pending_blocks(my_pubkey, pending_blocks, vctx, &mut votes);
213223
if should_set_timeouts {
214-
skip_timer.write().unwrap().set_timeouts(slot);
224+
timer_manager.write().set_timeouts(slot);
215225
}
216226
}
217227

228+
VotorEvent::TimeoutCrashedLeader(slot) => {
229+
info!("{my_pubkey}: TimeoutCrashedLeader {slot}");
230+
if vctx.vote_history.voted(slot) || received_shred.contains(&slot) {
231+
return Ok(votes);
232+
}
233+
Self::try_skip_window(my_pubkey, slot, vctx, &mut votes);
234+
}
235+
218236
// Skip timer for the slot has fired
219237
VotorEvent::Timeout(slot) => {
220238
info!("{my_pubkey}: Timeout {slot}");
@@ -289,6 +307,7 @@ impl EventHandler {
289307
rctx,
290308
pending_blocks,
291309
finalized_blocks,
310+
received_shred,
292311
)?;
293312
}
294313

@@ -538,6 +557,7 @@ impl EventHandler {
538557
rctx: &RootContext,
539558
pending_blocks: &mut PendingBlocks,
540559
finalized_blocks: &mut BTreeSet<Block>,
560+
received_shred: &mut BTreeSet<Slot>,
541561
) -> Result<(), SetRootError> {
542562
let bank_forks_r = ctx.bank_forks.read().unwrap();
543563
let old_root = bank_forks_r.root();
@@ -565,6 +585,7 @@ impl EventHandler {
565585
rctx,
566586
pending_blocks,
567587
finalized_blocks,
588+
received_shred,
568589
)
569590
}
570591

votor/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ pub mod commitment;
1010
pub mod event;
1111
mod event_handler;
1212
pub mod root_utils;
13-
mod skip_timer;
13+
mod timer_manager;
1414
pub mod vote_history;
1515
pub mod vote_history_storage;
1616
pub mod voting_utils;

votor/src/root_utils.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,13 @@ pub(crate) fn set_root(
4141
rctx: &RootContext,
4242
pending_blocks: &mut PendingBlocks,
4343
finalized_blocks: &mut BTreeSet<Block>,
44+
received_shred: &mut BTreeSet<Slot>,
4445
) -> Result<(), SetRootError> {
4546
info!("{my_pubkey}: setting root {new_root}");
4647
vctx.vote_history.set_root(new_root);
4748
*pending_blocks = pending_blocks.split_off(&new_root);
4849
*finalized_blocks = finalized_blocks.split_off(&(new_root, Hash::default()));
50+
*received_shred = received_shred.split_off(&new_root);
4951

5052
check_and_handle_new_root(
5153
new_root,

0 commit comments

Comments
 (0)