Skip to content

Commit

Permalink
streamer send destination metrics for repair, gossip (solana-labs#21564)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbiseda authored Dec 17, 2021
1 parent 76098dd commit 97a1fa1
Show file tree
Hide file tree
Showing 11 changed files with 215 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub mod sigverify;
pub mod sigverify_shreds;
pub mod sigverify_stage;
pub mod snapshot_packager_service;
pub mod stats_reporter_service;
pub mod system_monitor_service;
pub mod tower_storage;
pub mod tpu;
Expand Down
8 changes: 7 additions & 1 deletion core/src/serve_repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ use {
solana_streamer::{socket::SocketAddrSpace, streamer},
std::{
net::UdpSocket,
sync::{atomic::AtomicBool, mpsc::channel, Arc, RwLock},
sync::{
atomic::AtomicBool,
mpsc::{channel, Sender},
Arc, RwLock,
},
thread::{self, JoinHandle},
},
};
Expand All @@ -20,6 +24,7 @@ impl ServeRepairService {
blockstore: Option<Arc<Blockstore>>,
serve_repair_socket: UdpSocket,
socket_addr_space: SocketAddrSpace,
stats_reporter_sender: Sender<Box<dyn FnOnce() + Send>>,
exit: &Arc<AtomicBool>,
) -> Self {
let (request_sender, request_receiver) = channel();
Expand All @@ -44,6 +49,7 @@ impl ServeRepairService {
serve_repair_socket,
response_receiver,
socket_addr_space,
Some(stats_reporter_sender),
);
let t_listen = ServeRepair::listen(
serve_repair.clone(),
Expand Down
53 changes: 53 additions & 0 deletions core/src/stats_reporter_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use std::{
result::Result,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{Receiver, RecvTimeoutError},
Arc,
},
thread::{self, Builder, JoinHandle},
time::Duration,
};

pub struct StatsReporterService {
thread_hdl: JoinHandle<()>,
}

impl StatsReporterService {
pub fn new(
reporting_receiver: Receiver<Box<dyn FnOnce() + Send>>,
exit: &Arc<AtomicBool>,
) -> Self {
let exit = exit.clone();
let thread_hdl = Builder::new()
.name("solana-stats-reporter".to_owned())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
return;
}
if let Err(e) = Self::receive_reporting_func(&reporting_receiver) {
match e {
RecvTimeoutError::Disconnected => break,
RecvTimeoutError::Timeout => (),
}
}
})
.unwrap();

Self { thread_hdl }
}

pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()?;
Ok(())
}

fn receive_reporting_func(
r: &Receiver<Box<dyn FnOnce() + Send>>,
) -> Result<(), RecvTimeoutError> {
let timer = Duration::new(1, 0);
let func = r.recv_timeout(timer)?;
func();
Ok(())
}
}
12 changes: 12 additions & 0 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use {
serve_repair_service::ServeRepairService,
sigverify,
snapshot_packager_service::SnapshotPackagerService,
stats_reporter_service::StatsReporterService,
system_monitor_service::{verify_udp_stats_access, SystemMonitorService},
tower_storage::TowerStorage,
tpu::{Tpu, DEFAULT_TPU_COALESCE_MS},
Expand Down Expand Up @@ -276,6 +277,7 @@ pub struct Validator {
cache_block_meta_service: Option<CacheBlockMetaService>,
system_monitor_service: Option<SystemMonitorService>,
sample_performance_service: Option<SamplePerformanceService>,
stats_reporter_service: StatsReporterService,
gossip_service: GossipService,
serve_repair_service: ServeRepairService,
completed_data_sets_service: CompletedDataSetsService,
Expand Down Expand Up @@ -697,12 +699,17 @@ impl Validator {
Some(node.info.shred_version),
)),
};

let (stats_reporter_sender, stats_reporter_receiver) = channel();
let stats_reporter_service = StatsReporterService::new(stats_reporter_receiver, &exit);

let gossip_service = GossipService::new(
&cluster_info,
Some(bank_forks.clone()),
node.sockets.gossip,
config.gossip_validators.clone(),
should_check_duplicate_instance,
Some(stats_reporter_sender.clone()),
&exit,
);
let serve_repair = Arc::new(RwLock::new(ServeRepair::new(cluster_info.clone())));
Expand All @@ -711,6 +718,7 @@ impl Validator {
Some(blockstore.clone()),
node.sockets.serve_repair,
socket_addr_space,
stats_reporter_sender,
&exit,
);

Expand Down Expand Up @@ -904,6 +912,7 @@ impl Validator {

*start_progress.write().unwrap() = ValidatorStartProgress::Running;
Self {
stats_reporter_service,
gossip_service,
serve_repair_service,
json_rpc_service,
Expand Down Expand Up @@ -1028,6 +1037,9 @@ impl Validator {
self.serve_repair_service
.join()
.expect("serve_repair_service");
self.stats_reporter_service
.join()
.expect("stats_reporter_service");
self.tpu.join().expect("tpu");
self.tvu.join().expect("tvu");
self.completed_data_sets_service
Expand Down
6 changes: 5 additions & 1 deletion gossip/src/gossip_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use {
net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
mpsc::channel,
mpsc::{channel, Sender},
Arc, RwLock,
},
thread::{self, sleep, JoinHandle},
Expand All @@ -38,6 +38,7 @@ impl GossipService {
gossip_socket: UdpSocket,
gossip_validators: Option<HashSet<Pubkey>>,
should_check_duplicate_instance: bool,
stats_reporter_sender: Option<Sender<Box<dyn FnOnce() + Send>>>,
exit: &Arc<AtomicBool>,
) -> Self {
let (request_sender, request_receiver) = channel();
Expand Down Expand Up @@ -88,6 +89,7 @@ impl GossipService {
gossip_socket,
response_receiver,
socket_addr_space,
stats_reporter_sender,
);
let thread_hdls = vec![
t_receiver,
Expand Down Expand Up @@ -331,6 +333,7 @@ pub fn make_gossip_node(
gossip_socket,
None,
should_check_duplicate_instance,
None,
exit,
);
(gossip_service, ip_echo, cluster_info)
Expand Down Expand Up @@ -362,6 +365,7 @@ mod tests {
tn.sockets.gossip,
None,
true, // should_check_duplicate_instance
None,
&exit,
);
exit.store(true, Ordering::Relaxed);
Expand Down
2 changes: 2 additions & 0 deletions gossip/tests/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ fn test_node(exit: &Arc<AtomicBool>) -> (Arc<ClusterInfo>, GossipService, UdpSoc
test_node.sockets.gossip,
None,
true, // should_check_duplicate_instance
None,
exit,
);
let _ = cluster_info.my_contact_info();
Expand Down Expand Up @@ -72,6 +73,7 @@ fn test_node_with_bank(
test_node.sockets.gossip,
None,
true, // should_check_duplicate_instance
None,
exit,
);
let _ = cluster_info.my_contact_info();
Expand Down
1 change: 1 addition & 0 deletions replica-node/src/replica_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ fn start_gossip_node(
gossip_socket,
gossip_validators,
should_check_duplicate_instance,
None,
&gossip_exit_flag,
);
info!("Started gossip node");
Expand Down
1 change: 1 addition & 0 deletions streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ documentation = "https://docs.rs/solana-streamer"
edition = "2021"

[dependencies]
histogram = "0.6.9"
itertools = "0.10.3"
log = "0.4.14"
solana-metrics = { path = "../metrics", version = "=1.10.0" }
Expand Down
Loading

0 comments on commit 97a1fa1

Please sign in to comment.