Skip to content
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 123 additions & 2 deletions lattica/src/network/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use chrono::Utc;
use cid::Cid;
use fnv::FnvHashMap;
use futures::io::WriteHalf;
use futures::{AsyncReadExt, AsyncWriteExt};
use futures::{AsyncReadExt, AsyncWriteExt, future::join_all};
use libp2p::{
Multiaddr, PeerId, Stream, Swarm, SwarmBuilder,
futures::StreamExt,
Expand Down Expand Up @@ -814,6 +814,120 @@ impl Lattica {
Err(anyhow!("Failed to reconnect to peer {}", peer_id))
}

/// Ensure bootstrap, relay nodes and known peers are connected (concurrent)
async fn ensure_network_connected(&self) {
// Collect all addresses to reconnect
let mut dial_futures = Vec::new();

// 1. Check bootstrap nodes
for addr in &self.config.bootstrap_nodes {
if let Some(Protocol::P2p(peer_id)) = addr.iter().last() {
let (tx, rx) = oneshot::channel();
if self.cmd.try_send(Command::CheckConnection(peer_id, tx)).is_ok() {
if !rx.await.unwrap_or(false) {
tracing::info!("Bootstrap {} not connected, will reconnect...", peer_id);
dial_futures.push(self.dial_with_timeout(addr.clone(), "Bootstrap"));
}
}
}
}

// 2. Check relay servers
for addr in &self.config.relay_servers {
if let Some(Protocol::P2p(peer_id)) = addr.iter().last() {
let (tx, rx) = oneshot::channel();
if self.cmd.try_send(Command::CheckConnection(peer_id, tx)).is_ok() {
if !rx.await.unwrap_or(false) {
tracing::info!("Relay {} not connected, will reconnect...", peer_id);
dial_futures.push(self.dial_with_timeout(addr.clone(), "Relay"));
}
}
}
}

// 3. Check known peers from address book
let address_book = self.address_book.read().await;
let known_peers = address_book.peers();
drop(address_book);

let mut peer_reconnect_futures = Vec::new();
for peer_id in known_peers.iter() {
// Skip bootstrap and relay nodes
let is_infra = self.config.bootstrap_nodes.iter().any(|addr| {
addr.iter().last() == Some(Protocol::P2p(*peer_id))
}) || self.config.relay_servers.iter().any(|addr| {
addr.iter().last() == Some(Protocol::P2p(*peer_id))
});
if is_infra {
continue;
}

let (tx, rx) = oneshot::channel();
if self.cmd.try_send(Command::CheckConnection(*peer_id, tx)).is_ok() {
if !rx.await.unwrap_or(false) {
peer_reconnect_futures.push(self.try_reconnect_peer_logged(*peer_id));
}
}
}

// Execute all reconnections concurrently
if !dial_futures.is_empty() || !peer_reconnect_futures.is_empty() {
let dial_count = dial_futures.len();
let peer_count = peer_reconnect_futures.len();
tracing::info!(
"Reconnecting {} infrastructure nodes and {} data peers concurrently...",
dial_count,
peer_count
);

// Run all concurrently with overall timeout
let _ = tokio::time::timeout(Duration::from_secs(10), async {
let (dial_results, peer_results) = tokio::join!(
join_all(dial_futures),
join_all(peer_reconnect_futures)
);

let dial_success = dial_results.iter().filter(|r| r.is_ok()).count();
let peer_success = peer_results.iter().filter(|r| r.is_ok()).count();

tracing::info!(
"Reconnection complete: {}/{} infrastructure, {}/{} peers",
dial_success, dial_count,
peer_success, peer_count
);
}).await;
}
}

/// Queue a `Command::Dial` for `addr` and wait up to 5 seconds for the reply.
///
/// `label` is only used to prefix log and error messages (e.g. "Bootstrap",
/// "Relay").
///
/// # Errors
///
/// Fails when the dial command cannot be queued, when no reply arrives within
/// 5 seconds, or when the reply channel is closed without an answer.
async fn dial_with_timeout(&self, addr: Multiaddr, label: &str) -> Result<()> {
    let (reply_tx, reply_rx) = oneshot::channel();
    self.cmd
        .try_send(Command::Dial(addr.clone(), reply_tx))
        .map_err(|_| anyhow!("Failed to send dial command"))?;

    // Outer layer: timeout elapsed or not; inner layer: reply channel still open or not.
    // NOTE(review): the payload carried by the reply is ignored here. If it is
    // itself a `Result`, a failed dial would be reported as success — confirm
    // against the `Command::Dial` handler.
    let reply = tokio::time::timeout(Duration::from_secs(5), reply_rx).await;
    if !matches!(reply, Ok(Ok(_))) {
        return Err(anyhow!("{} dial failed: {}", label, addr));
    }

    tracing::debug!("{} dial success: {}", label, addr);
    Ok(())
}

/// Call `try_reconnect_peer` for `peer_id` with a 5 second timeout and log the
/// outcome: `info` on success, `debug` on failure.
///
/// # Errors
///
/// Returns the error produced by `try_reconnect_peer` unchanged.
async fn try_reconnect_peer_logged(&self, peer_id: PeerId) -> Result<()> {
    self.try_reconnect_peer(&peer_id, Duration::from_secs(5))
        .await
        .map(|_| {
            tracing::info!("Reconnected to data provider peer {}", peer_id);
        })
        .map_err(|e| {
            tracing::debug!("Failed to reconnect peer {}: {}", peer_id, e);
            e
        })
}

async fn ensure_direct_connection(&self, peer_id: &PeerId, timeout: Duration) -> Result<()> {
// check swarm
let (tx, rx) = oneshot::channel();
Expand Down Expand Up @@ -1091,6 +1205,9 @@ impl Lattica {
address_book.info(peer_id)?.rtt()
}

/// Get a block from the network
///
/// On timeout, this method will trigger bootstrap reconnection for subsequent retries.
pub async fn get_block(
&self,
cid: &Cid,
Expand All @@ -1112,10 +1229,14 @@ impl Lattica {
Err(anyhow!("Receiver channel closed"))
}
Err(_) => {
// Timeout occurred, cancel the query
// Timeout - cancel query and trigger reconnection for next attempt
if let Some(qid) = query_id {
let _ = self.cmd.try_send(Command::CancelGet(qid));
}

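// Reconnect to bootstrap/relay nodes and known peers so the next retry has a better chance; this is awaited inline (not spawned in the background), so the timeout error below is returned only after it finishes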
let _ = self.ensure_network_connected().await;

Err(anyhow!(
"get_block timeout: block not found or request timed out after {:?}",
timeout
Expand Down