diff --git a/network/src/network.rs b/network/src/network.rs index b3726618cf..e95f923a22 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -28,6 +28,7 @@ use ckb_systemtime::{Duration, Instant}; use ckb_util::{Condvar, Mutex, RwLock}; use futures::{channel::mpsc::Sender, Future}; use ipnetwork::IpNetwork; +use p2p::multiaddr::MultiAddr; use p2p::{ async_trait, builder::ServiceBuilder, @@ -68,6 +69,54 @@ const P2P_TRY_SEND_INTERVAL: Duration = Duration::from_millis(100); // After 5 minutes we consider this dial hang const DIAL_HANG_TIMEOUT: Duration = Duration::from_secs(300); +/// CKB node's public addresses: +/// +/// This struct holds the public addresses of the CKB node, categorized by how they were obtained. +pub struct PublicAddresses { + /// Addresses explicitly configured by the user in the ckb.toml configuration file. + /// These addresses are considered static and represent the node's intended public endpoints. + configured: HashSet, + + /// Addresses discovered dynamically at runtime through observing successful outbound connections. + /// These addresses may change over time and are managed behind a `RwLock` to allow concurrent + /// read access while providing exclusive write access for updates. Addresses that fail to connect + /// are removed from this set. + discovered: RwLock>, +} + +impl PublicAddresses { + fn new(configured: HashSet, discovered: HashSet) -> Self { + Self { + configured, + discovered: RwLock::new(discovered), + } + } + + fn all(&self) -> Vec { + self.configured + .iter() + .chain(self.discovered.read().iter()) + .cloned() + .collect() + } + + fn contains(&self, addr: &MultiAddr) -> bool { + self.discovered.read().contains(addr) || self.configured.contains(addr) + } + + fn count(&self) -> usize { + self.configured.len() + self.discovered.read().len() + } + + fn random_choose(&self) -> Option { + let addrs = self.all(); + if addrs.is_empty() { + return None; + } + addrs.into_iter().choose(&mut rand::thread_rng()) + } +} + /// The global shared state of the network module pub struct NetworkState { pub(crate) peer_registry: RwLock, @@ -77,7 +126,7 @@ pub struct NetworkState { dialing_addrs: RwLock>, /// Node public addresses, /// includes manually public addrs and remote peer observed addrs - public_addrs: RwLock>, + public_addrs: PublicAddresses, pending_observed_addrs: RwLock>, local_private_key: secio::SecioKeyPair, local_peer_id: PeerId, @@ -100,7 +149,7 @@ impl NetworkState { let local_private_key = config.fetch_private_key()?; let local_peer_id = local_private_key.peer_id(); // set max score to public addresses - let public_addrs: HashSet = config + let configured_public_addrs: HashSet = config .listen_addresses .iter() .chain(config.public_addresses.iter()) @@ -115,6 +164,9 @@ impl NetworkState { } }) .collect(); + + let discovered_public_addrs = HashSet::new(); + let public_addrs = PublicAddresses::new(configured_public_addrs, discovered_public_addrs); info!("Loading the peer store. This process may take a few seconds to complete."); let peer_store = Mutex::new(PeerStore::load_from_dir_or_default( @@ -135,7 +187,7 @@ impl NetworkState { bootnodes, peer_registry: RwLock::new(peer_registry), dialing_addrs: RwLock::new(HashMap::default()), - public_addrs: RwLock::new(public_addrs), + public_addrs, listened_addrs: RwLock::new(Vec::new()), pending_observed_addrs: RwLock::new(HashSet::default()), local_private_key, @@ -335,7 +387,7 @@ impl NetworkState { pub(crate) fn public_addrs(&self, count: usize) -> Vec { self.public_addrs - .read() + .all() .iter() .take(count) .cloned() @@ -388,7 +440,7 @@ impl NetworkState { trace!("Do not dial self: {:?}, {}", peer_id, addr); return false; } - if self.public_addrs.read().contains(addr) { + if self.public_addrs.contains(addr) { trace!( "Do not dial listened address(self): {:?}, {}", peer_id, @@ -502,12 +554,12 @@ impl NetworkState { pub(crate) fn try_dial_observed_addrs(&self, p2p_control: &ServiceControl) { let mut pending_observed_addrs = self.pending_observed_addrs.write(); if pending_observed_addrs.is_empty() { - let addrs = self.public_addrs.read(); - if addrs.is_empty() { + let addrs = &self.public_addrs; + if addrs.count() == 0 { return; } // random get addr - if let Some(addr) = addrs.iter().choose(&mut rand::thread_rng()) { + if let Some(addr) = addrs.random_choose() { if let Err(err) = p2p_control.dial( addr.clone(), TargetProtocol::Single(SupportProtocols::Identify.protocol_id()), @@ -611,7 +663,8 @@ impl ServiceHandle for EventHandler { async fn handle_error(&mut self, context: &mut ServiceContext, error: ServiceError) { match error { ServiceError::DialerError { address, error } => { - let mut public_addrs = self.network_state.public_addrs.write(); + let mut discovered_public_addrs = + self.network_state.public_addrs.discovered.write(); match error { DialerErrorKind::HandshakeError(HandshakeErrorKind::SecioError( @@ -620,7 +673,7 @@ impl ServiceHandle for EventHandler { debug!("dial observed address success: {:?}", address); if let Some(ip) = multiaddr_to_socketaddr(&address) { if is_reachable(ip.ip()) { - public_addrs.insert(address); + discovered_public_addrs.insert(address); } } return; @@ -634,9 +687,9 @@ impl ServiceHandle for EventHandler { debug!("DialerError({}) {}", address, error); } } - if public_addrs.remove(&address) { + if discovered_public_addrs.remove(&address) { info!( - "Dial {} failed, remove it from network_state.public_addrs", + "Dial {} failed, remove it from network_state.public_addrs.discovered", address ); }