Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.

Commit 0a38108

Browse files
authored
Add RestartLastVotedForkSlots for wen_restart. (#33239)
* Add RestartLastVotedForkSlots and RestartHeaviestFork for wen_restart. * Fix linter errors. * Revert RestartHeaviestFork, it will be added in another PR. * Update frozen abi message. * Fix wrong number in test generation, change to pub(crate) to limit scope. * Separate push_epoch_slots and push_restart_last_voted_fork_slots. * Add RestartLastVotedForkSlots data structure. * Remove unused parts to make PR smaller. * Remove unused clone. * Use CompressedSlotsVec to share code between EpochSlots and RestartLastVotedForkSlots. * Add total_messages to show how many messages are there. * Reduce RestartLastVotedForkSlots to one packet (16k slots). * Replace last_vote_slot with shred_version, revert CompressedSlotsVec.
1 parent 55f3f20 commit 0a38108

File tree

5 files changed

+180
-8
lines changed

5 files changed

+180
-8
lines changed

gossip/src/cluster_info.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ pub fn make_accounts_hashes_message(
267267
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;
268268

269269
// TODO These messages should go through the gpu pipeline for spam filtering
270-
#[frozen_abi(digest = "EnbW8mYTsPMndq9NkHLTkHJgduXvWSfSD6bBdmqQ8TiF")]
270+
#[frozen_abi(digest = "CVvKB495YW6JN4w1rWwajyZmG5wvNhmD97V99rSv9fGw")]
271271
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
272272
#[allow(clippy::large_enum_variant)]
273273
pub(crate) enum Protocol {
@@ -393,7 +393,8 @@ fn retain_staked(values: &mut Vec<CrdsValue>, stakes: &HashMap<Pubkey, u64>) {
393393
CrdsData::AccountsHashes(_) => true,
394394
CrdsData::LowestSlot(_, _)
395395
| CrdsData::LegacyVersion(_)
396-
| CrdsData::DuplicateShred(_, _) => {
396+
| CrdsData::DuplicateShred(_, _)
397+
| CrdsData::RestartLastVotedForkSlots(_) => {
397398
let stake = stakes.get(&value.pubkey()).copied();
398399
stake.unwrap_or_default() >= MIN_STAKE_FOR_GOSSIP
399400
}
@@ -4020,7 +4021,7 @@ mod tests {
40204021
ClusterInfo::split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, values.clone())
40214022
.collect();
40224023
let self_pubkey = solana_sdk::pubkey::new_rand();
4023-
assert!(splits.len() * 3 < NUM_CRDS_VALUES);
4024+
assert!(splits.len() * 2 < NUM_CRDS_VALUES);
40244025
// Assert that all messages are included in the splits.
40254026
assert_eq!(NUM_CRDS_VALUES, splits.iter().map(Vec::len).sum::<usize>());
40264027
splits

gossip/src/cluster_info_metrics.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -627,6 +627,16 @@ pub(crate) fn submit_gossip_stats(
627627
("SnapshotHashes-pull", crds_stats.pull.counts[10], i64),
628628
("ContactInfo-push", crds_stats.push.counts[11], i64),
629629
("ContactInfo-pull", crds_stats.pull.counts[11], i64),
630+
(
631+
"RestartLastVotedForkSlots-push",
632+
crds_stats.push.counts[12],
633+
i64
634+
),
635+
(
636+
"RestartLastVotedForkSlots-pull",
637+
crds_stats.pull.counts[12],
638+
i64
639+
),
630640
(
631641
"all-push",
632642
crds_stats.push.counts.iter().sum::<usize>(),
@@ -664,6 +674,16 @@ pub(crate) fn submit_gossip_stats(
664674
("SnapshotHashes-pull", crds_stats.pull.fails[10], i64),
665675
("ContactInfo-push", crds_stats.push.fails[11], i64),
666676
("ContactInfo-pull", crds_stats.pull.fails[11], i64),
677+
(
678+
"RestartLastVotedForkSlots-push",
679+
crds_stats.push.fails[12],
680+
i64
681+
),
682+
(
683+
"RestartLastVotedForkSlots-pull",
684+
crds_stats.pull.fails[12],
685+
i64
686+
),
667687
("all-push", crds_stats.push.fails.iter().sum::<usize>(), i64),
668688
("all-pull", crds_stats.pull.fails.iter().sum::<usize>(), i64),
669689
);

gossip/src/crds.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ pub enum GossipRoute<'a> {
103103
PushMessage(/*from:*/ &'a Pubkey),
104104
}
105105

106-
type CrdsCountsArray = [usize; 12];
106+
type CrdsCountsArray = [usize; 13];
107107

108108
pub(crate) struct CrdsDataStats {
109109
pub(crate) counts: CrdsCountsArray,
@@ -721,6 +721,7 @@ impl CrdsDataStats {
721721
CrdsData::DuplicateShred(_, _) => 9,
722722
CrdsData::SnapshotHashes(_) => 10,
723723
CrdsData::ContactInfo(_) => 11,
724+
CrdsData::RestartLastVotedForkSlots(_) => 12,
724725
// Update CrdsCountsArray if new items are added here.
725726
}
726727
}

gossip/src/crds_value.rs

Lines changed: 153 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use {
22
crate::{
3-
cluster_info::MAX_ACCOUNTS_HASHES,
3+
cluster_info::{MAX_ACCOUNTS_HASHES, MAX_CRDS_OBJECT_SIZE},
44
contact_info::ContactInfo,
55
deprecated,
66
duplicate_shred::{DuplicateShred, DuplicateShredIndex, MAX_DUPLICATE_SHREDS},
7-
epoch_slots::EpochSlots,
7+
epoch_slots::{CompressedSlots, EpochSlots, MAX_SLOTS_PER_ENTRY},
88
legacy_contact_info::LegacyContactInfo,
99
},
1010
bincode::{serialize, serialized_size},
@@ -94,6 +94,7 @@ pub enum CrdsData {
9494
DuplicateShred(DuplicateShredIndex, DuplicateShred),
9595
SnapshotHashes(SnapshotHashes),
9696
ContactInfo(ContactInfo),
97+
RestartLastVotedForkSlots(RestartLastVotedForkSlots),
9798
}
9899

99100
impl Sanitize for CrdsData {
@@ -132,6 +133,7 @@ impl Sanitize for CrdsData {
132133
}
133134
CrdsData::SnapshotHashes(val) => val.sanitize(),
134135
CrdsData::ContactInfo(node) => node.sanitize(),
136+
CrdsData::RestartLastVotedForkSlots(slots) => slots.sanitize(),
135137
}
136138
}
137139
}
@@ -145,7 +147,7 @@ pub(crate) fn new_rand_timestamp<R: Rng>(rng: &mut R) -> u64 {
145147
impl CrdsData {
146148
/// New random CrdsData for tests and benchmarks.
147149
fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> CrdsData {
148-
let kind = rng.gen_range(0..7);
150+
let kind = rng.gen_range(0..8);
149151
// TODO: Implement other kinds of CrdsData here.
150152
// TODO: Assign ranges to each arm proportional to their frequency in
151153
// the mainnet crds table.
@@ -157,6 +159,9 @@ impl CrdsData {
157159
3 => CrdsData::AccountsHashes(AccountsHashes::new_rand(rng, pubkey)),
158160
4 => CrdsData::Version(Version::new_rand(rng, pubkey)),
159161
5 => CrdsData::Vote(rng.gen_range(0..MAX_VOTES), Vote::new_rand(rng, pubkey)),
162+
6 => CrdsData::RestartLastVotedForkSlots(RestartLastVotedForkSlots::new_rand(
163+
rng, pubkey,
164+
)),
160165
_ => CrdsData::EpochSlots(
161166
rng.gen_range(0..MAX_EPOCH_SLOTS),
162167
EpochSlots::new_rand(rng, pubkey),
@@ -485,6 +490,87 @@ impl Sanitize for NodeInstance {
485490
}
486491
}
487492

493+
#[derive(Serialize, Deserialize, Clone, Default, PartialEq, Eq, AbiExample, Debug)]
494+
pub struct RestartLastVotedForkSlots {
495+
pub from: Pubkey,
496+
pub wallclock: u64,
497+
pub slots: Vec<CompressedSlots>,
498+
pub last_voted_hash: Hash,
499+
pub shred_version: u16,
500+
}
501+
502+
impl Sanitize for RestartLastVotedForkSlots {
503+
fn sanitize(&self) -> std::result::Result<(), SanitizeError> {
504+
if self.slots.is_empty() {
505+
return Err(SanitizeError::InvalidValue);
506+
}
507+
self.slots.sanitize()?;
508+
self.last_voted_hash.sanitize()
509+
}
510+
}
511+
512+
impl RestartLastVotedForkSlots {
513+
pub fn new(from: Pubkey, now: u64, last_voted_hash: Hash, shred_version: u16) -> Self {
514+
Self {
515+
from,
516+
wallclock: now,
517+
slots: Vec::new(),
518+
last_voted_hash,
519+
shred_version,
520+
}
521+
}
522+
523+
/// New random Version for tests and benchmarks.
524+
pub fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
525+
let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand);
526+
let mut result =
527+
RestartLastVotedForkSlots::new(pubkey, new_rand_timestamp(rng), Hash::new_unique(), 1);
528+
let num_slots = rng.gen_range(2..20);
529+
let mut slots = std::iter::repeat_with(|| 47825632 + rng.gen_range(0..512))
530+
.take(num_slots)
531+
.collect::<Vec<Slot>>();
532+
slots.sort();
533+
result.fill(&slots);
534+
result
535+
}
536+
537+
pub fn fill(&mut self, slots: &[Slot]) -> usize {
538+
let slots = &slots[slots.len().saturating_sub(MAX_SLOTS_PER_ENTRY)..];
539+
let mut num = 0;
540+
let space = self.max_compressed_slot_size();
541+
if space == 0 {
542+
return 0;
543+
}
544+
while num < slots.len() {
545+
let mut cslot = CompressedSlots::new(space as usize);
546+
num += cslot.add(&slots[num..]);
547+
self.slots.push(cslot);
548+
}
549+
num
550+
}
551+
552+
pub fn deflate(&mut self) {
553+
for s in self.slots.iter_mut() {
554+
let _ = s.deflate();
555+
}
556+
}
557+
558+
pub fn max_compressed_slot_size(&self) -> isize {
559+
let len_header = serialized_size(self).unwrap();
560+
let len_slot = serialized_size(&CompressedSlots::default()).unwrap();
561+
MAX_CRDS_OBJECT_SIZE as isize - (len_header + len_slot) as isize
562+
}
563+
564+
pub fn to_slots(&self, min_slot: Slot) -> Vec<Slot> {
565+
self.slots
566+
.iter()
567+
.filter(|s| min_slot < s.first_slot() + s.num_slots() as u64)
568+
.filter_map(|s| s.to_slots(min_slot).ok())
569+
.flatten()
570+
.collect()
571+
}
572+
}
573+
488574
/// Type of the replicated value
489575
/// These are labels for values in a record that is associated with `Pubkey`
490576
#[derive(PartialEq, Hash, Eq, Clone, Debug)]
@@ -501,6 +587,7 @@ pub enum CrdsValueLabel {
501587
DuplicateShred(DuplicateShredIndex, Pubkey),
502588
SnapshotHashes(Pubkey),
503589
ContactInfo(Pubkey),
590+
RestartLastVotedForkSlots(Pubkey),
504591
}
505592

506593
impl fmt::Display for CrdsValueLabel {
@@ -524,6 +611,9 @@ impl fmt::Display for CrdsValueLabel {
524611
write!(f, "SnapshotHashes({})", self.pubkey())
525612
}
526613
CrdsValueLabel::ContactInfo(_) => write!(f, "ContactInfo({})", self.pubkey()),
614+
CrdsValueLabel::RestartLastVotedForkSlots(_) => {
615+
write!(f, "RestartLastVotedForkSlots({})", self.pubkey())
616+
}
527617
}
528618
}
529619
}
@@ -543,6 +633,7 @@ impl CrdsValueLabel {
543633
CrdsValueLabel::DuplicateShred(_, p) => *p,
544634
CrdsValueLabel::SnapshotHashes(p) => *p,
545635
CrdsValueLabel::ContactInfo(pubkey) => *pubkey,
636+
CrdsValueLabel::RestartLastVotedForkSlots(p) => *p,
546637
}
547638
}
548639
}
@@ -593,6 +684,7 @@ impl CrdsValue {
593684
CrdsData::DuplicateShred(_, shred) => shred.wallclock,
594685
CrdsData::SnapshotHashes(hash) => hash.wallclock,
595686
CrdsData::ContactInfo(node) => node.wallclock(),
687+
CrdsData::RestartLastVotedForkSlots(slots) => slots.wallclock,
596688
}
597689
}
598690
pub fn pubkey(&self) -> Pubkey {
@@ -609,6 +701,7 @@ impl CrdsValue {
609701
CrdsData::DuplicateShred(_, shred) => shred.from,
610702
CrdsData::SnapshotHashes(hash) => hash.from,
611703
CrdsData::ContactInfo(node) => *node.pubkey(),
704+
CrdsData::RestartLastVotedForkSlots(slots) => slots.from,
612705
}
613706
}
614707
pub fn label(&self) -> CrdsValueLabel {
@@ -627,6 +720,9 @@ impl CrdsValue {
627720
CrdsData::DuplicateShred(ix, shred) => CrdsValueLabel::DuplicateShred(*ix, shred.from),
628721
CrdsData::SnapshotHashes(_) => CrdsValueLabel::SnapshotHashes(self.pubkey()),
629722
CrdsData::ContactInfo(node) => CrdsValueLabel::ContactInfo(*node.pubkey()),
723+
CrdsData::RestartLastVotedForkSlots(_) => {
724+
CrdsValueLabel::RestartLastVotedForkSlots(self.pubkey())
725+
}
630726
}
631727
}
632728
pub fn contact_info(&self) -> Option<&LegacyContactInfo> {
@@ -1073,4 +1169,58 @@ mod test {
10731169
assert!(node.should_force_push(&pubkey));
10741170
assert!(!node.should_force_push(&Pubkey::new_unique()));
10751171
}
1172+
1173+
#[test]
1174+
fn test_restart_last_voted_fork_slots() {
1175+
let keypair = Keypair::new();
1176+
let slot = 53;
1177+
let slot_parent = slot - 5;
1178+
let shred_version = 21;
1179+
let mut slots = RestartLastVotedForkSlots::new(
1180+
keypair.pubkey(),
1181+
timestamp(),
1182+
Hash::default(),
1183+
shred_version,
1184+
);
1185+
let original_slots_vec = [slot_parent, slot];
1186+
slots.fill(&original_slots_vec);
1187+
let value =
1188+
CrdsValue::new_signed(CrdsData::RestartLastVotedForkSlots(slots.clone()), &keypair);
1189+
assert_eq!(value.sanitize(), Ok(()));
1190+
let label = value.label();
1191+
assert_eq!(
1192+
label,
1193+
CrdsValueLabel::RestartLastVotedForkSlots(keypair.pubkey())
1194+
);
1195+
assert_eq!(label.pubkey(), keypair.pubkey());
1196+
assert_eq!(value.wallclock(), slots.wallclock);
1197+
let retrived_slots = slots.to_slots(0);
1198+
assert_eq!(retrived_slots.len(), 2);
1199+
assert_eq!(retrived_slots[0], slot_parent);
1200+
assert_eq!(retrived_slots[1], slot);
1201+
1202+
let empty_slots = RestartLastVotedForkSlots::new(
1203+
keypair.pubkey(),
1204+
timestamp(),
1205+
Hash::default(),
1206+
shred_version,
1207+
);
1208+
let bad_value =
1209+
CrdsValue::new_signed(CrdsData::RestartLastVotedForkSlots(empty_slots), &keypair);
1210+
assert_eq!(bad_value.sanitize(), Err(SanitizeError::InvalidValue));
1211+
1212+
let last_slot: Slot = (MAX_SLOTS_PER_ENTRY + 10).try_into().unwrap();
1213+
let mut large_slots = RestartLastVotedForkSlots::new(
1214+
keypair.pubkey(),
1215+
timestamp(),
1216+
Hash::default(),
1217+
shred_version,
1218+
);
1219+
let large_slots_vec: Vec<Slot> = (0..last_slot + 1).collect();
1220+
large_slots.fill(&large_slots_vec);
1221+
let retrived_slots = large_slots.to_slots(0);
1222+
assert_eq!(retrived_slots.len(), MAX_SLOTS_PER_ENTRY);
1223+
assert_eq!(retrived_slots.first(), Some(&11));
1224+
assert_eq!(retrived_slots.last(), Some(&last_slot));
1225+
}
10761226
}

gossip/src/epoch_slots.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ impl Default for CompressedSlots {
178178
}
179179

180180
impl CompressedSlots {
181-
fn new(max_size: usize) -> Self {
181+
pub(crate) fn new(max_size: usize) -> Self {
182182
CompressedSlots::Uncompressed(Uncompressed::new(max_size))
183183
}
184184

0 commit comments

Comments
 (0)