Skip to content

Commit cfe8f88

Browse files
committed
wip
1 parent c3d4975 commit cfe8f88

File tree

10 files changed

+231
-41
lines changed

10 files changed

+231
-41
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/block_creation_loop.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,11 @@ use {
3131
bank_forks::BankForks,
3232
},
3333
solana_version::version,
34-
solana_votor::{common::block_timeout, event::LeaderWindowInfo, votor::LeaderWindowNotifier},
35-
solana_votor_messages::migration::MigrationStatus,
34+
solana_votor::{
35+
common::block_timeout, consensus_rewards::ConsensusRewards, event::LeaderWindowInfo,
36+
votor::LeaderWindowNotifier,
37+
},
38+
solana_votor_messages::{migration::MigrationStatus, rewards_certificate::RewardsCertificates},
3639
stats::{BlockCreationLoopMetrics, SlotMetrics},
3740
std::{
3841
sync::{
@@ -104,6 +107,7 @@ struct LeaderContext {
104107
slot_status_notifier: Option<SlotStatusNotifier>,
105108
banking_tracer: Arc<BankingTracer>,
106109
replay_highest_frozen: Arc<ReplayHighestFrozen>,
110+
consensus_rewards: ConsensusRewards,
107111

108112
// Metrics
109113
metrics: BlockCreationLoopMetrics,
@@ -134,9 +138,14 @@ enum StartLeaderError {
134138
),
135139
}
136140

137-
fn produce_block_footer(block_producer_time_nanos: u64) -> VersionedBlockMarker {
141+
fn produce_block_footer(
142+
block_producer_time_nanos: u64,
143+
rewards_certs: RewardsCertificates,
144+
) -> VersionedBlockMarker {
138145
let footer = BlockFooterV1 {
139146
block_producer_time_nanos,
147+
skip_rewards_certificate: rewards_certs.skip,
148+
notar_rewards_certificate: rewards_certs.notar,
140149
block_user_agent: format!("agave/{}", version!()).into_bytes(),
141150
};
142151

@@ -181,14 +190,15 @@ fn start_loop(config: BlockCreationLoopConfig) {
181190
blockstore,
182191
poh_recorder: poh_recorder.clone(),
183192
record_receiver,
184-
leader_schedule_cache,
193+
leader_schedule_cache: leader_schedule_cache.clone(),
185194
bank_forks,
186195
rpc_subscriptions,
187196
slot_status_notifier,
188197
banking_tracer,
189198
replay_highest_frozen,
190199
metrics: BlockCreationLoopMetrics::default(),
191200
slot_metrics: SlotMetrics::default(),
201+
consensus_rewards: ConsensusRewards::new(cluster_info.clone(), leader_schedule_cache),
192202
};
193203

194204
info!("{my_pubkey}: Block creation loop initialized");
@@ -301,6 +311,7 @@ fn produce_window(
301311
if let Err(e) = record_and_complete_block(
302312
ctx.poh_recorder.as_ref(),
303313
&mut ctx.record_receiver,
314+
&mut ctx.consensus_rewards,
304315
skip_timer,
305316
timeout,
306317
) {
@@ -342,6 +353,7 @@ fn produce_window(
342353
fn record_and_complete_block(
343354
poh_recorder: &RwLock<PohRecorder>,
344355
record_receiver: &mut RecordReceiver,
356+
consensus_rewards: &mut ConsensusRewards,
345357
block_timer: Instant,
346358
block_timeout: Duration,
347359
) -> Result<(), PohRecorderError> {
@@ -378,7 +390,9 @@ fn record_and_complete_block(
378390
// Construct and send the block footer
379391
let mut w_poh_recorder = poh_recorder.write().unwrap();
380392
let block_producer_time_nanos = w_poh_recorder.working_bank_block_producer_time_nanos();
381-
let footer = produce_block_footer(block_producer_time_nanos);
393+
// XXX: how to look up the slot.
394+
let rewards_certs = consensus_rewards.build_rewards_certs(u64::MAX);
395+
let footer = produce_block_footer(block_producer_time_nanos, rewards_certs);
382396
w_poh_recorder.send_marker(footer)?;
383397

384398
// Alpentick and clear bank

core/src/bls_sigverify/bls_sigverifier.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@ use {
2323
solana_runtime::{bank::Bank, bank_forks::SharableBanks, epoch_stakes::BLSPubkeyToRankMap},
2424
solana_signer_store::{decode, DecodeError},
2525
solana_streamer::packet::PacketBatch,
26-
solana_votor::consensus_metrics::{ConsensusMetricsEvent, ConsensusMetricsEventSender},
26+
solana_votor::{
27+
consensus_metrics::{ConsensusMetricsEvent, ConsensusMetricsEventSender},
28+
consensus_rewards::ConsensusRewards,
29+
},
2730
solana_votor_messages::{
2831
consensus_message::{Certificate, CertificateType, ConsensusMessage, VoteMessage},
2932
vote::Vote,
@@ -92,6 +95,7 @@ pub struct BLSSigVerifier {
9295
consensus_metrics_sender: ConsensusMetricsEventSender,
9396
last_checked_root_slot: Slot,
9497
alpenglow_last_voted: Arc<AlpenglowLastVoted>,
98+
consensus_rewards: ConsensusRewards,
9599
}
96100

97101
impl BLSSigVerifier {
@@ -172,6 +176,19 @@ impl BLSSigVerifier {
172176
id: *solana_pubkey,
173177
vote: vote_message.vote,
174178
});
179+
180+
// XXX: this has to be more complicated.
181+
// Some votes that the rewards container wants will get verified normally below so we can have check if the rewards container wants it there and then send it a copy.
182+
// For other votes, we check if the rewards container wants it, then we should verify them in some low priority context and then send them to the container.
183+
if self
184+
.consensus_rewards
185+
.wants_vote(root_bank.slot(), &vote_message)
186+
{
187+
// XXX: actually verify the vote.
188+
self.consensus_rewards
189+
.add_vote_message(root_bank.slot(), vote_message.clone());
190+
}
191+
175192
// Only need votes newer than root slot
176193
if vote_message.vote.slot() <= root_bank.slot() {
177194
self.stats.received_old.fetch_add(1, Ordering::Relaxed);
@@ -248,6 +265,7 @@ impl BLSSigVerifier {
248265
message_sender: Sender<ConsensusMessage>,
249266
consensus_metrics_sender: ConsensusMetricsEventSender,
250267
alpenglow_last_voted: Arc<AlpenglowLastVoted>,
268+
consensus_rewards: ConsensusRewards,
251269
) -> Self {
252270
Self {
253271
sharable_banks,
@@ -259,6 +277,7 @@ impl BLSSigVerifier {
259277
consensus_metrics_sender,
260278
last_checked_root_slot: 0,
261279
alpenglow_last_voted,
280+
consensus_rewards,
262281
}
263282
}
264283

core/src/tvu.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ use {
6060
},
6161
solana_turbine::{retransmit_stage::RetransmitStage, xdp::XdpSender},
6262
solana_votor::{
63+
consensus_rewards::ConsensusRewards,
6364
event::{VotorEventReceiver, VotorEventSender},
6465
vote_history::VoteHistory,
6566
vote_history_storage::VoteHistoryStorage,
@@ -268,12 +269,15 @@ impl Tvu {
268269
.unwrap();
269270
let alpenglow_sigverify_service = {
270271
let sharable_banks = bank_forks.read().unwrap().sharable_banks();
272+
let consensus_rewards =
273+
ConsensusRewards::new(cluster_info.clone(), leader_schedule_cache.clone());
271274
let verifier = BLSSigVerifier::new(
272275
sharable_banks,
273276
verified_vote_sender.clone(),
274277
consensus_message_sender.clone(),
275278
consensus_metrics_sender.clone(),
276279
alpenglow_last_voted.clone(),
280+
consensus_rewards,
277281
);
278282
BLSSigverifyService::new(bls_packet_receiver, verifier)
279283
};

entry/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ solana-runtime-transaction = { workspace = true }
3939
solana-sha256-hasher = { workspace = true }
4040
solana-transaction = { workspace = true }
4141
solana-transaction-error = { workspace = true }
42+
solana-votor-messages = { workspace = true }
4243
thiserror = { workspace = true }
4344

4445
[dev-dependencies]

entry/src/block_component.rs

Lines changed: 40 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ use {
8888
},
8989
solana_clock::Slot,
9090
solana_hash::Hash,
91+
solana_votor_messages::rewards_certificate::RewardsCertificate,
9192
std::{error::Error, fmt},
9293
};
9394

@@ -269,6 +270,8 @@ pub enum VersionedBlockFooter {
269270
#[derive(Clone, PartialEq, Eq, Debug)]
270271
pub struct BlockFooterV1 {
271272
pub block_producer_time_nanos: u64,
273+
pub skip_rewards_certificate: RewardsCertificate,
274+
pub notar_rewards_certificate: RewardsCertificate,
272275
pub block_user_agent: Vec<u8>,
273276
}
274277

@@ -924,47 +927,49 @@ impl BlockFooterV1 {
924927

925928
/// Serializes to bytes with user agent length capping.
926929
fn to_bytes(&self) -> Result<Vec<u8>, BlockComponentError> {
927-
let mut buffer =
928-
Vec::with_capacity(8 + 1 + self.block_user_agent.len().min(Self::MAX_USER_AGENT_LEN));
930+
unimplemented!()
931+
// let mut buffer =
932+
// Vec::with_capacity(8 + 1 + self.block_user_agent.len().min(Self::MAX_USER_AGENT_LEN));
929933

930-
// Serialize timestamp
931-
buffer.extend_from_slice(&self.block_producer_time_nanos.to_le_bytes());
934+
// // Serialize timestamp
935+
// buffer.extend_from_slice(&self.block_producer_time_nanos.to_le_bytes());
932936

933-
// Serialize user agent with length capping
934-
let capped_len = self.block_user_agent.len().min(Self::MAX_USER_AGENT_LEN);
935-
buffer.push(capped_len as u8);
936-
buffer.extend_from_slice(&self.block_user_agent[..capped_len]);
937+
// // Serialize user agent with length capping
938+
// let capped_len = self.block_user_agent.len().min(Self::MAX_USER_AGENT_LEN);
939+
// buffer.push(capped_len as u8);
940+
// buffer.extend_from_slice(&self.block_user_agent[..capped_len]);
937941

938-
Ok(buffer)
942+
// Ok(buffer)
939943
}
940944

941945
/// Deserializes from bytes with validation.
942-
fn from_bytes(data: &[u8]) -> Result<Self, BlockComponentError> {
943-
if data.len() < Self::HEADER_SIZE {
944-
return Err(BlockComponentError::InsufficientData);
945-
}
946-
947-
// Read timestamp
948-
// Unwrap: HEADER_SIZE = TIMESTAMP_SIZE + USER_AGENT_LEN_SIZE > TIMESTAMP_SIZE, so this will
949-
// never fail.
950-
let time_bytes = data[..Self::TIMESTAMP_SIZE].try_into().unwrap();
951-
let block_producer_time_nanos = u64::from_le_bytes(time_bytes);
952-
953-
// Read user agent length
954-
let user_agent_len = data[Self::TIMESTAMP_SIZE] as usize;
955-
956-
// Validate remaining data size
957-
if data.len() < Self::HEADER_SIZE + user_agent_len {
958-
return Err(BlockComponentError::InsufficientData);
959-
}
960-
961-
// Read user agent bytes
962-
let block_user_agent = data[Self::HEADER_SIZE..Self::HEADER_SIZE + user_agent_len].to_vec();
963-
964-
Ok(Self {
965-
block_producer_time_nanos,
966-
block_user_agent,
967-
})
946+
fn from_bytes(_data: &[u8]) -> Result<Self, BlockComponentError> {
947+
unimplemented!()
948+
// if data.len() < Self::HEADER_SIZE {
949+
// return Err(BlockComponentError::InsufficientData);
950+
// }
951+
952+
// // Read timestamp
953+
// // Unwrap: HEADER_SIZE = TIMESTAMP_SIZE + USER_AGENT_LEN_SIZE > TIMESTAMP_SIZE, so this will
954+
// // never fail.
955+
// let time_bytes = data[..Self::TIMESTAMP_SIZE].try_into().unwrap();
956+
// let block_producer_time_nanos = u64::from_le_bytes(time_bytes);
957+
958+
// // Read user agent length
959+
// let user_agent_len = data[Self::TIMESTAMP_SIZE] as usize;
960+
961+
// // Validate remaining data size
962+
// if data.len() < Self::HEADER_SIZE + user_agent_len {
963+
// return Err(BlockComponentError::InsufficientData);
964+
// }
965+
966+
// // Read user agent bytes
967+
// let block_user_agent = data[Self::HEADER_SIZE..Self::HEADER_SIZE + user_agent_len].to_vec();
968+
969+
// Ok(Self {
970+
// block_producer_time_nanos,
971+
// block_user_agent,
972+
// })
968973
}
969974

970975
/// Returns the serialized size in bytes without actually serializing.

votor-messages/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
pub mod consensus_message;
66
pub mod migration;
7+
pub mod rewards_certificate;
78
pub mod vote;
89

910
#[cfg_attr(feature = "frozen-abi", macro_use)]
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
//! Defines aggregates used for rewards.
2+
3+
use {solana_bls_signatures::Signature as BLSSignature, solana_clock::Slot};
4+
5+
/// The reward certificates that will be included in the block footer.
6+
pub struct RewardsCertificates {
7+
/// The notar rewards certificate.
8+
pub notar: RewardsCertificate,
9+
/// The skip rewards certificate.
10+
pub skip: RewardsCertificate,
11+
}
12+
13+
/// Different types of rewards aggregates.
14+
#[derive(Clone, PartialEq, Eq, Debug)]
15+
pub enum RewardsCertificateType {
16+
/// validator voted skip.
17+
Skip(Slot),
18+
/// validator voted notar.
19+
Notar(Slot),
20+
}
21+
22+
/// An aggregate used to pay rewards.
23+
#[derive(Clone, PartialEq, Eq, Debug)]
24+
pub struct RewardsCertificate {
25+
/// The type of the aggregate.
26+
pub cert_type: RewardsCertificateType,
27+
/// The signature
28+
pub signature: BLSSignature,
29+
/// The bitmap for validators, see solana-signer-store for encoding format
30+
pub bitmap: Vec<u8>,
31+
}

0 commit comments

Comments
 (0)