Skip to content

Commit f10419e

Browse files
committed
wip
1 parent c3d4975 commit f10419e

File tree

13 files changed

+382
-46
lines changed

13 files changed

+382
-46
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ lru = { workspace = true }
7070
min-max-heap = { workspace = true }
7171
num_cpus = { workspace = true }
7272
num_enum = { workspace = true }
73+
parking_lot = { workspace = true }
7374
prio-graph = { workspace = true }
7475
qualifier_attr = { workspace = true }
7576
quinn = { workspace = true }

core/src/block_creation_loop.rs

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use {
99
banking_trace::BankingTracer,
1010
replay_stage::{Finalizer, ReplayStage},
1111
},
12+
parking_lot::RwLock as PLRwLock,
1213
solana_clock::Slot,
1314
solana_entry::block_component::{
1415
BlockFooterV1, BlockMarkerV1, VersionedBlockFooter, VersionedBlockMarker,
@@ -31,8 +32,14 @@ use {
3132
bank_forks::BankForks,
3233
},
3334
solana_version::version,
34-
solana_votor::{common::block_timeout, event::LeaderWindowInfo, votor::LeaderWindowNotifier},
35-
solana_votor_messages::migration::MigrationStatus,
35+
solana_votor::{
36+
common::block_timeout, consensus_rewards::ConsensusRewards, event::LeaderWindowInfo,
37+
votor::LeaderWindowNotifier,
38+
},
39+
solana_votor_messages::{
40+
migration::MigrationStatus,
41+
rewards_certificate::{NotarRewardCertificate, SkipRewardCertificate},
42+
},
3643
stats::{BlockCreationLoopMetrics, SlotMetrics},
3744
std::{
3845
sync::{
@@ -79,6 +86,7 @@ pub struct BlockCreationLoopConfig {
7986
pub poh_recorder: Arc<RwLock<PohRecorder>>,
8087
pub leader_schedule_cache: Arc<LeaderScheduleCache>,
8188
pub rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
89+
pub consensus_rewards: Arc<PLRwLock<ConsensusRewards>>,
8290

8391
// Notifiers
8492
pub banking_tracer: Arc<BankingTracer>,
@@ -104,6 +112,7 @@ struct LeaderContext {
104112
slot_status_notifier: Option<SlotStatusNotifier>,
105113
banking_tracer: Arc<BankingTracer>,
106114
replay_highest_frozen: Arc<ReplayHighestFrozen>,
115+
consensus_rewards: Arc<PLRwLock<ConsensusRewards>>,
107116

108117
// Metrics
109118
metrics: BlockCreationLoopMetrics,
@@ -134,9 +143,15 @@ enum StartLeaderError {
134143
),
135144
}
136145

137-
fn produce_block_footer(block_producer_time_nanos: u64) -> VersionedBlockMarker {
146+
fn produce_block_footer(
147+
block_producer_time_nanos: u64,
148+
skip_reward_certificate: Option<SkipRewardCertificate>,
149+
notar_reward_certificate: Option<NotarRewardCertificate>,
150+
) -> VersionedBlockMarker {
138151
let footer = BlockFooterV1 {
139152
block_producer_time_nanos,
153+
skip_reward_certificate,
154+
notar_reward_certificate,
140155
block_user_agent: format!("agave/{}", version!()).into_bytes(),
141156
};
142157

@@ -166,6 +181,7 @@ fn start_loop(config: BlockCreationLoopConfig) {
166181
leader_window_notifier,
167182
replay_highest_frozen,
168183
migration_status,
184+
consensus_rewards,
169185
} = config;
170186

171187
// Similar to Votor, if this loop dies kill the validator
@@ -189,6 +205,7 @@ fn start_loop(config: BlockCreationLoopConfig) {
189205
replay_highest_frozen,
190206
metrics: BlockCreationLoopMetrics::default(),
191207
slot_metrics: SlotMetrics::default(),
208+
consensus_rewards,
192209
};
193210

194211
info!("{my_pubkey}: Block creation loop initialized");
@@ -301,6 +318,7 @@ fn produce_window(
301318
if let Err(e) = record_and_complete_block(
302319
ctx.poh_recorder.as_ref(),
303320
&mut ctx.record_receiver,
321+
ctx.consensus_rewards.clone(),
304322
skip_timer,
305323
timeout,
306324
) {
@@ -342,9 +360,17 @@ fn produce_window(
342360
fn record_and_complete_block(
343361
poh_recorder: &RwLock<PohRecorder>,
344362
record_receiver: &mut RecordReceiver,
363+
consensus_rewards: Arc<PLRwLock<ConsensusRewards>>,
345364
block_timer: Instant,
346365
block_timeout: Duration,
347366
) -> Result<(), PohRecorderError> {
367+
// Taking a read lock on consensus_rewards can contend with the write lock in bls_sigverifier.
368+
// We are ready to produce the block footer now, while we gather other bits of data, we can block on the consensus_rewards lock in a separate thread to minimise contention.
369+
let handle = std::thread::spawn(move || {
370+
// XXX: how to look up the slot.
371+
let slot = u64::MAX;
372+
consensus_rewards.read().build_rewards_certs(slot)
373+
});
348374
loop {
349375
let remaining_slot_time = block_timeout.saturating_sub(block_timer.elapsed());
350376
if remaining_slot_time.is_zero() {
@@ -378,7 +404,8 @@ fn record_and_complete_block(
378404
// Construct and send the block footer
379405
let mut w_poh_recorder = poh_recorder.write().unwrap();
380406
let block_producer_time_nanos = w_poh_recorder.working_bank_block_producer_time_nanos();
381-
let footer = produce_block_footer(block_producer_time_nanos);
407+
let (skip, notar) = handle.join().unwrap();
408+
let footer = produce_block_footer(block_producer_time_nanos, skip, notar);
382409
w_poh_recorder.send_marker(footer)?;
383410

384411
// Alpentick and clear bank

core/src/bls_sigverify/bls_sigverifier.rs

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use {
99
},
1010
bitvec::prelude::{BitVec, Lsb0},
1111
crossbeam_channel::{Sender, TrySendError},
12+
parking_lot::RwLock as PLRwLock,
1213
rayon::iter::{
1314
IndexedParallelIterator, IntoParallelIterator, IntoParallelRefIterator, ParallelIterator,
1415
},
@@ -23,7 +24,10 @@ use {
2324
solana_runtime::{bank::Bank, bank_forks::SharableBanks, epoch_stakes::BLSPubkeyToRankMap},
2425
solana_signer_store::{decode, DecodeError},
2526
solana_streamer::packet::PacketBatch,
26-
solana_votor::consensus_metrics::{ConsensusMetricsEvent, ConsensusMetricsEventSender},
27+
solana_votor::{
28+
consensus_metrics::{ConsensusMetricsEvent, ConsensusMetricsEventSender},
29+
consensus_rewards::ConsensusRewards,
30+
},
2731
solana_votor_messages::{
2832
consensus_message::{Certificate, CertificateType, ConsensusMessage, VoteMessage},
2933
vote::Vote,
@@ -92,6 +96,7 @@ pub struct BLSSigVerifier {
9296
consensus_metrics_sender: ConsensusMetricsEventSender,
9397
last_checked_root_slot: Slot,
9498
alpenglow_last_voted: Arc<AlpenglowLastVoted>,
99+
consensus_rewards: Arc<PLRwLock<ConsensusRewards>>,
95100
}
96101

97102
impl BLSSigVerifier {
@@ -172,8 +177,17 @@ impl BLSSigVerifier {
172177
id: *solana_pubkey,
173178
vote: vote_message.vote,
174179
});
175-
// Only need votes newer than root slot
180+
181+
// consensus pool does not need votes for slots other than root slot however the rewards container may still need them.
176182
if vote_message.vote.slot() <= root_bank.slot() {
183+
// the only module that takes a write lock on consensus_rewards is this one and it does not take the write lock while it is verifying votes so this should never block.
184+
if self
185+
.consensus_rewards
186+
.read()
187+
.wants_vote(root_bank.slot(), &vote_message)
188+
{
189+
// XXX: actually verify and send the votes. The verification and sending should happen off the critical path.
190+
}
177191
self.stats.received_old.fetch_add(1, Ordering::Relaxed);
178192
packet.meta_mut().set_discard(true);
179193
continue;
@@ -219,8 +233,8 @@ impl BLSSigVerifier {
219233
|| self.verify_and_send_certificates(certs_to_verify, &root_bank),
220234
);
221235

222-
votes_result?;
223-
certs_result?;
236+
let rewards_votes = votes_result?;
237+
let () = certs_result?;
224238

225239
// Send to RPC service for last voted tracking
226240
self.alpenglow_last_voted
@@ -235,6 +249,17 @@ impl BLSSigVerifier {
235249
warn!("could not send consensus metrics, receive side of channel is closed");
236250
}
237251

252+
{
253+
// This should be the only place that is taking a write lock on consensus_rewards.
254+
// This lock should not contend with any other operations in this module.
255+
// It can contend with the read lock in the block creation loop though as such we should hold it for as little time as possible.
256+
let mut guard = self.consensus_rewards.write();
257+
let root_slot = root_bank.slot();
258+
for v in rewards_votes {
259+
guard.add_vote_message(root_slot, v);
260+
}
261+
}
262+
238263
self.stats.maybe_report_stats();
239264

240265
Ok(())
@@ -248,6 +273,7 @@ impl BLSSigVerifier {
248273
message_sender: Sender<ConsensusMessage>,
249274
consensus_metrics_sender: ConsensusMetricsEventSender,
250275
alpenglow_last_voted: Arc<AlpenglowLastVoted>,
276+
consensus_rewards: Arc<PLRwLock<ConsensusRewards>>,
251277
) -> Self {
252278
Self {
253279
sharable_banks,
@@ -259,14 +285,32 @@ impl BLSSigVerifier {
259285
consensus_metrics_sender,
260286
last_checked_root_slot: 0,
261287
alpenglow_last_voted,
288+
consensus_rewards,
262289
}
263290
}
264291

292+
/// Verifies votes and sends verified votes to the consensus pool.
293+
/// Also returns a copy of the verified votes that the rewards container is interested is so that the caller can send them to it.
265294
fn verify_and_send_votes(
266295
&self,
267296
votes_to_verify: Vec<VoteToVerify>,
268-
) -> Result<(), BLSSigVerifyServiceError<ConsensusMessage>> {
297+
) -> Result<Vec<VoteMessage>, BLSSigVerifyServiceError<ConsensusMessage>> {
269298
let verified_votes = self.verify_votes(votes_to_verify);
299+
300+
let rewards_votes = {
301+
// the only module that takes a write lock on consensus_rewards is this one and it does not take the write lock while it is verifying votes so this should never block.
302+
let guard = self.consensus_rewards.read();
303+
let root_slot = self.sharable_banks.root().slot();
304+
verified_votes
305+
.iter()
306+
.filter_map(|vote| {
307+
guard
308+
.wants_vote(root_slot, &vote.vote_message)
309+
.then_some(vote.vote_message.clone())
310+
})
311+
.collect()
312+
};
313+
270314
self.stats
271315
.total_valid_packets
272316
.fetch_add(verified_votes.len() as u64, Ordering::Relaxed);
@@ -285,7 +329,7 @@ impl BLSSigVerifier {
285329
}
286330
}
287331

288-
// Send the BLS vote messaage to certificate pool
332+
// Send the votes to the consensus pool
289333
match self
290334
.message_sender
291335
.try_send(ConsensusMessage::Vote(vote.vote_message))
@@ -319,7 +363,7 @@ impl BLSSigVerifier {
319363
}
320364
}
321365

322-
Ok(())
366+
Ok(rewards_votes)
323367
}
324368

325369
fn verify_votes(&self, votes_to_verify: Vec<VoteToVerify>) -> Vec<VoteToVerify> {

core/src/tvu.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use {
2727
},
2828
bytes::Bytes,
2929
crossbeam_channel::{bounded, unbounded, Receiver, Sender},
30+
parking_lot::RwLock as PLRwLock,
3031
solana_client::connection_cache::ConnectionCache,
3132
solana_clock::Slot,
3233
solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierArc,
@@ -60,6 +61,7 @@ use {
6061
},
6162
solana_turbine::{retransmit_stage::RetransmitStage, xdp::XdpSender},
6263
solana_votor::{
64+
consensus_rewards::ConsensusRewards,
6365
event::{VotorEventReceiver, VotorEventSender},
6466
vote_history::VoteHistory,
6567
vote_history_storage::VoteHistoryStorage,
@@ -214,6 +216,7 @@ impl Tvu {
214216
key_notifiers: Arc<RwLock<KeyUpdaters>>,
215217
alpenglow_last_voted: Arc<AlpenglowLastVoted>,
216218
migration_status: Arc<MigrationStatus>,
219+
consensus_rewards: Arc<PLRwLock<ConsensusRewards>>,
217220
) -> Result<Self, String> {
218221
let (consensus_message_sender, consensus_message_receiver) =
219222
bounded(MAX_ALPENGLOW_PACKET_NUM);
@@ -274,6 +277,7 @@ impl Tvu {
274277
consensus_message_sender.clone(),
275278
consensus_metrics_sender.clone(),
276279
alpenglow_last_voted.clone(),
280+
consensus_rewards,
277281
);
278282
BLSSigverifyService::new(bls_packet_receiver, verifier)
279283
};

core/src/validator.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use {
3232
},
3333
anyhow::{anyhow, Context, Result},
3434
crossbeam_channel::{bounded, unbounded, Receiver},
35+
parking_lot::RwLock as PLRwLock,
3536
quinn::Endpoint,
3637
solana_accounts_db::{
3738
accounts_db::{AccountsDbConfig, ACCOUNTS_DB_CONFIG_FOR_TESTING},
@@ -142,6 +143,7 @@ use {
142143
solana_validator_exit::Exit,
143144
solana_vote_program::vote_state,
144145
solana_votor::{
146+
consensus_rewards::ConsensusRewards,
145147
vote_history::{VoteHistory, VoteHistoryError},
146148
vote_history_storage::{NullVoteHistoryStorage, VoteHistoryStorage},
147149
voting_service::VotingServiceOverride,
@@ -1425,6 +1427,11 @@ impl Validator {
14251427
migration_status.clone(),
14261428
);
14271429

1430+
let consensus_rewards = Arc::new(PLRwLock::new(ConsensusRewards::new(
1431+
cluster_info.clone(),
1432+
leader_schedule_cache.clone(),
1433+
)));
1434+
14281435
let block_creation_loop_config = BlockCreationLoopConfig {
14291436
exit: exit.clone(),
14301437
migration_status: migration_status.clone(),
@@ -1439,6 +1446,7 @@ impl Validator {
14391446
record_receiver: record_receiver.clone(),
14401447
leader_window_notifier: leader_window_notifier.clone(),
14411448
replay_highest_frozen: replay_highest_frozen.clone(),
1449+
consensus_rewards: consensus_rewards.clone(),
14421450
};
14431451
let block_creation_loop = BlockCreationLoop::new(block_creation_loop_config);
14441452

@@ -1693,6 +1701,7 @@ impl Validator {
16931701
key_notifiers.clone(),
16941702
alpenglow_last_voted.clone(),
16951703
migration_status.clone(),
1704+
consensus_rewards,
16961705
)
16971706
.map_err(ValidatorError::Other)?;
16981707

entry/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ solana-runtime-transaction = { workspace = true }
3939
solana-sha256-hasher = { workspace = true }
4040
solana-transaction = { workspace = true }
4141
solana-transaction-error = { workspace = true }
42+
solana-votor-messages = { workspace = true }
4243
thiserror = { workspace = true }
4344

4445
[dev-dependencies]

0 commit comments

Comments
 (0)