Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent EventHandler::handle_error from removing user-configured [network].public_addresses #4824

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 77 additions & 12 deletions network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use ckb_systemtime::{Duration, Instant};
use ckb_util::{Condvar, Mutex, RwLock};
use futures::{Future, channel::mpsc::Sender};
use ipnetwork::IpNetwork;
use p2p::multiaddr::MultiAddr;
use p2p::{
SessionId, async_trait,
builder::ServiceBuilder,
Expand Down Expand Up @@ -67,6 +68,54 @@ const P2P_TRY_SEND_INTERVAL: Duration = Duration::from_millis(100);
// After 5 minutes we consider this dial hang
const DIAL_HANG_TIMEOUT: Duration = Duration::from_secs(300);

/// The CKB node's public addresses, grouped by how they were obtained.
///
/// Keeping the two groups apart lets the dial-error handling drop addresses
/// that were only learned at runtime while leaving the operator's configured
/// addresses untouched.
pub struct PublicAddresses {
    /// Addresses explicitly configured by the user in the `ckb.toml` configuration file.
    ///
    /// This set is fixed at construction time; it is not behind a lock and is
    /// never modified afterwards.
    configured: HashSet<MultiAddr>,

    /// Addresses discovered dynamically at runtime by dialing observed addresses.
    ///
    /// Guarded by a `RwLock` so readers can share access while updates take the
    /// lock exclusively. Addresses that later fail to dial are removed from this
    /// set. The same address may also be present in `configured`.
    discovered: RwLock<HashSet<MultiAddr>>,
}

impl PublicAddresses {
    /// Creates the address book from the user-configured set and an initial
    /// set of runtime-discovered addresses.
    fn new(configured: HashSet<MultiAddr>, discovered: HashSet<MultiAddr>) -> Self {
        Self {
            configured,
            discovered: RwLock::new(discovered),
        }
    }

    /// Returns every distinct public address: all configured addresses first,
    /// followed by the discovered addresses that are not also configured.
    ///
    /// An address can be present in both sets (a configured address that is
    /// dialed and confirmed gets recorded as discovered too), so the discovered
    /// side is filtered to keep the result free of duplicates.
    fn all(&self) -> Vec<MultiAddr> {
        let discovered = self.discovered.read();
        self.configured
            .iter()
            .chain(
                discovered
                    .iter()
                    .filter(|addr| !self.configured.contains(*addr)),
            )
            .cloned()
            .collect()
    }

    /// Returns `true` if `addr` is either configured or discovered.
    fn contains(&self, addr: &MultiAddr) -> bool {
        // Check the lock-free set first so the read lock is only taken when needed.
        self.configured.contains(addr) || self.discovered.read().contains(addr)
    }

    /// Returns the number of distinct public addresses.
    ///
    /// Addresses present in both sets are counted once, so this always equals
    /// `self.all().len()`.
    fn count(&self) -> usize {
        let discovered = self.discovered.read();
        let discovered_only = discovered
            .iter()
            .filter(|addr| !self.configured.contains(*addr))
            .count();
        self.configured.len() + discovered_only
    }

    /// Picks one public address uniformly at random, or `None` when there are none.
    fn random_choose(&self) -> Option<MultiAddr> {
        // `choose` already yields `None` for an empty iterator, so no separate
        // emptiness check is required.
        self.all().into_iter().choose(&mut rand::thread_rng())
    }
}

/// The global shared state of the network module
pub struct NetworkState {
pub(crate) peer_registry: RwLock<PeerRegistry>,
Expand All @@ -76,7 +125,7 @@ pub struct NetworkState {
dialing_addrs: RwLock<HashMap<PeerId, Instant>>,
/// Node public addresses,
/// includes manually configured public addrs and remote peer observed addrs
public_addrs: RwLock<HashSet<Multiaddr>>,
public_addrs: PublicAddresses,
pending_observed_addrs: RwLock<HashSet<Multiaddr>>,
local_private_key: secio::SecioKeyPair,
local_peer_id: PeerId,
Expand All @@ -99,7 +148,7 @@ impl NetworkState {
let local_private_key = config.fetch_private_key()?;
let local_peer_id = local_private_key.peer_id();
// set max score to public addresses
let public_addrs: HashSet<Multiaddr> = config
let configured_public_addrs: HashSet<Multiaddr> = config
.listen_addresses
.iter()
.chain(config.public_addresses.iter())
Expand All @@ -114,6 +163,9 @@ impl NetworkState {
}
})
.collect();

let discovered_public_addrs = HashSet::new();
let public_addrs = PublicAddresses::new(configured_public_addrs, discovered_public_addrs);
info!("Loading the peer store. This process may take a few seconds to complete.");

let peer_store = Mutex::new(PeerStore::load_from_dir_or_default(
Expand All @@ -134,7 +186,7 @@ impl NetworkState {
bootnodes,
peer_registry: RwLock::new(peer_registry),
dialing_addrs: RwLock::new(HashMap::default()),
public_addrs: RwLock::new(public_addrs),
public_addrs,
listened_addrs: RwLock::new(Vec::new()),
pending_observed_addrs: RwLock::new(HashSet::default()),
local_private_key,
Expand Down Expand Up @@ -334,7 +386,7 @@ impl NetworkState {

pub(crate) fn public_addrs(&self, count: usize) -> Vec<Multiaddr> {
self.public_addrs
.read()
.all()
.iter()
.take(count)
.cloned()
Expand Down Expand Up @@ -387,7 +439,7 @@ impl NetworkState {
trace!("Do not dial self: {:?}, {}", peer_id, addr);
return false;
}
if self.public_addrs.read().contains(addr) {
if self.public_addrs.contains(addr) {
trace!(
"Do not dial listened address(self): {:?}, {}",
peer_id, addr
Expand Down Expand Up @@ -499,12 +551,12 @@ impl NetworkState {
pub(crate) fn try_dial_observed_addrs(&self, p2p_control: &ServiceControl) {
let mut pending_observed_addrs = self.pending_observed_addrs.write();
if pending_observed_addrs.is_empty() {
let addrs = self.public_addrs.read();
if addrs.is_empty() {
let addrs = &self.public_addrs;
if addrs.count() == 0 {
return;
}
// pick a random public address
if let Some(addr) = addrs.iter().choose(&mut rand::thread_rng()) {
if let Some(addr) = addrs.random_choose() {
if let Err(err) = p2p_control.dial(
addr.clone(),
TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
Expand Down Expand Up @@ -608,7 +660,11 @@ impl ServiceHandle for EventHandler {
async fn handle_error(&mut self, context: &mut ServiceContext, error: ServiceError) {
match error {
ServiceError::DialerError { address, error } => {
let mut public_addrs = self.network_state.public_addrs.write();
let mut discovered_public_addrs =
self.network_state.public_addrs.discovered.write();

let mut user_configured_public_addrs =
self.network_state.public_addrs.configured.write();

match error {
DialerErrorKind::HandshakeError(HandshakeErrorKind::SecioError(
Expand All @@ -617,7 +673,7 @@ impl ServiceHandle for EventHandler {
debug!("dial observed address success: {:?}", address);
if let Some(ip) = multiaddr_to_socketaddr(&address) {
if is_reachable(ip.ip()) {
public_addrs.insert(address);
discovered_public_addrs.insert(address);
}
}
return;
Expand All @@ -631,9 +687,18 @@ impl ServiceHandle for EventHandler {
debug!("DialerError({}) {}", address, error);
}
}
if public_addrs.remove(&address) {

if user_configured_public_addrs.contains(&address) {
// don't remove the public_addr, since it's user-configured in ckb.toml
warn!(
"Dial the public addr {} which is configured in ckb.toml failed, keep it.",
address
);
}

if discovered_public_addrs.remove(&address) {
info!(
"Dial {} failed, remove it from network_state.public_addrs",
"Dial {} failed, remove it from network_state.public_addrs.discovered",
address
);
}
Expand Down
Loading