Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.

Commit 1c2ae47

Browse files
authored
Fix forwarding of transactions over QUIC (#25674)
* Spawn QUIC server to receive forwarded txs * Update validator port range * forward votes using UDP * no forwarding from unstaked nodes * forwarding stats in banking stage * fix test builds * fix lifetime of forward sender
1 parent ed68c0b commit 1c2ae47

File tree

10 files changed

+225
-53
lines changed

10 files changed

+225
-53
lines changed

client/src/udp_client.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub struct UdpTpuConnection {
1919
}
2020

2121
impl UdpTpuConnection {
22-
pub fn new(tpu_addr: SocketAddr, _connection_stats: Arc<ConnectionCacheStats>) -> Self {
22+
pub fn new_from_addr(tpu_addr: SocketAddr) -> Self {
2323
let (_, client_socket) = solana_net_utils::bind_in_range(
2424
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
2525
VALIDATOR_PORT_RANGE,
@@ -31,6 +31,10 @@ impl UdpTpuConnection {
3131
addr: tpu_addr,
3232
}
3333
}
34+
35+
pub fn new(tpu_addr: SocketAddr, _connection_stats: Arc<ConnectionCacheStats>) -> Self {
36+
Self::new_from_addr(tpu_addr)
37+
}
3438
}
3539

3640
impl TpuConnection for UdpTpuConnection {

client/tests/quic_client.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ mod tests {
77
tpu_connection::TpuConnection,
88
},
99
solana_sdk::{packet::PACKET_DATA_SIZE, quic::QUIC_PORT_OFFSET, signature::Keypair},
10-
solana_streamer::quic::spawn_server,
10+
solana_streamer::quic::{spawn_server, StreamStats},
1111
std::{
1212
collections::HashMap,
1313
net::{SocketAddr, UdpSocket},
@@ -28,6 +28,7 @@ mod tests {
2828
let keypair = Keypair::new();
2929
let ip = "127.0.0.1".parse().unwrap();
3030
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
31+
let stats = Arc::new(StreamStats::default());
3132
let t = spawn_server(
3233
s.try_clone().unwrap(),
3334
&keypair,
@@ -38,6 +39,7 @@ mod tests {
3839
staked_nodes,
3940
10,
4041
10,
42+
stats,
4143
)
4244
.unwrap();
4345

core/src/banking_stage.rs

Lines changed: 69 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ use {
1717
histogram::Histogram,
1818
itertools::Itertools,
1919
min_max_heap::MinMaxHeap,
20-
solana_client::{connection_cache::get_connection, tpu_connection::TpuConnection},
20+
solana_client::{
21+
connection_cache::get_connection, tpu_connection::TpuConnection,
22+
udp_client::UdpTpuConnection,
23+
},
2124
solana_entry::entry::hash_transactions,
2225
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
2326
solana_ledger::blockstore_processor::TransactionStatusSender,
@@ -147,6 +150,8 @@ pub struct BankingStageStats {
147150
rebuffered_packets_count: AtomicUsize,
148151
consumed_buffered_packets_count: AtomicUsize,
149152
end_of_slot_filtered_invalid_count: AtomicUsize,
153+
forwarded_transaction_count: AtomicUsize,
154+
forwarded_vote_count: AtomicUsize,
150155
batch_packet_indexes_len: Histogram,
151156

152157
// Timing
@@ -201,6 +206,8 @@ impl BankingStageStats {
201206
.unprocessed_packet_conversion_elapsed
202207
.load(Ordering::Relaxed)
203208
+ self.transaction_processing_elapsed.load(Ordering::Relaxed)
209+
+ self.forwarded_transaction_count.load(Ordering::Relaxed) as u64
210+
+ self.forwarded_vote_count.load(Ordering::Relaxed) as u64
204211
+ self.batch_packet_indexes_len.entries()
205212
}
206213

@@ -264,6 +271,16 @@ impl BankingStageStats {
264271
.swap(0, Ordering::Relaxed) as i64,
265272
i64
266273
),
274+
(
275+
"forwarded_transaction_count",
276+
self.forwarded_transaction_count.swap(0, Ordering::Relaxed) as i64,
277+
i64
278+
),
279+
(
280+
"forwarded_vote_count",
281+
self.forwarded_vote_count.swap(0, Ordering::Relaxed) as i64,
282+
i64
283+
),
267284
(
268285
"consume_buffered_packets_elapsed",
269286
self.consume_buffered_packets_elapsed
@@ -489,10 +506,26 @@ impl BankingStage {
489506
/// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns
490507
/// the number of successfully forwarded packets in second part of tuple
491508
fn forward_buffered_packets(
492-
tpu_forwards: &std::net::SocketAddr,
509+
forward_option: &ForwardOption,
510+
cluster_info: &ClusterInfo,
511+
poh_recorder: &Arc<Mutex<PohRecorder>>,
493512
packets: Vec<&Packet>,
494513
data_budget: &DataBudget,
514+
banking_stage_stats: &BankingStageStats,
495515
) -> (std::result::Result<(), TransportError>, usize) {
516+
let addr = match forward_option {
517+
ForwardOption::NotForward => return (Ok(()), 0),
518+
ForwardOption::ForwardTransaction => {
519+
next_leader_tpu_forwards(cluster_info, poh_recorder)
520+
}
521+
522+
ForwardOption::ForwardTpuVote => next_leader_tpu_vote(cluster_info, poh_recorder),
523+
};
524+
let addr = match addr {
525+
Some(addr) => addr,
526+
None => return (Ok(()), 0),
527+
};
528+
496529
const INTERVAL_MS: u64 = 100;
497530
const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200;
498531
const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000;
@@ -525,7 +558,20 @@ impl BankingStage {
525558

526559
let mut measure = Measure::start("banking_stage-forward-us");
527560

528-
let conn = get_connection(tpu_forwards);
561+
let conn = if let ForwardOption::ForwardTpuVote = forward_option {
562+
// The vote must be forwarded using only UDP. Let's get the UDP connection.
563+
banking_stage_stats
564+
.forwarded_vote_count
565+
.fetch_add(packet_vec_len, Ordering::Relaxed);
566+
Arc::new(UdpTpuConnection::new_from_addr(addr).into())
567+
} else {
568+
// All other transactions can be forwarded using QUIC, get_connection() will use
569+
// system wide setting to pick the correct connection object.
570+
banking_stage_stats
571+
.forwarded_transaction_count
572+
.fetch_add(packet_vec_len, Ordering::Relaxed);
573+
get_connection(&addr)
574+
};
529575
let res = conn.send_wire_transaction_batch_async(packet_vec);
530576

531577
measure.stop();
@@ -908,6 +954,7 @@ impl BankingStage {
908954
false,
909955
data_budget,
910956
slot_metrics_tracker,
957+
banking_stage_stats,
911958
)
912959
},
913960
(),
@@ -926,6 +973,7 @@ impl BankingStage {
926973
true,
927974
data_budget,
928975
slot_metrics_tracker,
976+
banking_stage_stats,
929977
)
930978
},
931979
(),
@@ -945,29 +993,26 @@ impl BankingStage {
945993
hold: bool,
946994
data_budget: &DataBudget,
947995
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
996+
banking_stage_stats: &BankingStageStats,
948997
) {
949-
let addr = match forward_option {
950-
ForwardOption::NotForward => {
951-
if !hold {
952-
buffered_packet_batches.clear();
953-
}
954-
return;
955-
}
956-
ForwardOption::ForwardTransaction => {
957-
next_leader_tpu_forwards(cluster_info, poh_recorder)
998+
if let ForwardOption::NotForward = forward_option {
999+
if !hold {
1000+
buffered_packet_batches.clear();
9581001
}
959-
ForwardOption::ForwardTpuVote => next_leader_tpu_vote(cluster_info, poh_recorder),
960-
};
961-
let addr = match addr {
962-
Some(addr) => addr,
963-
None => return,
964-
};
1002+
return;
1003+
}
9651004

9661005
let forwardable_packets =
9671006
Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter());
9681007
let forwardable_packets_len = forwardable_packets.len();
969-
let (_forward_result, sucessful_forwarded_packets_count) =
970-
Self::forward_buffered_packets(&addr, forwardable_packets, data_budget);
1008+
let (_forward_result, sucessful_forwarded_packets_count) = Self::forward_buffered_packets(
1009+
forward_option,
1010+
cluster_info,
1011+
poh_recorder,
1012+
forwardable_packets,
1013+
data_budget,
1014+
banking_stage_stats,
1015+
);
9711016
let failed_forwarded_packets_count =
9721017
forwardable_packets_len.saturating_sub(sucessful_forwarded_packets_count);
9731018

@@ -4072,6 +4117,7 @@ mod tests {
40724117
vec![deserialized_packet.clone()].into_iter(),
40734118
1,
40744119
);
4120+
let stats = BankingStageStats::default();
40754121
BankingStage::handle_forwarding(
40764122
&ForwardOption::ForwardTransaction,
40774123
&cluster_info,
@@ -4080,6 +4126,7 @@ mod tests {
40804126
true,
40814127
&data_budget,
40824128
&mut LeaderSlotMetricsTracker::new(0),
4129+
&stats,
40834130
);
40844131

40854132
recv_socket
@@ -4179,6 +4226,7 @@ mod tests {
41794226
];
41804227

41814228
for (name, forward_option, hold, expected_ids, expected_num_unprocessed) in test_cases {
4229+
let stats = BankingStageStats::default();
41824230
BankingStage::handle_forwarding(
41834231
&forward_option,
41844232
&cluster_info,
@@ -4187,6 +4235,7 @@ mod tests {
41874235
hold,
41884236
&DataBudget::default(),
41894237
&mut LeaderSlotMetricsTracker::new(0),
4238+
&stats,
41904239
);
41914240

41924241
recv_socket

core/src/fetch_stage.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ impl FetchStage {
4343
) -> (Self, PacketBatchReceiver, PacketBatchReceiver) {
4444
let (sender, receiver) = unbounded();
4545
let (vote_sender, vote_receiver) = unbounded();
46+
let (forward_sender, forward_receiver) = unbounded();
4647
(
4748
Self::new_with_sender(
4849
sockets,
@@ -51,6 +52,8 @@ impl FetchStage {
5152
exit,
5253
&sender,
5354
&vote_sender,
55+
&forward_sender,
56+
forward_receiver,
5457
poh_recorder,
5558
coalesce_ms,
5659
None,
@@ -60,13 +63,16 @@ impl FetchStage {
6063
)
6164
}
6265

66+
#[allow(clippy::too_many_arguments)]
6367
pub fn new_with_sender(
6468
sockets: Vec<UdpSocket>,
6569
tpu_forwards_sockets: Vec<UdpSocket>,
6670
tpu_vote_sockets: Vec<UdpSocket>,
6771
exit: &Arc<AtomicBool>,
6872
sender: &PacketBatchSender,
6973
vote_sender: &PacketBatchSender,
74+
forward_sender: &PacketBatchSender,
75+
forward_receiver: PacketBatchReceiver,
7076
poh_recorder: &Arc<Mutex<PohRecorder>>,
7177
coalesce_ms: u64,
7278
in_vote_only_mode: Option<Arc<AtomicBool>>,
@@ -81,6 +87,8 @@ impl FetchStage {
8187
exit,
8288
sender,
8389
vote_sender,
90+
forward_sender,
91+
forward_receiver,
8492
poh_recorder,
8593
coalesce_ms,
8694
in_vote_only_mode,
@@ -129,13 +137,16 @@ impl FetchStage {
129137
Ok(())
130138
}
131139

140+
#[allow(clippy::too_many_arguments)]
132141
fn new_multi_socket(
133142
tpu_sockets: Vec<Arc<UdpSocket>>,
134143
tpu_forwards_sockets: Vec<Arc<UdpSocket>>,
135144
tpu_vote_sockets: Vec<Arc<UdpSocket>>,
136145
exit: &Arc<AtomicBool>,
137146
sender: &PacketBatchSender,
138147
vote_sender: &PacketBatchSender,
148+
forward_sender: &PacketBatchSender,
149+
forward_receiver: PacketBatchReceiver,
139150
poh_recorder: &Arc<Mutex<PohRecorder>>,
140151
coalesce_ms: u64,
141152
in_vote_only_mode: Option<Arc<AtomicBool>>,
@@ -160,7 +171,6 @@ impl FetchStage {
160171
.collect();
161172

162173
let tpu_forward_stats = Arc::new(StreamerReceiveStats::new("tpu_forwards_receiver"));
163-
let (forward_sender, forward_receiver) = unbounded();
164174
let tpu_forwards_threads: Vec<_> = tpu_forwards_sockets
165175
.into_iter()
166176
.map(|socket| {

0 commit comments

Comments
 (0)