diff --git a/lattica/src/network/core.rs b/lattica/src/network/core.rs index 9bd6140..ad11e3b 100644 --- a/lattica/src/network/core.rs +++ b/lattica/src/network/core.rs @@ -13,7 +13,7 @@ use chrono::Utc; use cid::Cid; use fnv::FnvHashMap; use futures::io::WriteHalf; -use futures::{AsyncReadExt, AsyncWriteExt}; +use futures::{AsyncReadExt, AsyncWriteExt, future::join_all}; use libp2p::{ Multiaddr, PeerId, Stream, Swarm, SwarmBuilder, futures::StreamExt, @@ -814,36 +814,77 @@ impl Lattica { Err(anyhow!("Failed to reconnect to peer {}", peer_id)) } - async fn ensure_direct_connection(&self, peer_id: &PeerId, timeout: Duration) -> Result<()> { - // check swarm + /// Ensure known data peers are connected (concurrent) + /// Note: Bootstrap and relay connections are managed by the global reconnect_timer in swarm_poll + async fn ensure_network_connected(&self) { + // Check known peers from address book + let address_book = self.address_book.read().await; + let known_peers = address_book.peers(); + drop(address_book); + + let mut peer_reconnect_futures = Vec::new(); + for peer_id in known_peers.iter() { + // Skip bootstrap and relay nodes (handled by global timer) + let is_infra = self.config.bootstrap_nodes.iter().any(|addr| { + addr.iter().last() == Some(Protocol::P2p(*peer_id)) + }) || self.config.relay_servers.iter().any(|addr| { + addr.iter().last() == Some(Protocol::P2p(*peer_id)) + }); + if is_infra { + continue; + } + + let peer_id = *peer_id; + peer_reconnect_futures.push(async move { + self.ensure_peer_connected(&peer_id, Duration::from_secs(5)).await + }); + } + + // Execute all reconnections concurrently + if !peer_reconnect_futures.is_empty() { + let peer_count = peer_reconnect_futures.len(); + tracing::info!("Checking {} data peers concurrently...", peer_count); + + // Run all concurrently with overall timeout + let _ = tokio::time::timeout(Duration::from_secs(10), async { + let peer_results = join_all(peer_reconnect_futures).await; + let peer_success = peer_results.iter().filter(|r| r.is_ok()).count(); + tracing::info!("Connection check complete: {}/{} peers", peer_success, peer_count); + }).await; + } + } + + /// Ensure a peer is connected, attempting reconnection if needed + async fn ensure_peer_connected(&self, peer_id: &PeerId, timeout: Duration) -> Result<()> { let (tx, rx) = oneshot::channel(); self.cmd.try_send(Command::CheckConnection(*peer_id, tx))?; let is_connected = rx.await.unwrap_or(false); if !is_connected { - // try reconnect - tracing::debug!( - "Peer {} is not connected, attempting to reconnect...", - peer_id - ); + tracing::debug!("Peer {} is not connected, attempting to reconnect...", peer_id); self.try_reconnect_peer(peer_id, timeout).await?; - // verify status + // Verify reconnection succeeded let (tx2, rx2) = oneshot::channel(); self.cmd.try_send(Command::CheckConnection(*peer_id, tx2))?; let reconnected = rx2.await.unwrap_or(false); if !reconnected { - return Err(anyhow!( - "Failed to establish connection to peer {}", - peer_id - )); + return Err(anyhow!("Failed to establish connection to peer {}", peer_id)); } tracing::info!("Successfully reconnected to peer {}", peer_id); } - // check direct connection + Ok(()) + } + + /// Ensure a direct (non-relayed) connection to a peer + async fn ensure_direct_connection(&self, peer_id: &PeerId, timeout: Duration) -> Result<()> { + // First ensure the peer is connected + self.ensure_peer_connected(peer_id, timeout).await?; + + // Then verify it's a direct connection (not relayed) let address_book = self.address_book.read().await; if let Some(info) = address_book.info(peer_id) { let has_direct = info.addresses().any(|(_, _, _, is_relayed, _)| !is_relayed); @@ -1091,6 +1132,9 @@ impl Lattica { address_book.info(peer_id)?.rtt() } + /// Get a block from the network + /// + /// On timeout, this method will trigger reconnection for subsequent retries. pub async fn get_block( &self, cid: &Cid, @@ -1112,10 +1156,14 @@ impl Lattica { Err(anyhow!("Receiver channel closed")) } Err(_) => { - // Timeout occurred, cancel the query + // Timeout - cancel query and trigger reconnection for next attempt if let Some(qid) = query_id { let _ = self.cmd.try_send(Command::CancelGet(qid)); } + + // Trigger bootstrap reconnection in background for next retry + let _ = self.ensure_network_connected().await; + Err(anyhow!( "get_block timeout: block not found or request timed out after {:?}", timeout