Skip to content
Open
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 65 additions & 2 deletions lattica/src/network/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use chrono::Utc;
use cid::Cid;
use fnv::FnvHashMap;
use futures::io::WriteHalf;
use futures::{AsyncReadExt, AsyncWriteExt};
use futures::{AsyncReadExt, AsyncWriteExt, future::join_all};
use libp2p::{
Multiaddr, PeerId, Stream, Swarm, SwarmBuilder,
futures::StreamExt,
Expand Down Expand Up @@ -814,6 +814,62 @@ impl Lattica {
Err(anyhow!("Failed to reconnect to peer {}", peer_id))
}

/// Ensure known data peers are connected (concurrent)
/// Note: Bootstrap and relay connections are managed by the global reconnect_timer in swarm_poll
async fn ensure_network_connected(&self) {
// Check known peers from address book
let address_book = self.address_book.read().await;
let known_peers = address_book.peers();
drop(address_book);

let mut peer_reconnect_futures = Vec::new();
for peer_id in known_peers.iter() {
// Skip bootstrap and relay nodes (handled by global timer)
let is_infra = self.config.bootstrap_nodes.iter().any(|addr| {
addr.iter().last() == Some(Protocol::P2p(*peer_id))
}) || self.config.relay_servers.iter().any(|addr| {
addr.iter().last() == Some(Protocol::P2p(*peer_id))
});
if is_infra {
continue;
}

let (tx, rx) = oneshot::channel();
if self.cmd.try_send(Command::CheckConnection(*peer_id, tx)).is_ok() {
if !rx.await.unwrap_or(false) {
peer_reconnect_futures.push(self.try_reconnect_peer_logged(*peer_id));
}
}
}

// Execute all reconnections concurrently
if !peer_reconnect_futures.is_empty() {
let peer_count = peer_reconnect_futures.len();
tracing::info!("Reconnecting {} data peers concurrently...", peer_count);

// Run all concurrently with overall timeout
let _ = tokio::time::timeout(Duration::from_secs(10), async {
let peer_results = join_all(peer_reconnect_futures).await;
let peer_success = peer_results.iter().filter(|r| r.is_ok()).count();
tracing::info!("Reconnection complete: {}/{} peers", peer_success, peer_count);
}).await;
}
}

/// Try to reconnect peer with logging
async fn try_reconnect_peer_logged(&self, peer_id: PeerId) -> Result<()> {
match self.try_reconnect_peer(&peer_id, Duration::from_secs(5)).await {
Ok(_) => {
tracing::info!("Reconnected to data provider peer {}", peer_id);
Ok(())
}
Err(e) => {
tracing::debug!("Failed to reconnect peer {}: {}", peer_id, e);
Err(e)
}
}
}

async fn ensure_direct_connection(&self, peer_id: &PeerId, timeout: Duration) -> Result<()> {
// check swarm
let (tx, rx) = oneshot::channel();
Expand Down Expand Up @@ -1091,6 +1147,9 @@ impl Lattica {
address_book.info(peer_id)?.rtt()
}

/// Get a block from the network
///
/// On timeout, this method will trigger reconnection for subsequent retries.
pub async fn get_block(
&self,
cid: &Cid,
Expand All @@ -1112,10 +1171,14 @@ impl Lattica {
Err(anyhow!("Receiver channel closed"))
}
Err(_) => {
// Timeout occurred, cancel the query
// Timeout - cancel query and trigger reconnection for next attempt
if let Some(qid) = query_id {
let _ = self.cmd.try_send(Command::CancelGet(qid));
}

// Trigger bootstrap reconnection in background for next retry
let _ = self.ensure_network_connected().await;

Err(anyhow!(
"get_block timeout: block not found or request timed out after {:?}",
timeout
Expand Down