Skip to content
Draft
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 40 additions & 4 deletions core/src/block_creation_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use {
banking_trace::BankingTracer,
replay_stage::{Finalizer, ReplayStage},
},
crossbeam_channel::Receiver,
crossbeam_channel::{Receiver, Sender},
solana_clock::Slot,
solana_entry::block_component::{
BlockFooterV1, BlockMarkerV1, GenesisCertificate, VersionedBlockMarker,
Expand All @@ -34,7 +34,12 @@ use {
block_component_processor::BlockComponentProcessor,
},
solana_version::version,
solana_votor::{common::block_timeout, event::LeaderWindowInfo},
solana_votor::{
common::block_timeout,
consensus_rewards::{BuildRewardCertsRequest, BuildRewardCertsResponse},
event::LeaderWindowInfo,
},
solana_votor_messages::rewards_certificate::{NotarRewardCertificate, SkipRewardCertificate},
stats::{BlockCreationLoopMetrics, SlotMetrics},
std::{
sync::{
Expand Down Expand Up @@ -93,6 +98,9 @@ pub struct BlockCreationLoopConfig {
// Channel to receive RecordReceiver from PohService
pub record_receiver_receiver: Receiver<RecordReceiver>,
pub optimistic_parent_receiver: Receiver<LeaderWindowInfo>,

pub build_reward_certs_sender: Sender<BuildRewardCertsRequest>,
pub reward_certs_receiver: Receiver<BuildRewardCertsResponse>,
}

struct LeaderContext {
Expand All @@ -110,6 +118,8 @@ struct LeaderContext {
slot_status_notifier: Option<SlotStatusNotifier>,
banking_tracer: Arc<BankingTracer>,
replay_highest_frozen: Arc<ReplayHighestFrozen>,
build_reward_certs_sender: Sender<BuildRewardCertsRequest>,
reward_certs_receiver: Receiver<BuildRewardCertsResponse>,

// Metrics
metrics: BlockCreationLoopMetrics,
Expand Down Expand Up @@ -164,6 +174,8 @@ fn start_loop(config: BlockCreationLoopConfig) {
replay_highest_frozen,
highest_parent_ready,
optimistic_parent_receiver,
build_reward_certs_sender,
reward_certs_receiver,
} = config;

// Similar to Votor, if this loop dies kill the validator
Expand Down Expand Up @@ -210,6 +222,8 @@ fn start_loop(config: BlockCreationLoopConfig) {
replay_highest_frozen,
metrics: BlockCreationLoopMetrics::default(),
slot_metrics: SlotMetrics::default(),
build_reward_certs_sender,
reward_certs_receiver,
genesis_cert,
};

Expand Down Expand Up @@ -331,6 +345,8 @@ fn produce_window(
if let Err(e) = record_and_complete_block(
ctx.poh_recorder.as_ref(),
&mut ctx.record_receiver,
&ctx.build_reward_certs_sender,
&ctx.reward_certs_receiver,
skip_timer,
timeout,
) {
Expand Down Expand Up @@ -384,7 +400,11 @@ fn skew_block_producer_time_nanos(

/// Produces a block footer with the current timestamp and version information.
/// The bank_hash field is left as default and will be filled in after the bank freezes.
fn produce_block_footer(bank: Arc<Bank>) -> BlockFooterV1 {
fn produce_block_footer(
bank: Arc<Bank>,
skip_reward_certificate: Option<SkipRewardCertificate>,
notar_reward_certificate: Option<NotarRewardCertificate>,
) -> BlockFooterV1 {
let mut block_producer_time_nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Misconfigured system clock; couldn't measure block producer time.")
Expand All @@ -411,6 +431,8 @@ fn produce_block_footer(bank: Arc<Bank>) -> BlockFooterV1 {
bank_hash: Hash::default(),
block_producer_time_nanos: block_producer_time_nanos as u64,
block_user_agent: format!("agave/{}", version!()).into_bytes(),
skip_reward_certificate,
notar_reward_certificate,
}
}

Expand All @@ -424,9 +446,17 @@ fn produce_block_footer(bank: Arc<Bank>) -> BlockFooterV1 {
fn record_and_complete_block(
poh_recorder: &RwLock<PohRecorder>,
record_receiver: &mut RecordReceiver,
build_reward_certs_sender: &Sender<BuildRewardCertsRequest>,
cert_receiver: &Receiver<BuildRewardCertsResponse>,
block_timer: Instant,
block_timeout: Duration,
) -> Result<(), PohRecorderError> {
// XXX: how to look up the slot.
let slot = u64::MAX;
// XXX: handle error below.
build_reward_certs_sender
.send(BuildRewardCertsRequest { slot })
.unwrap();
loop {
let remaining_slot_time = block_timeout.saturating_sub(block_timer.elapsed());
if remaining_slot_time.is_zero() {
Expand Down Expand Up @@ -477,7 +507,13 @@ fn record_and_complete_block(

// Produce the footer with the current timestamp
let working_bank = w_poh_recorder.working_bank().unwrap();
let footer = produce_block_footer(working_bank.bank.clone_without_scheduler());
/// XXX: handle error below
let resp = cert_receiver.recv().unwrap();
let footer = produce_block_footer(
working_bank.bank.clone_without_scheduler(),
resp.skip,
resp.notar,
);

BlockComponentProcessor::update_bank_with_footer(
working_bank.bank.clone_without_scheduler(),
Expand Down
70 changes: 61 additions & 9 deletions core/src/bls_sigverify/bls_sigverifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@ use {
signature::SignatureProjective,
},
solana_clock::Slot,
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::leader_schedule_cache::LeaderScheduleCache,
solana_measure::measure::Measure,
solana_pubkey::Pubkey,
solana_rpc::alpenglow_last_voted::AlpenglowLastVoted,
solana_runtime::{bank::Bank, bank_forks::SharableBanks, epoch_stakes::BLSPubkeyToRankMap},
solana_streamer::packet::PacketBatch,
solana_votor::consensus_metrics::{ConsensusMetricsEvent, ConsensusMetricsEventSender},
solana_votor::{
consensus_metrics::{ConsensusMetricsEvent, ConsensusMetricsEventSender},
consensus_rewards::{self, AddVoteEntry, AddVoteMessage},
},
solana_votor_messages::{
consensus_message::{Certificate, CertificateType, ConsensusMessage, VoteMessage},
vote::Vote,
Expand Down Expand Up @@ -60,6 +65,7 @@ enum CertVerifyError {

pub struct BLSSigVerifier {
verified_votes_sender: VerifiedVoteSender,
reward_votes_sender: Sender<AddVoteMessage>,
message_sender: Sender<ConsensusMessage>,
sharable_banks: SharableBanks,
stats: BLSSigVerifierStats,
Expand All @@ -68,6 +74,8 @@ pub struct BLSSigVerifier {
consensus_metrics_sender: ConsensusMetricsEventSender,
last_checked_root_slot: Slot,
alpenglow_last_voted: Arc<AlpenglowLastVoted>,
cluster_info: Arc<ClusterInfo>,
leader_schedule: Arc<LeaderScheduleCache>,
}

impl BLSSigVerifier {
Expand Down Expand Up @@ -148,8 +156,16 @@ impl BLSSigVerifier {
id: *solana_pubkey,
vote: vote_message.vote,
});
// Only need votes newer than root slot
if vote_message.vote.slot() <= root_bank.slot() {

// consensus pool does not need votes for slots older than root slot however the rewards container may still need them.
if vote_message.vote.slot() <= root_bank.slot()
&& !consensus_rewards::wants_vote(
&self.cluster_info,
&self.leader_schedule,
root_bank.slot(),
&vote_message,
)
{
self.stats.received_old.fetch_add(1, Ordering::Relaxed);
packet.meta_mut().set_discard(true);
continue;
Expand Down Expand Up @@ -191,12 +207,12 @@ impl BLSSigVerifier {
.fetch_add(preprocess_time.as_us(), Ordering::Relaxed);

let (votes_result, certs_result) = rayon::join(
|| self.verify_and_send_votes(votes_to_verify),
|| self.verify_and_send_votes(votes_to_verify, &root_bank),
|| self.verify_and_send_certificates(certs_to_verify, &root_bank),
);

votes_result?;
certs_result?;
let add_vote_msg = votes_result?;
let () = certs_result?;

// Send to RPC service for last voted tracking
self.alpenglow_last_voted
Expand All @@ -211,6 +227,10 @@ impl BLSSigVerifier {
warn!("could not send consensus metrics, receive side of channel is closed");
}

if self.reward_votes_sender.send(add_vote_msg).is_err() {
warn!("could not send votes to reward container, receive side of channel is closed");
}

self.stats.maybe_report_stats();

Ok(())
Expand All @@ -221,28 +241,60 @@ impl BLSSigVerifier {
pub fn new(
sharable_banks: SharableBanks,
verified_votes_sender: VerifiedVoteSender,
reward_votes_sender: Sender<AddVoteMessage>,
message_sender: Sender<ConsensusMessage>,
consensus_metrics_sender: ConsensusMetricsEventSender,
alpenglow_last_voted: Arc<AlpenglowLastVoted>,
cluster_info: Arc<ClusterInfo>,
leader_schedule: Arc<LeaderScheduleCache>,
) -> Self {
Self {
sharable_banks,
verified_votes_sender,
reward_votes_sender,
message_sender,
stats: BLSSigVerifierStats::new(),
verified_certs: RwLock::new(HashSet::new()),
vote_payload_cache: RwLock::new(HashMap::new()),
consensus_metrics_sender,
last_checked_root_slot: 0,
alpenglow_last_voted,
cluster_info,
leader_schedule,
}
}

/// Verifies votes and sends verified votes to the consensus pool.
/// Also returns a copy of the verified votes that the rewards container is interested is so that the caller can send them to it.
fn verify_and_send_votes(
&self,
votes_to_verify: Vec<VoteToVerify>,
) -> Result<(), BLSSigVerifyServiceError<ConsensusMessage>> {
root_bank: &Bank,
) -> Result<AddVoteMessage, BLSSigVerifyServiceError<ConsensusMessage>> {
let verified_votes = self.verify_votes(votes_to_verify);

let reward_votes = verified_votes
.iter()
.filter_map(|v| {
let vote = v.vote_message;
let rank_map = get_key_to_rank_map(root_bank, vote.vote.slot())?;
consensus_rewards::wants_vote(
&self.cluster_info,
&self.leader_schedule,
root_bank.slot(),
&vote,
)
.then_some(AddVoteEntry {
max_validators: rank_map.len(),
vote,
})
})
.collect();
let add_vote_msg = AddVoteMessage {
root_slot: root_bank.slot(),
votes: reward_votes,
};

self.stats
.total_valid_packets
.fetch_add(verified_votes.len() as u64, Ordering::Relaxed);
Expand All @@ -261,7 +313,7 @@ impl BLSSigVerifier {
}
}

// Send the BLS vote messaage to certificate pool
// Send the votes to the consensus pool
match self
.message_sender
.try_send(ConsensusMessage::Vote(vote.vote_message))
Expand Down Expand Up @@ -295,7 +347,7 @@ impl BLSSigVerifier {
}
}

Ok(())
Ok(add_vote_msg)
}

fn verify_votes(&self, votes_to_verify: Vec<VoteToVerify>) -> Vec<VoteToVerify> {
Expand Down
10 changes: 10 additions & 0 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ use {
solana_vote::vote_transaction::VoteTransaction,
solana_votor::{
consensus_metrics::{ConsensusMetricsEventReceiver, ConsensusMetricsEventSender},
consensus_rewards::{AddVoteMessage, BuildRewardCertsRequest, BuildRewardCertsResponse},
event::{
CompletedBlock, LeaderWindowInfo, VotorEvent, VotorEventReceiver, VotorEventSender,
},
Expand Down Expand Up @@ -299,6 +300,9 @@ pub struct ReplayStageConfig {
pub consensus_metrics_sender: ConsensusMetricsEventSender,
pub consensus_metrics_receiver: ConsensusMetricsEventReceiver,
pub migration_status: Arc<MigrationStatus>,
pub reward_certs_sender: Sender<BuildRewardCertsResponse>,
pub reward_votes_receiver: Receiver<AddVoteMessage>,
pub build_reward_certs_receiver: Receiver<BuildRewardCertsRequest>,
}

pub struct ReplaySenders {
Expand Down Expand Up @@ -620,6 +624,9 @@ impl ReplayStage {
consensus_metrics_sender,
consensus_metrics_receiver,
migration_status,
reward_certs_sender,
reward_votes_receiver,
build_reward_certs_receiver,
} = config;

let ReplaySenders {
Expand Down Expand Up @@ -694,6 +701,9 @@ impl ReplayStage {
consensus_metrics_sender,
consensus_metrics_receiver,
migration_status: migration_status.clone(),
reward_certs_sender,
reward_votes_receiver,
build_reward_certs_receiver,
};
let votor = Votor::new(votor_config);

Expand Down
10 changes: 10 additions & 0 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ use {
},
solana_turbine::{retransmit_stage::RetransmitStage, xdp::XdpSender},
solana_votor::{
consensus_rewards::{BuildRewardCertsRequest, BuildRewardCertsResponse},
event::{LeaderWindowInfo, VotorEventReceiver, VotorEventSender},
vote_history::VoteHistory,
vote_history_storage::VoteHistoryStorage,
Expand Down Expand Up @@ -216,6 +217,8 @@ impl Tvu {
key_notifiers: Arc<RwLock<KeyUpdaters>>,
alpenglow_last_voted: Arc<AlpenglowLastVoted>,
migration_status: Arc<MigrationStatus>,
reward_certs_sender: Sender<BuildRewardCertsResponse>,
build_reward_certs_receiver: Receiver<BuildRewardCertsRequest>,
) -> Result<Self, String> {
let (consensus_message_sender, consensus_message_receiver) =
bounded(MAX_ALPENGLOW_PACKET_NUM);
Expand Down Expand Up @@ -268,14 +271,18 @@ impl Tvu {
alpenglow_quic_server_config,
)
.unwrap();
let (reward_votes_sender, reward_votes_receiver) = unbounded();
let alpenglow_sigverify_service = {
let sharable_banks = bank_forks.read().unwrap().sharable_banks();
let verifier = BLSSigVerifier::new(
sharable_banks,
verified_vote_sender.clone(),
reward_votes_sender,
consensus_message_sender.clone(),
consensus_metrics_sender.clone(),
alpenglow_last_voted.clone(),
cluster_info.clone(),
leader_schedule_cache.clone(),
);
BLSSigverifyService::new(bls_packet_receiver, verifier)
};
Expand Down Expand Up @@ -449,6 +456,9 @@ impl Tvu {
consensus_metrics_sender: consensus_metrics_sender.clone(),
consensus_metrics_receiver,
migration_status,
reward_certs_sender,
reward_votes_receiver,
build_reward_certs_receiver,
};

let voting_service = VotingService::new(
Expand Down
Loading