diff --git a/Cargo.lock b/Cargo.lock index d885498327..1ceefc9afd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11972,6 +11972,7 @@ dependencies = [ "solana-vote-interface", "solana-votor-messages", "spl-pod", + "wincode", ] [[package]] diff --git a/core/src/block_creation_loop.rs b/core/src/block_creation_loop.rs index ddecf81715..aed2ce127d 100644 --- a/core/src/block_creation_loop.rs +++ b/core/src/block_creation_loop.rs @@ -9,7 +9,7 @@ use { banking_trace::BankingTracer, replay_stage::{Finalizer, ReplayStage}, }, - crossbeam_channel::Receiver, + crossbeam_channel::{Receiver, Sender}, solana_clock::Slot, solana_entry::block_component::{ BlockFooterV1, BlockMarkerV1, GenesisCertificate, VersionedBlockMarker, @@ -34,7 +34,12 @@ use { block_component_processor::BlockComponentProcessor, }, solana_version::version, - solana_votor::{common::block_timeout, event::LeaderWindowInfo}, + solana_votor::{ + common::block_timeout, + consensus_rewards::{BuildRewardCertsRequest, BuildRewardCertsResponse}, + event::LeaderWindowInfo, + }, + solana_votor_messages::rewards_certificate::{NotarRewardCertificate, SkipRewardCertificate}, stats::{BlockCreationLoopMetrics, SlotMetrics}, std::{ sync::{ @@ -93,6 +98,9 @@ pub struct BlockCreationLoopConfig { // Channel to receive RecordReceiver from PohService pub record_receiver_receiver: Receiver, pub optimistic_parent_receiver: Receiver, + + pub build_reward_certs_sender: Sender, + pub reward_certs_receiver: Receiver, } struct LeaderContext { @@ -110,6 +118,8 @@ struct LeaderContext { slot_status_notifier: Option, banking_tracer: Arc, replay_highest_frozen: Arc, + build_reward_certs_sender: Sender, + reward_certs_receiver: Receiver, // Metrics metrics: BlockCreationLoopMetrics, @@ -164,6 +174,8 @@ fn start_loop(config: BlockCreationLoopConfig) { replay_highest_frozen, highest_parent_ready, optimistic_parent_receiver, + build_reward_certs_sender, + reward_certs_receiver, } = config; // Similar to Votor, if this loop dies kill the validator @@ -210,6 +222,8 @@ fn start_loop(config: BlockCreationLoopConfig) { replay_highest_frozen, metrics: BlockCreationLoopMetrics::default(), slot_metrics: SlotMetrics::default(), + build_reward_certs_sender, + reward_certs_receiver, genesis_cert, }; @@ -331,6 +345,8 @@ fn produce_window( if let Err(e) = record_and_complete_block( ctx.poh_recorder.as_ref(), &mut ctx.record_receiver, + &ctx.build_reward_certs_sender, + &ctx.reward_certs_receiver, skip_timer, timeout, ) { @@ -384,7 +400,11 @@ fn skew_block_producer_time_nanos( /// Produces a block footer with the current timestamp and version information. /// The bank_hash field is left as default and will be filled in after the bank freezes. -fn produce_block_footer(bank: Arc) -> BlockFooterV1 { +fn produce_block_footer( + bank: Arc, + skip_reward_certificate: Option, + notar_reward_certificate: Option, +) -> BlockFooterV1 { let mut block_producer_time_nanos = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("Misconfigured system clock; couldn't measure block producer time.") @@ -411,6 +431,8 @@ fn produce_block_footer(bank: Arc) -> BlockFooterV1 { bank_hash: Hash::default(), block_producer_time_nanos: block_producer_time_nanos as u64, block_user_agent: format!("agave/{}", version!()).into_bytes(), + skip_reward_certificate, + notar_reward_certificate, } } @@ -424,9 +446,17 @@ fn produce_block_footer(bank: Arc) -> BlockFooterV1 { fn record_and_complete_block( poh_recorder: &RwLock, record_receiver: &mut RecordReceiver, + build_reward_certs_sender: &Sender, + cert_receiver: &Receiver, block_timer: Instant, block_timeout: Duration, ) -> Result<(), PohRecorderError> { + // XXX: how to look up the slot. + let slot = u64::MAX; + // XXX: handle error below. + build_reward_certs_sender + .send(BuildRewardCertsRequest { slot }) + .unwrap(); loop { let remaining_slot_time = block_timeout.saturating_sub(block_timer.elapsed()); if remaining_slot_time.is_zero() { @@ -477,7 +507,13 @@ fn record_and_complete_block( // Produce the footer with the current timestamp let working_bank = w_poh_recorder.working_bank().unwrap(); - let footer = produce_block_footer(working_bank.bank.clone_without_scheduler()); + /// XXX: handle error below + let resp = cert_receiver.recv().unwrap(); + let footer = produce_block_footer( + working_bank.bank.clone_without_scheduler(), + resp.skip, + resp.notar, + ); BlockComponentProcessor::update_bank_with_footer( working_bank.bank.clone_without_scheduler(), diff --git a/core/src/bls_sigverify/bls_sigverifier.rs b/core/src/bls_sigverify/bls_sigverifier.rs index a16dcd93c5..78398a7b11 100644 --- a/core/src/bls_sigverify/bls_sigverifier.rs +++ b/core/src/bls_sigverify/bls_sigverifier.rs @@ -19,12 +19,17 @@ use { signature::SignatureProjective, }, solana_clock::Slot, + solana_gossip::cluster_info::ClusterInfo, + solana_ledger::leader_schedule_cache::LeaderScheduleCache, solana_measure::measure::Measure, solana_pubkey::Pubkey, solana_rpc::alpenglow_last_voted::AlpenglowLastVoted, solana_runtime::{bank::Bank, bank_forks::SharableBanks, epoch_stakes::BLSPubkeyToRankMap}, solana_streamer::packet::PacketBatch, - solana_votor::consensus_metrics::{ConsensusMetricsEvent, ConsensusMetricsEventSender}, + solana_votor::{ + consensus_metrics::{ConsensusMetricsEvent, ConsensusMetricsEventSender}, + consensus_rewards::{self, AddVoteEntry, AddVoteMessage}, + }, solana_votor_messages::{ consensus_message::{Certificate, CertificateType, ConsensusMessage, VoteMessage}, vote::Vote, @@ -60,6 +65,7 @@ enum CertVerifyError { pub struct BLSSigVerifier { verified_votes_sender: VerifiedVoteSender, + reward_votes_sender: Sender, message_sender: Sender, sharable_banks: SharableBanks, stats: BLSSigVerifierStats, @@ -68,6 +74,8 @@ pub struct BLSSigVerifier { consensus_metrics_sender: ConsensusMetricsEventSender, last_checked_root_slot: Slot, alpenglow_last_voted: Arc, + cluster_info: Arc, + leader_schedule: Arc, } impl BLSSigVerifier { @@ -148,8 +156,16 @@ impl BLSSigVerifier { id: *solana_pubkey, vote: vote_message.vote, }); - // Only need votes newer than root slot - if vote_message.vote.slot() <= root_bank.slot() { + + // consensus pool does not need votes for slots older than root slot however the rewards container may still need them. + if vote_message.vote.slot() <= root_bank.slot() + && !consensus_rewards::wants_vote( + &self.cluster_info, + &self.leader_schedule, + root_bank.slot(), + &vote_message, + ) + { self.stats.received_old.fetch_add(1, Ordering::Relaxed); packet.meta_mut().set_discard(true); continue; @@ -191,12 +207,12 @@ impl BLSSigVerifier { .fetch_add(preprocess_time.as_us(), Ordering::Relaxed); let (votes_result, certs_result) = rayon::join( - || self.verify_and_send_votes(votes_to_verify), + || self.verify_and_send_votes(votes_to_verify, &root_bank), || self.verify_and_send_certificates(certs_to_verify, &root_bank), ); - votes_result?; - certs_result?; + let add_vote_msg = votes_result?; + let () = certs_result?; // Send to RPC service for last voted tracking self.alpenglow_last_voted @@ -211,6 +227,10 @@ impl BLSSigVerifier { warn!("could not send consensus metrics, receive side of channel is closed"); } + if self.reward_votes_sender.send(add_vote_msg).is_err() { + warn!("could not send votes to reward container, receive side of channel is closed"); + } + self.stats.maybe_report_stats(); Ok(()) @@ -221,13 +241,17 @@ impl BLSSigVerifier { pub fn new( sharable_banks: SharableBanks, verified_votes_sender: VerifiedVoteSender, + reward_votes_sender: Sender, message_sender: Sender, consensus_metrics_sender: ConsensusMetricsEventSender, alpenglow_last_voted: Arc, + cluster_info: Arc, + leader_schedule: Arc, ) -> Self { Self { sharable_banks, verified_votes_sender, + reward_votes_sender, message_sender, stats: BLSSigVerifierStats::new(), verified_certs: RwLock::new(HashSet::new()), @@ -235,14 +259,42 @@ impl BLSSigVerifier { consensus_metrics_sender, last_checked_root_slot: 0, alpenglow_last_voted, + cluster_info, + leader_schedule, } } + /// Verifies votes and sends verified votes to the consensus pool. + /// Also returns a copy of the verified votes that the rewards container is interested is so that the caller can send them to it. fn verify_and_send_votes( &self, votes_to_verify: Vec, - ) -> Result<(), BLSSigVerifyServiceError> { + root_bank: &Bank, + ) -> Result> { let verified_votes = self.verify_votes(votes_to_verify); + + let reward_votes = verified_votes + .iter() + .filter_map(|v| { + let vote = v.vote_message; + let rank_map = get_key_to_rank_map(root_bank, vote.vote.slot())?; + consensus_rewards::wants_vote( + &self.cluster_info, + &self.leader_schedule, + root_bank.slot(), + &vote, + ) + .then_some(AddVoteEntry { + max_validators: rank_map.len(), + vote, + }) + }) + .collect(); + let add_vote_msg = AddVoteMessage { + root_slot: root_bank.slot(), + votes: reward_votes, + }; + self.stats .total_valid_packets .fetch_add(verified_votes.len() as u64, Ordering::Relaxed); @@ -261,7 +313,7 @@ impl BLSSigVerifier { } } - // Send the BLS vote messaage to certificate pool + // Send the votes to the consensus pool match self .message_sender .try_send(ConsensusMessage::Vote(vote.vote_message)) @@ -295,7 +347,7 @@ impl BLSSigVerifier { } } - Ok(()) + Ok(add_vote_msg) } fn verify_votes(&self, votes_to_verify: Vec) -> Vec { diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index b66f6b27a4..246fe6336e 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -86,6 +86,7 @@ use { solana_vote::vote_transaction::VoteTransaction, solana_votor::{ consensus_metrics::{ConsensusMetricsEventReceiver, ConsensusMetricsEventSender}, + consensus_rewards::{AddVoteMessage, BuildRewardCertsRequest, BuildRewardCertsResponse}, event::{ CompletedBlock, LeaderWindowInfo, VotorEvent, VotorEventReceiver, VotorEventSender, }, @@ -299,6 +300,9 @@ pub struct ReplayStageConfig { pub consensus_metrics_sender: ConsensusMetricsEventSender, pub consensus_metrics_receiver: ConsensusMetricsEventReceiver, pub migration_status: Arc, + pub reward_certs_sender: Sender, + pub reward_votes_receiver: Receiver, + pub build_reward_certs_receiver: Receiver, } pub struct ReplaySenders { @@ -620,6 +624,9 @@ impl ReplayStage { consensus_metrics_sender, consensus_metrics_receiver, migration_status, + reward_certs_sender, + reward_votes_receiver, + build_reward_certs_receiver, } = config; let ReplaySenders { @@ -694,6 +701,9 @@ impl ReplayStage { consensus_metrics_sender, consensus_metrics_receiver, migration_status: migration_status.clone(), + reward_certs_sender, + reward_votes_receiver, + build_reward_certs_receiver, }; let votor = Votor::new(votor_config); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index e1acaff6d1..4c92c8942f 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -61,6 +61,7 @@ use { }, solana_turbine::{retransmit_stage::RetransmitStage, xdp::XdpSender}, solana_votor::{ + consensus_rewards::{BuildRewardCertsRequest, BuildRewardCertsResponse}, event::{LeaderWindowInfo, VotorEventReceiver, VotorEventSender}, vote_history::VoteHistory, vote_history_storage::VoteHistoryStorage, @@ -216,6 +217,8 @@ impl Tvu { key_notifiers: Arc>, alpenglow_last_voted: Arc, migration_status: Arc, + reward_certs_sender: Sender, + build_reward_certs_receiver: Receiver, ) -> Result { let (consensus_message_sender, consensus_message_receiver) = bounded(MAX_ALPENGLOW_PACKET_NUM); @@ -268,14 +271,18 @@ impl Tvu { alpenglow_quic_server_config, ) .unwrap(); + let (reward_votes_sender, reward_votes_receiver) = unbounded(); let alpenglow_sigverify_service = { let sharable_banks = bank_forks.read().unwrap().sharable_banks(); let verifier = BLSSigVerifier::new( sharable_banks, verified_vote_sender.clone(), + reward_votes_sender, consensus_message_sender.clone(), consensus_metrics_sender.clone(), alpenglow_last_voted.clone(), + cluster_info.clone(), + leader_schedule_cache.clone(), ); BLSSigverifyService::new(bls_packet_receiver, verifier) }; @@ -449,6 +456,9 @@ impl Tvu { consensus_metrics_sender: consensus_metrics_sender.clone(), consensus_metrics_receiver, migration_status, + reward_certs_sender, + reward_votes_receiver, + build_reward_certs_receiver, }; let voting_service = VotingService::new( diff --git a/core/src/validator.rs b/core/src/validator.rs index a56803bdb8..db617cbe57 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -1438,6 +1438,9 @@ impl Validator { let (optimistic_parent_sender, optimistic_parent_receiver) = unbounded(); + let (build_reward_certs_sender, build_reward_certs_receiver) = bounded(1); + let (reward_certs_sender, reward_certs_receiver) = bounded(1); + let block_creation_loop_config = BlockCreationLoopConfig { exit: exit.clone(), bank_forks: bank_forks.clone(), @@ -1453,6 +1456,8 @@ impl Validator { replay_highest_frozen: replay_highest_frozen.clone(), highest_parent_ready: highest_parent_ready.clone(), optimistic_parent_receiver: optimistic_parent_receiver.clone(), + build_reward_certs_sender, + reward_certs_receiver, }; let block_creation_loop = BlockCreationLoop::new(block_creation_loop_config); @@ -1710,6 +1715,8 @@ impl Validator { key_notifiers.clone(), alpenglow_last_voted.clone(), migration_status.clone(), + reward_certs_sender, + build_reward_certs_receiver, ) .map_err(ValidatorError::Other)?; diff --git a/entry/src/block_component.rs b/entry/src/block_component.rs index 87f6026a66..29e98b0e14 100644 --- a/entry/src/block_component.rs +++ b/entry/src/block_component.rs @@ -96,7 +96,10 @@ use { solana_bls_signatures::Signature as BLSSignature, solana_clock::Slot, solana_hash::Hash, - solana_votor_messages::consensus_message::{Certificate, CertificateType}, + solana_votor_messages::{ + consensus_message::{Certificate, CertificateType}, + rewards_certificate::{NotarRewardCertificate, SkipRewardCertificate}, + }, std::mem::MaybeUninit, wincode::{ containers::{Pod, Vec as WincodeVec}, @@ -211,6 +214,8 @@ pub struct BlockFooterV1 { pub block_producer_time_nanos: u64, #[wincode(with = "WincodeVec")] pub block_user_agent: Vec, + pub skip_reward_certificate: Option, + pub notar_reward_certificate: Option, } #[derive(Clone, PartialEq, Eq, Debug, SchemaWrite, SchemaRead)] @@ -299,6 +304,7 @@ pub enum VersionedUpdateParent { } /// TLV-encoded marker variants. +#[allow(clippy::large_enum_variant)] #[derive(Debug, Clone, PartialEq, Eq, SchemaWrite, SchemaRead)] #[wincode(tag_encoding = "u8")] pub enum BlockMarkerV1 { diff --git a/votor-messages/Cargo.toml b/votor-messages/Cargo.toml index a8e766b5b4..ee1d9f0c28 100644 --- a/votor-messages/Cargo.toml +++ b/votor-messages/Cargo.toml @@ -42,6 +42,7 @@ solana-logger = { workspace = true } solana-program = { workspace = true } solana-vote-interface = { workspace = true } spl-pod = { workspace = true } +wincode = { workspace = true } [dev-dependencies] solana-votor-messages = { path = ".", features = ["dev-context-only-utils"] } diff --git a/votor-messages/src/lib.rs b/votor-messages/src/lib.rs index 6dff38fdb2..46d90efb6a 100644 --- a/votor-messages/src/lib.rs +++ b/votor-messages/src/lib.rs @@ -4,6 +4,7 @@ pub mod consensus_message; pub mod migration; +pub mod rewards_certificate; pub mod vote; #[cfg_attr(feature = "frozen-abi", macro_use)] diff --git a/votor-messages/src/rewards_certificate.rs b/votor-messages/src/rewards_certificate.rs new file mode 100644 index 0000000000..2aa1d353db --- /dev/null +++ b/votor-messages/src/rewards_certificate.rs @@ -0,0 +1,37 @@ +//! Defines aggregates used for rewards. + +use { + solana_bls_signatures::Signature as BLSSignature, + solana_clock::Slot, + solana_hash::Hash, + wincode::{containers::Pod, SchemaRead, SchemaWrite}, +}; + +/// Reward certificate for the validators that voted skip. +/// +/// Unlike the skip certificate which can be base-2 or base-3 encoded, this is guaranteed to be base-2 encoded. +#[derive(Clone, PartialEq, Eq, Debug, SchemaWrite, SchemaRead)] +pub struct SkipRewardCertificate { + /// The slot the certificate is for. + pub slot: Slot, + /// The signature + #[wincode(with = "Pod")] + pub signature: BLSSignature, + /// The bitmap for validators, see solana-signer-store for encoding format + pub bitmap: Vec, +} + +/// Reward certificate for the validators that voted notar. +#[derive(Clone, PartialEq, Eq, Debug, SchemaWrite, SchemaRead)] +pub struct NotarRewardCertificate { + /// The slot the certificate is for. + pub slot: Slot, + /// The block id the certificate is for. + #[wincode(with = "Pod")] + pub block_id: Hash, + /// The signature + #[wincode(with = "Pod")] + pub signature: BLSSignature, + /// The bitmap for validators, see solana-signer-store for encoding format + pub bitmap: Vec, +} diff --git a/votor/src/consensus_rewards.rs b/votor/src/consensus_rewards.rs new file mode 100644 index 0000000000..e10769d8cb --- /dev/null +++ b/votor/src/consensus_rewards.rs @@ -0,0 +1,273 @@ +use { + crossbeam_channel::{select_biased, Receiver, Sender}, + entry::Entry, + solana_clock::Slot, + solana_gossip::cluster_info::ClusterInfo, + solana_ledger::leader_schedule_cache::LeaderScheduleCache, + solana_votor_messages::{ + consensus_message::VoteMessage, + rewards_certificate::{NotarRewardCertificate, SkipRewardCertificate}, + vote::Vote, + }, + std::{ + collections::BTreeMap, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::{self, Builder, JoinHandle}, + time::Duration, + }, +}; + +mod entry; + +/// Number of slots in the past that the the current leader is responsible for producing the reward certificates. +const NUM_SLOTS_FOR_REWARD: u64 = 8; + +/// Returns [`false`] if the rewards container is not interested in the [`VoteMessage`]. +/// Returns [`true`] if the rewards container might be interested in the [`VoteMessage`]. +pub fn wants_vote( + cluster_info: &ClusterInfo, + leader_schedule: &LeaderScheduleCache, + root_slot: Slot, + vote: &VoteMessage, +) -> bool { + match vote.vote { + Vote::Notarize(_) | Vote::Skip(_) => (), + Vote::Finalize(_) + | Vote::NotarizeFallback(_) + | Vote::SkipFallback(_) + | Vote::Genesis(_) => return false, + } + let vote_slot = vote.vote.slot(); + if vote_slot.saturating_add(NUM_SLOTS_FOR_REWARD) < root_slot { + return false; + } + let my_pubkey = cluster_info.id(); + let Some(leader) = + leader_schedule.slot_leader_at(vote_slot.saturating_add(NUM_SLOTS_FOR_REWARD), None) + else { + return false; + }; + if leader != my_pubkey { + return false; + } + true +} + +/// Container to storing state needed to generate reward certificates. +struct ConsensusRewards { + /// Per [`Slot`], stores skip and notar votes. + votes: BTreeMap, + /// Stores the latest pubkey for the current node. + cluster_info: Arc, + /// Stores the leader schedules. + leader_schedule: Arc, + /// Flag to indicate when the channel receiving loop should exit. + exit: Arc, + /// Channel to receive messages to build reward certificates. + build_reward_certs_receiver: Receiver, + /// Channel send the built reward certificates. + reward_certs_sender: Sender, + /// Channel to receive verified votes. + votes_receiver: Receiver, +} + +impl ConsensusRewards { + /// Constructs a new instance of [`ConsensusRewards`]. + fn new( + cluster_info: Arc, + leader_schedule: Arc, + exit: Arc, + build_reward_certs_receiver: Receiver, + reward_certs_sender: Sender, + votes_receiver: Receiver, + ) -> Self { + Self { + votes: BTreeMap::default(), + cluster_info, + leader_schedule, + exit, + build_reward_certs_receiver, + reward_certs_sender, + votes_receiver, + } + } + + /// Runs a loop receiving and handling messages over different channels. + fn run(&mut self) { + while !self.exit.load(Ordering::Relaxed) { + // bias messages to build certificates as that is on the critical path + select_biased! { + recv(self.build_reward_certs_receiver) -> msg => { + match msg { + Ok(msg) => { + let (skip, notar) = self.build_certs(msg.slot); + let resp = BuildRewardCertsResponse { + skip, + notar, + }; + if self.reward_certs_sender.send(resp).is_err() { + warn!("cert sender channel is disconnected; exiting."); + break; + } + } + Err(_) => { + warn!("build reward certs channel is disconnected; exiting."); + break; + } + } + } + recv(self.votes_receiver) -> msg => { + match msg { + Ok(msg) => { + for entry in msg.votes { + self.add_vote(msg.root_slot, entry.max_validators, &entry.vote); + } + } + Err(_) => { + warn!("votes receiver channel is disconnected; exiting."); + break; + } + } + } + default(Duration::from_secs(1)) => { + continue; + } + } + } + } + + /// Returns [`true`] if the rewards container is interested in this vote else [`false`]. + fn wants_vote(&self, root_slot: Slot, vote: &VoteMessage) -> bool { + if !wants_vote(&self.cluster_info, &self.leader_schedule, root_slot, vote) { + return false; + } + let Some(entry) = self.votes.get(&vote.vote.slot()) else { + return true; + }; + entry.wants_vote(vote) + } + + /// Adds received [`VoteMessage`]s from other validators. + fn add_vote(&mut self, root_slot: Slot, max_validators: usize, vote: &VoteMessage) { + // drop old state no longer needed + self.votes = self.votes.split_off( + &(root_slot + .saturating_add(NUM_SLOTS_FOR_REWARD) + .saturating_add(1)), + ); + + if !self.wants_vote(root_slot, vote) { + return; + } + match self + .votes + .entry(vote.vote.slot()) + .or_insert(Entry::new(max_validators)) + .add_vote(vote) + { + Ok(()) => (), + Err(e) => { + warn!("Adding vote {vote:?} failed with {e}"); + } + } + } + + /// Builds reward certificates. + fn build_certs( + &self, + slot: Slot, + ) -> ( + Option, + Option, + ) { + match self.votes.get(&slot) { + None => (None, None), + Some(entry) => { + let (skip, notar) = entry.build_certs(slot); + let skip = match skip { + Ok(s) => s, + Err(e) => { + warn!("Build skip reward cert failed with {e}"); + None + } + }; + let notar = match notar { + Ok(n) => n, + Err(e) => { + warn!("Build notar reward cert failed with {e}"); + None + } + }; + (skip, notar) + } + } + } +} + +/// Message to add votes to the rewards container. +pub struct AddVoteMessage { + /// The current root slot. + pub root_slot: Slot, + /// List of [`AddVoteEntry`], one per vote. + pub votes: Vec, +} + +/// Data structure for sending per vote state in the [`AddVoteMessage`]. +pub struct AddVoteEntry { + /// Maximum number of validators in the slot that this vote is for. + pub max_validators: usize, + /// The actual vote. + pub vote: VoteMessage, +} + +/// Request to build reward certificates. +pub struct BuildRewardCertsRequest { + pub slot: Slot, +} + +/// Response of building reward certificates. +pub struct BuildRewardCertsResponse { + /// Skip reward certificate. None if building failed or no skip votes were registered. + pub skip: Option, + /// Notar reward certificate. None if building failed or no notar votes were registered. + pub notar: Option, +} + +/// Service to run the consensus reward container in a dedicated thread. +pub struct ConsensusRewardsService { + handle: JoinHandle<()>, +} + +impl ConsensusRewardsService { + pub fn new( + cluster_info: Arc, + leader_schedule: Arc, + votes_receiver: Receiver, + build_reward_certs_receiver: Receiver, + reward_certs_sender: Sender, + exit: Arc, + ) -> Self { + let handle = Builder::new() + .name("solConsRew".to_string()) + .spawn(move || { + ConsensusRewards::new( + cluster_info, + leader_schedule, + exit, + build_reward_certs_receiver, + reward_certs_sender, + votes_receiver, + ) + .run(); + }) + .unwrap(); + Self { handle } + } + + pub fn join(self) -> thread::Result<()> { + self.handle.join() + } +} diff --git a/votor/src/consensus_rewards/entry.rs b/votor/src/consensus_rewards/entry.rs new file mode 100644 index 0000000000..9d22eb5a34 --- /dev/null +++ b/votor/src/consensus_rewards/entry.rs @@ -0,0 +1,189 @@ +use { + bitvec::{order::Lsb0, vec::BitVec}, + solana_bls_signatures::{BlsError, Signature as BLSSignature, SignatureProjective}, + solana_clock::Slot, + solana_hash::Hash, + solana_signer_store::{encode_base2, EncodeError}, + solana_votor_messages::{ + consensus_message::VoteMessage, + rewards_certificate::{NotarRewardCertificate, SkipRewardCertificate}, + vote::Vote, + }, + std::collections::HashMap, + thiserror::Error, +}; + +/// Different types of errors that can be returned from adding votes. +#[derive(Debug, Error)] +pub(super) enum AddVoteError { + #[error("rank on vote is invalid")] + InvalidRank, + #[error("duplicate vote")] + Duplicate, + #[error("BLS error: {0}")] + Bls(#[from] BlsError), +} + +/// Different types of errors that can be returned from building reward certs. +#[derive(Debug, Error)] +pub(super) enum BuildCertError { + #[error("Encoding failed: {0:?}")] + Encode(EncodeError), +} + +/// Struct to hold state for building a single reward cert. +struct PartialCert { + /// In progress signature aggregate. + signature: SignatureProjective, + /// bitvec of ranks whose signatures is included in the aggregate above. + bitvec: BitVec, + /// the largest rank in the aggregate above. + max_rank: u16, + /// number of signatures in the aggregate above. + cnt: usize, +} + +impl PartialCert { + /// Returns a new instance of [`PartialCert`]. + fn new(max_validator: usize) -> Self { + Self { + signature: SignatureProjective::identity(), + bitvec: BitVec::repeat(false, max_validator), + max_rank: 0, + cnt: 0, + } + } + + /// Returns true if the [`PartialCert`] needs the vote else false. + fn wants_vote(&self, vote: &VoteMessage) -> bool { + match self.bitvec.get(vote.rank as usize) { + None => false, + Some(ind) => !*ind, + } + } + + /// Adds the given [`VoteMessage`] to the aggregate. + fn add_vote(&mut self, vote: &VoteMessage) -> Result<(), AddVoteError> { + match self.bitvec.get_mut(vote.rank as usize) { + None => Err(AddVoteError::InvalidRank), + Some(mut ind) => { + if *ind { + return Err(AddVoteError::Duplicate); + } + self.signature + .aggregate_with(std::iter::once(&vote.signature))?; + *ind = true; + self.max_rank = std::cmp::max(self.max_rank, vote.rank); + self.cnt = self.cnt.saturating_add(1); + Ok(()) + } + } + } + + /// Builds a signature and associated bitmap from the collected votes. + /// + /// Returns Ok(None) if on votes were collected. + fn build_sig_bitmap(&self) -> Result)>, BuildCertError> { + if self.cnt == 0 { + return Ok(None); + } + let mut bitvec = self.bitvec.clone(); + bitvec.resize(self.max_rank as usize, false); + let bitmap = encode_base2(&bitvec).map_err(BuildCertError::Encode)?; + Ok(Some((self.signature.into(), bitmap))) + } +} + +/// Per slot container for storing notar and skip votes for creating rewards certificates. +pub(super) struct Entry { + skip: PartialCert, + /// Notar votes are indexed by block id as different validators may vote for different blocks. + notar: HashMap, + /// Maximum number of validators for the slot this entry is working on. + max_validators: usize, +} + +impl Entry { + /// Creates a new instance of [`Entry`]. + pub(super) fn new(max_validators: usize) -> Self { + Self { + skip: PartialCert::new(max_validators), + // under normal operations, all validators should vote for a single block id, still allocate space for a few more to hopefully avoid allocations. + notar: HashMap::with_capacity(5), + max_validators, + } + } + + /// Returns true if the [`Entry`] needs the vote else false. + pub(super) fn wants_vote(&self, vote: &VoteMessage) -> bool { + match vote.vote { + Vote::Skip(_) => self.skip.wants_vote(vote), + Vote::Notarize(notar) => match self.notar.get(¬ar.block_id) { + None => true, + Some(sub_entry) => sub_entry.wants_vote(vote), + }, + Vote::Finalize(_) + | Vote::NotarizeFallback(_) + | Vote::SkipFallback(_) + | Vote::Genesis(_) => false, + } + } + + /// Adds the given [`VoteMessage`] to the aggregate. + pub(super) fn add_vote(&mut self, vote: &VoteMessage) -> Result<(), AddVoteError> { + match vote.vote { + Vote::Notarize(notar) => { + let sub_entry = self + .notar + .entry(notar.block_id) + .or_insert(PartialCert::new(self.max_validators)); + sub_entry.add_vote(vote) + } + Vote::Skip(_) => self.skip.add_vote(vote), + _ => Ok(()), + } + } + + /// Builds skip and notar reward certificates from the collected votes. + pub(super) fn build_certs( + &self, + slot: Slot, + ) -> ( + Result, BuildCertError>, + Result, BuildCertError>, + ) { + let skip = self.skip.build_sig_bitmap().map(|r| { + r.map(|(signature, bitmap)| SkipRewardCertificate { + slot, + signature, + bitmap, + }) + }); + + // we can only submit one notar rewards certificate but different validators may vote for different blocks and we cannot combine notar votes for different blocks together in one cert. + // pick the block_id with most votes. + let mut notar = None; + for (block_id, sub_entry) in &self.notar { + match notar { + None => notar = Some((block_id, sub_entry)), + Some((_, max_sub_entry)) => { + if sub_entry.cnt > max_sub_entry.cnt { + notar = Some((block_id, sub_entry)); + } + } + } + } + let notar = match notar { + None => Ok(None), + Some((block_id, sub_entry)) => sub_entry.build_sig_bitmap().map(|r| { + r.map(|(signature, bitmap)| NotarRewardCertificate { + slot, + block_id: *block_id, + signature, + bitmap, + }) + }), + }; + (skip, notar) + } +} diff --git a/votor/src/lib.rs b/votor/src/lib.rs index 79cfc4c972..3a784de5a8 100644 --- a/votor/src/lib.rs +++ b/votor/src/lib.rs @@ -5,6 +5,7 @@ pub mod common; pub mod consensus_metrics; pub mod consensus_pool; mod consensus_pool_service; +pub mod consensus_rewards; pub mod event; mod event_handler; pub mod root_utils; diff --git a/votor/src/votor.rs b/votor/src/votor.rs index f0632bfbc0..c40e71eeb6 100644 --- a/votor/src/votor.rs +++ b/votor/src/votor.rs @@ -47,6 +47,10 @@ use { ConsensusMetrics, ConsensusMetricsEventReceiver, ConsensusMetricsEventSender, }, consensus_pool_service::{ConsensusPoolContext, ConsensusPoolService}, + consensus_rewards::{ + AddVoteMessage, BuildRewardCertsRequest, BuildRewardCertsResponse, + ConsensusRewardsService, + }, event::{LeaderWindowInfo, VotorEventReceiver, VotorEventSender}, event_handler::{EventHandler, EventHandlerContext}, root_utils::RootContext, @@ -111,11 +115,14 @@ pub struct VotorConfig { pub highest_parent_ready: Arc>, pub event_sender: VotorEventSender, pub own_vote_sender: Sender, + pub reward_certs_sender: Sender, // Receivers pub event_receiver: VotorEventReceiver, pub consensus_message_receiver: Receiver, pub consensus_metrics_receiver: ConsensusMetricsEventReceiver, + pub reward_votes_receiver: Receiver, + pub build_reward_certs_receiver: Receiver, } /// Context shared with block creation, replay, gossip, banking stage etc @@ -133,6 +140,7 @@ pub struct Votor { event_handler: EventHandler, consensus_pool_service: ConsensusPoolService, timer_manager: Arc>, + consensus_rewards_service: ConsensusRewardsService, } impl Votor { @@ -164,6 +172,9 @@ impl Votor { consensus_message_receiver: bls_receiver, consensus_metrics_sender, consensus_metrics_receiver, + reward_certs_sender, + build_reward_certs_receiver, + reward_votes_receiver, } = config; let identity_keypair = cluster_info.keypair().clone(); @@ -229,7 +240,7 @@ impl Votor { my_vote_pubkey: vote_account, blockstore, sharable_banks, - leader_schedule_cache, + leader_schedule_cache: leader_schedule_cache.clone(), consensus_message_receiver: bls_receiver, bls_sender, event_sender, @@ -240,15 +251,26 @@ impl Votor { let event_handler = EventHandler::new(event_handler_context); let consensus_pool_service = ConsensusPoolService::new(consensus_pool_context); + let consensus_rewards_service = ConsensusRewardsService::new( + cluster_info, + leader_schedule_cache, + reward_votes_receiver, + build_reward_certs_receiver, + reward_certs_sender, + exit, + ); + Self { event_handler, consensus_pool_service, timer_manager, + consensus_rewards_service, } } pub fn join(self) -> thread::Result<()> { self.consensus_pool_service.join()?; + self.consensus_rewards_service.join()?; // Loop till we manage to unwrap the Arc and then we can join. let mut timer_manager = self.timer_manager;