Skip to content

Commit 2b03910

Browse files
authored
transaction performance tracking -- streamer stage (#257)
* transaction performance tracking -- streamer stage
1 parent bc81153 commit 2b03910

File tree

9 files changed

+317
-3
lines changed

9 files changed

+317
-3
lines changed

Cargo.lock

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ members = [
107107
"tokens",
108108
"tpu-client",
109109
"transaction-dos",
110+
"transaction-metrics-tracker",
110111
"transaction-status",
111112
"turbine",
112113
"udp-client",
@@ -380,6 +381,7 @@ solana-test-validator = { path = "test-validator", version = "=2.0.0" }
380381
solana-thin-client = { path = "thin-client", version = "=2.0.0" }
381382
solana-tpu-client = { path = "tpu-client", version = "=2.0.0", default-features = false }
382383
solana-transaction-status = { path = "transaction-status", version = "=2.0.0" }
384+
solana-transaction-metrics-tracker = { path = "transaction-metrics-tracker", version = "=2.0.0" }
383385
solana-turbine = { path = "turbine", version = "=2.0.0" }
384386
solana-udp-client = { path = "udp-client", version = "=2.0.0" }
385387
solana-version = { path = "version", version = "=2.0.0" }

programs/sbf/Cargo.lock

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdk/src/packet.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ bitflags! {
3333
/// the packet is built.
3434
/// This field can be removed when the above feature gate is adopted by mainnet-beta.
3535
const ROUND_COMPUTE_UNIT_PRICE = 0b0010_0000;
36+
/// For tracking performance
37+
const PERF_TRACK_PACKET = 0b0100_0000;
3638
}
3739
}
3840

@@ -228,6 +230,12 @@ impl Meta {
228230
self.flags.set(PacketFlags::TRACER_PACKET, is_tracer);
229231
}
230232

233+
#[inline]
234+
pub fn set_track_performance(&mut self, is_performance_track: bool) {
235+
self.flags
236+
.set(PacketFlags::PERF_TRACK_PACKET, is_performance_track);
237+
}
238+
231239
#[inline]
232240
pub fn set_simple_vote(&mut self, is_simple_vote: bool) {
233241
self.flags.set(PacketFlags::SIMPLE_VOTE_TX, is_simple_vote);
@@ -261,6 +269,11 @@ impl Meta {
261269
self.flags.contains(PacketFlags::TRACER_PACKET)
262270
}
263271

272+
#[inline]
273+
pub fn is_perf_track_packet(&self) -> bool {
274+
self.flags.contains(PacketFlags::PERF_TRACK_PACKET)
275+
}
276+
264277
#[inline]
265278
pub fn round_compute_unit_price(&self) -> bool {
266279
self.flags.contains(PacketFlags::ROUND_COMPUTE_UNIT_PRICE)

streamer/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@ quinn = { workspace = true }
2626
quinn-proto = { workspace = true }
2727
rand = { workspace = true }
2828
rustls = { workspace = true, features = ["dangerous_configuration"] }
29+
solana-measure = { workspace = true }
2930
solana-metrics = { workspace = true }
3031
solana-perf = { workspace = true }
3132
solana-sdk = { workspace = true }
33+
solana-transaction-metrics-tracker = { workspace = true }
3234
thiserror = { workspace = true }
3335
tokio = { workspace = true, features = ["full"] }
3436
x509-parser = { workspace = true }

streamer/src/nonblocking/quic.rs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use {
1717
quinn::{Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt},
1818
quinn_proto::VarIntBoundsExceeded,
1919
rand::{thread_rng, Rng},
20+
solana_measure::measure::Measure,
2021
solana_perf::packet::{PacketBatch, PACKETS_PER_BATCH},
2122
solana_sdk::{
2223
packet::{Meta, PACKET_DATA_SIZE},
@@ -27,9 +28,10 @@ use {
2728
QUIC_MIN_STAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO,
2829
QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO,
2930
},
30-
signature::Keypair,
31+
signature::{Keypair, Signature},
3132
timing,
3233
},
34+
solana_transaction_metrics_tracker::signature_if_should_track_packet,
3335
std::{
3436
iter::repeat_with,
3537
net::{IpAddr, SocketAddr, UdpSocket},
@@ -94,6 +96,7 @@ struct PacketChunk {
9496
struct PacketAccumulator {
9597
pub meta: Meta,
9698
pub chunks: Vec<PacketChunk>,
99+
pub start_time: Instant,
97100
}
98101

99102
#[derive(Copy, Clone, Debug)]
@@ -646,6 +649,7 @@ async fn packet_batch_sender(
646649
trace!("enter packet_batch_sender");
647650
let mut batch_start_time = Instant::now();
648651
loop {
652+
let mut packet_perf_measure: Vec<([u8; 64], std::time::Instant)> = Vec::default();
649653
let mut packet_batch = PacketBatch::with_capacity(PACKETS_PER_BATCH);
650654
let mut total_bytes: usize = 0;
651655

@@ -665,6 +669,8 @@ async fn packet_batch_sender(
665669
|| (!packet_batch.is_empty() && elapsed >= coalesce)
666670
{
667671
let len = packet_batch.len();
672+
track_streamer_fetch_packet_performance(&mut packet_perf_measure, &stats);
673+
668674
if let Err(e) = packet_sender.send(packet_batch) {
669675
stats
670676
.total_packet_batch_send_err
@@ -710,6 +716,14 @@ async fn packet_batch_sender(
710716

711717
total_bytes += packet_batch[i].meta().size;
712718

719+
if let Some(signature) = signature_if_should_track_packet(&packet_batch[i])
720+
.ok()
721+
.flatten()
722+
{
723+
packet_perf_measure.push((*signature, packet_accumulator.start_time));
724+
// we set the PERF_TRACK_PACKET on
725+
packet_batch[i].meta_mut().set_track_performance(true);
726+
}
713727
stats
714728
.total_chunks_processed_by_batcher
715729
.fetch_add(num_chunks, Ordering::Relaxed);
@@ -718,6 +732,32 @@ async fn packet_batch_sender(
718732
}
719733
}
720734

735+
fn track_streamer_fetch_packet_performance(
736+
packet_perf_measure: &mut [([u8; 64], Instant)],
737+
stats: &Arc<StreamStats>,
738+
) {
739+
if packet_perf_measure.is_empty() {
740+
return;
741+
}
742+
let mut measure = Measure::start("track_perf");
743+
let mut process_sampled_packets_us_hist = stats.process_sampled_packets_us_hist.lock().unwrap();
744+
745+
for (signature, start_time) in packet_perf_measure.iter() {
746+
let duration = Instant::now().duration_since(*start_time);
747+
debug!(
748+
"QUIC streamer fetch stage took {duration:?} for transaction {:?}",
749+
Signature::from(*signature)
750+
);
751+
process_sampled_packets_us_hist
752+
.increment(duration.as_micros() as u64)
753+
.unwrap();
754+
}
755+
measure.stop();
756+
stats
757+
.perf_track_overhead_us
758+
.fetch_add(measure.as_us(), Ordering::Relaxed);
759+
}
760+
721761
async fn handle_connection(
722762
connection: Connection,
723763
remote_addr: SocketAddr,
@@ -872,6 +912,7 @@ async fn handle_chunk(
872912
*packet_accum = Some(PacketAccumulator {
873913
meta,
874914
chunks: Vec::new(),
915+
start_time: Instant::now(),
875916
});
876917
}
877918

@@ -1471,6 +1512,7 @@ pub mod test {
14711512
offset,
14721513
end_of_chunk: size,
14731514
}],
1515+
start_time: Instant::now(),
14741516
};
14751517
ptk_sender.send(packet_accum).await.unwrap();
14761518
}

streamer/src/quic.rs

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ use {
1616
std::{
1717
net::UdpSocket,
1818
sync::{
19-
atomic::{AtomicBool, AtomicUsize, Ordering},
20-
Arc, RwLock,
19+
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
20+
Arc, Mutex, RwLock,
2121
},
2222
thread,
2323
time::{Duration, SystemTime},
@@ -175,10 +175,19 @@ pub struct StreamStats {
175175
pub(crate) stream_load_ema: AtomicUsize,
176176
pub(crate) stream_load_ema_overflow: AtomicUsize,
177177
pub(crate) stream_load_capacity_overflow: AtomicUsize,
178+
pub(crate) process_sampled_packets_us_hist: Mutex<histogram::Histogram>,
179+
pub(crate) perf_track_overhead_us: AtomicU64,
178180
}
179181

180182
impl StreamStats {
181183
pub fn report(&self, name: &'static str) {
184+
let process_sampled_packets_us_hist = {
185+
let mut metrics = self.process_sampled_packets_us_hist.lock().unwrap();
186+
let process_sampled_packets_us_hist = metrics.clone();
187+
metrics.clear();
188+
process_sampled_packets_us_hist
189+
};
190+
182191
datapoint_info!(
183192
name,
184193
(
@@ -425,6 +434,38 @@ impl StreamStats {
425434
self.stream_load_capacity_overflow.load(Ordering::Relaxed),
426435
i64
427436
),
437+
(
438+
"process_sampled_packets_us_90pct",
439+
process_sampled_packets_us_hist
440+
.percentile(90.0)
441+
.unwrap_or(0),
442+
i64
443+
),
444+
(
445+
"process_sampled_packets_us_min",
446+
process_sampled_packets_us_hist.minimum().unwrap_or(0),
447+
i64
448+
),
449+
(
450+
"process_sampled_packets_us_max",
451+
process_sampled_packets_us_hist.maximum().unwrap_or(0),
452+
i64
453+
),
454+
(
455+
"process_sampled_packets_us_mean",
456+
process_sampled_packets_us_hist.mean().unwrap_or(0),
457+
i64
458+
),
459+
(
460+
"process_sampled_packets_count",
461+
process_sampled_packets_us_hist.entries(),
462+
i64
463+
),
464+
(
465+
"perf_track_overhead_us",
466+
self.perf_track_overhead_us.swap(0, Ordering::Relaxed),
467+
i64
468+
),
428469
);
429470
}
430471
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
[package]
2+
name = "solana-transaction-metrics-tracker"
3+
description = "Solana transaction metrics tracker"
4+
documentation = "https://docs.rs/solana-transaction-metrics-tracker"
5+
version = { workspace = true }
6+
authors = { workspace = true }
7+
repository = { workspace = true }
8+
homepage = { workspace = true }
9+
license = { workspace = true }
10+
edition = { workspace = true }
11+
publish = false
12+
13+
[dependencies]
14+
Inflector = { workspace = true }
15+
base64 = { workspace = true }
16+
bincode = { workspace = true }
17+
# Update this borsh dependency to the workspace version once
18+
lazy_static = { workspace = true }
19+
log = { workspace = true }
20+
rand = { workspace = true }
21+
solana-perf = { workspace = true }
22+
solana-sdk = { workspace = true }
23+
24+
[package.metadata.docs.rs]
25+
targets = ["x86_64-unknown-linux-gnu"]

0 commit comments

Comments
 (0)