Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 63 additions & 15 deletions lattica/src/network/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use chrono::Utc;
use cid::Cid;
use fnv::FnvHashMap;
use futures::io::WriteHalf;
use futures::{AsyncReadExt, AsyncWriteExt};
use futures::{AsyncReadExt, AsyncWriteExt, future::join_all};
use libp2p::{
Multiaddr, PeerId, Stream, Swarm, SwarmBuilder,
futures::StreamExt,
Expand Down Expand Up @@ -814,36 +814,77 @@ impl Lattica {
Err(anyhow!("Failed to reconnect to peer {}", peer_id))
}

async fn ensure_direct_connection(&self, peer_id: &PeerId, timeout: Duration) -> Result<()> {
// check swarm
/// Ensure known data peers are connected (concurrent)
/// Note: Bootstrap and relay connections are managed by the global reconnect_timer in swarm_poll
async fn ensure_network_connected(&self) {
// Check known peers from address book
let address_book = self.address_book.read().await;
let known_peers = address_book.peers();
drop(address_book);

let mut peer_reconnect_futures = Vec::new();
for peer_id in known_peers.iter() {
// Skip bootstrap and relay nodes (handled by global timer)
let is_infra = self.config.bootstrap_nodes.iter().any(|addr| {
addr.iter().last() == Some(Protocol::P2p(*peer_id))
}) || self.config.relay_servers.iter().any(|addr| {
addr.iter().last() == Some(Protocol::P2p(*peer_id))
});
if is_infra {
continue;
}

let peer_id = *peer_id;
peer_reconnect_futures.push(async move {
self.ensure_peer_connected(&peer_id, Duration::from_secs(5)).await
});
}

// Execute all reconnections concurrently
if !peer_reconnect_futures.is_empty() {
let peer_count = peer_reconnect_futures.len();
tracing::info!("Checking {} data peers concurrently...", peer_count);

// Run all concurrently with overall timeout
let _ = tokio::time::timeout(Duration::from_secs(10), async {
let peer_results = join_all(peer_reconnect_futures).await;
let peer_success = peer_results.iter().filter(|r| r.is_ok()).count();
tracing::info!("Connection check complete: {}/{} peers", peer_success, peer_count);
}).await;
}
}

/// Ensure a peer is connected, attempting reconnection if needed
async fn ensure_peer_connected(&self, peer_id: &PeerId, timeout: Duration) -> Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd.try_send(Command::CheckConnection(*peer_id, tx))?;
let is_connected = rx.await.unwrap_or(false);

if !is_connected {
// try reconnect
tracing::debug!(
"Peer {} is not connected, attempting to reconnect...",
peer_id
);
tracing::debug!("Peer {} is not connected, attempting to reconnect...", peer_id);
self.try_reconnect_peer(peer_id, timeout).await?;

// verify status
// Verify reconnection succeeded
let (tx2, rx2) = oneshot::channel();
self.cmd.try_send(Command::CheckConnection(*peer_id, tx2))?;
let reconnected = rx2.await.unwrap_or(false);

if !reconnected {
return Err(anyhow!(
"Failed to establish connection to peer {}",
peer_id
));
return Err(anyhow!("Failed to establish connection to peer {}", peer_id));
}

tracing::info!("Successfully reconnected to peer {}", peer_id);
}

// check direct connection
Ok(())
}

/// Ensure a direct (non-relayed) connection to a peer
async fn ensure_direct_connection(&self, peer_id: &PeerId, timeout: Duration) -> Result<()> {
// First ensure the peer is connected
self.ensure_peer_connected(peer_id, timeout).await?;

// Then verify it's a direct connection (not relayed)
let address_book = self.address_book.read().await;
if let Some(info) = address_book.info(peer_id) {
let has_direct = info.addresses().any(|(_, _, _, is_relayed, _)| !is_relayed);
Expand Down Expand Up @@ -1091,6 +1132,9 @@ impl Lattica {
address_book.info(peer_id)?.rtt()
}

/// Get a block from the network
///
/// On timeout, this method will trigger reconnection for subsequent retries.
pub async fn get_block(
&self,
cid: &Cid,
Expand All @@ -1112,10 +1156,14 @@ impl Lattica {
Err(anyhow!("Receiver channel closed"))
}
Err(_) => {
// Timeout occurred, cancel the query
// Timeout - cancel query and trigger reconnection for next attempt
if let Some(qid) = query_id {
let _ = self.cmd.try_send(Command::CancelGet(qid));
}

// Trigger bootstrap reconnection in background for next retry
let _ = self.ensure_network_connected().await;

Err(anyhow!(
"get_block timeout: block not found or request timed out after {:?}",
timeout
Expand Down