Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.

Commit 80316b0

Browse files
committed
Make tpu own banking tracer thread
1 parent 11b0004 commit 80316b0

File tree

4 files changed

+17
-24
lines changed

4 files changed

+17
-24
lines changed

banking-bench/src/main.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,6 @@ fn main() {
449449
None,
450450
Arc::new(connection_cache),
451451
bank_forks.clone(),
452-
banking_tracer,
453452
);
454453
poh_recorder.write().unwrap().set_bank(&bank, false);
455454

core/benches/banking_stage.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,6 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
289289
None,
290290
Arc::new(ConnectionCache::default()),
291291
bank_forks,
292-
banking_tracer,
293292
);
294293
poh_recorder.write().unwrap().set_bank(&bank, false);
295294

core/src/banking_stage.rs

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
55
use {
66
crate::{
7-
banking_trace::{BankingPacketReceiver, BankingTracer, TracerThread},
7+
banking_trace::BankingPacketReceiver,
88
forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts,
99
immutable_deserialized_packet::ImmutableDeserializedPacket,
1010
latest_unprocessed_votes::{LatestUnprocessedVotes, VoteSource},
@@ -346,7 +346,6 @@ pub struct BatchedTransactionErrorDetails {
346346
/// Stores the stage's thread handle and output receiver.
347347
pub struct BankingStage {
348348
bank_thread_hdls: Vec<JoinHandle<()>>,
349-
tracer_thread_hdl: TracerThread,
350349
}
351350

352351
#[derive(Debug, Clone)]
@@ -387,7 +386,6 @@ impl BankingStage {
387386
log_messages_bytes_limit: Option<usize>,
388387
connection_cache: Arc<ConnectionCache>,
389388
bank_forks: Arc<RwLock<BankForks>>,
390-
banking_tracer: Arc<BankingTracer>,
391389
) -> Self {
392390
Self::new_num_threads(
393391
cluster_info,
@@ -401,7 +399,6 @@ impl BankingStage {
401399
log_messages_bytes_limit,
402400
connection_cache,
403401
bank_forks,
404-
banking_tracer,
405402
)
406403
}
407404

@@ -418,7 +415,6 @@ impl BankingStage {
418415
log_messages_bytes_limit: Option<usize>,
419416
connection_cache: Arc<ConnectionCache>,
420417
bank_forks: Arc<RwLock<BankForks>>,
421-
banking_tracer: Arc<BankingTracer>,
422418
) -> Self {
423419
assert!(num_threads >= MIN_TOTAL_THREADS);
424420
// Single thread to generate entries from many banks.
@@ -510,10 +506,7 @@ impl BankingStage {
510506
})
511507
.collect();
512508

513-
Self {
514-
bank_thread_hdls,
515-
tracer_thread_hdl: banking_tracer.take_tracer_thread_join_handle(),
516-
}
509+
Self { bank_thread_hdls }
517510
}
518511

519512
/// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns
@@ -1908,11 +1901,6 @@ impl BankingStage {
19081901
for bank_thread_hdl in self.bank_thread_hdls {
19091902
bank_thread_hdl.join()?;
19101903
}
1911-
if let Some(tracer_thread_hdl) = self.tracer_thread_hdl {
1912-
if let Err(tracer_result) = tracer_thread_hdl.join()? {
1913-
error!("tracer thread error: {:?}", tracer_result);
1914-
}
1915-
}
19161904
Ok(())
19171905
}
19181906
}
@@ -1963,7 +1951,10 @@ where
19631951
mod tests {
19641952
use {
19651953
super::*,
1966-
crate::{banking_trace::BankingPacketBatch, unprocessed_packet_batches},
1954+
crate::{
1955+
banking_trace::{BankingPacketBatch, BankingTracer},
1956+
unprocessed_packet_batches,
1957+
},
19671958
crossbeam_channel::{unbounded, Receiver},
19681959
solana_address_lookup_table_program::state::{AddressLookupTable, LookupTableMeta},
19691960
solana_entry::entry::{next_entry, next_versioned_entry, Entry, EntrySlice},
@@ -2051,7 +2042,6 @@ mod tests {
20512042
None,
20522043
Arc::new(ConnectionCache::default()),
20532044
bank_forks,
2054-
banking_tracer,
20552045
);
20562046
drop(non_vote_sender);
20572047
drop(tpu_vote_sender);
@@ -2107,7 +2097,6 @@ mod tests {
21072097
None,
21082098
Arc::new(ConnectionCache::default()),
21092099
bank_forks,
2110-
banking_tracer,
21112100
);
21122101
trace!("sending bank");
21132102
drop(non_vote_sender);
@@ -2188,7 +2177,6 @@ mod tests {
21882177
None,
21892178
Arc::new(ConnectionCache::default()),
21902179
bank_forks,
2191-
banking_tracer,
21922180
);
21932181

21942182
// fund another account so we can send 2 good transactions in a single batch.
@@ -2350,7 +2338,6 @@ mod tests {
23502338
None,
23512339
Arc::new(ConnectionCache::default()),
23522340
bank_forks,
2353-
banking_tracer,
23542341
);
23552342

23562343
// wait for banking_stage to eat the packets
@@ -4127,7 +4114,6 @@ mod tests {
41274114
None,
41284115
Arc::new(ConnectionCache::default()),
41294116
bank_forks,
4130-
banking_tracer,
41314117
);
41324118

41334119
let keypairs = (0..100).map(|_| Keypair::new()).collect_vec();

core/src/tpu.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
use {
55
crate::{
66
banking_stage::BankingStage,
7-
banking_trace::BankingTracer,
7+
banking_trace::{BankingTracer, TracerThread},
88
broadcast_stage::{BroadcastStage, BroadcastStageType, RetransmitSlotsReceiver},
99
cluster_info_vote_listener::{
1010
ClusterInfoVoteListener, GossipDuplicateConfirmedSlotsSender,
@@ -69,6 +69,7 @@ pub struct Tpu {
6969
find_packet_sender_stake_stage: FindPacketSenderStakeStage,
7070
vote_find_packet_sender_stake_stage: FindPacketSenderStakeStage,
7171
staked_nodes_updater_service: StakedNodesUpdaterService,
72+
tracer_thread_hdl: TracerThread,
7273
}
7374

7475
impl Tpu {
@@ -236,7 +237,6 @@ impl Tpu {
236237
log_messages_bytes_limit,
237238
connection_cache.clone(),
238239
bank_forks.clone(),
239-
banking_tracer,
240240
);
241241

242242
let broadcast_stage = broadcast_type.new_broadcast_stage(
@@ -262,6 +262,7 @@ impl Tpu {
262262
find_packet_sender_stake_stage,
263263
vote_find_packet_sender_stake_stage,
264264
staked_nodes_updater_service,
265+
tracer_thread_hdl: banking_tracer.take_tracer_thread_join_handle(),
265266
}
266267
}
267268

@@ -283,6 +284,14 @@ impl Tpu {
283284
result?;
284285
}
285286
let _ = broadcast_result?;
287+
if let Some(tracer_thread_hdl) = self.tracer_thread_hdl {
288+
if let Err(tracer_result) = tracer_thread_hdl.join()? {
289+
error!(
290+
"banking tracer thread returned error after successful thread join: {:?}",
291+
tracer_result
292+
);
293+
}
294+
}
286295
Ok(())
287296
}
288297
}

0 commit comments

Comments
 (0)