Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
80e7fdd
Repeat ping request periodically
dharjeezy Jul 19, 2025
d6b3909
keep substream open by introducing stateful peer
dharjeezy Oct 6, 2025
9f8fe8c
use stream map for efficiency
dharjeezy Nov 29, 2025
f26e25c
separate inbound and outbound streams
dharjeezy Dec 14, 2025
9c535ed
fmt
dharjeezy Dec 14, 2025
2401ca7
review changes
dharjeezy Dec 24, 2025
fd0a305
review changes
dharjeezy Dec 24, 2025
b4e6a3b
nit
dharjeezy Jan 18, 2026
0b5f3c9
Merge branch 'master' of https://github.com/dharjeezy/litep2p into da…
dharjeezy Feb 21, 2026
4f32e97
include param
dharjeezy Feb 21, 2026
51792f5
Handle edge-cases and older (buggy) litep2p versions
dmitry-markin Feb 23, 2026
415029a
WIP: fix connection keep alive
dmitry-markin Feb 23, 2026
70c7848
Fix connection keep-alive for TCP transport
dmitry-markin Feb 24, 2026
fcbfc5a
Fix connection keep-alive for other transports
dmitry-markin Feb 24, 2026
d9529b3
Update tests
dmitry-markin Feb 24, 2026
a26c710
Merge remote-tracking branch 'upstream/master' into dami/repeat-reque…
dmitry-markin Feb 24, 2026
eda96c9
Improve docs
dmitry-markin Feb 25, 2026
70cdbee
More doc improvements
dmitry-markin Feb 25, 2026
ef230ed
Fix request-response timeout tests
dmitry-markin Feb 25, 2026
4070565
Apply review suggestions
dmitry-markin Feb 25, 2026
65b024d
Keep TCP connections with `SubstreamKeepAlive::Yes` substreams
dmitry-markin Feb 25, 2026
938fbe6
Keep other connections with `SubstreamKeepAlive::Yes` substreams alive
dmitry-markin Feb 25, 2026
e2151cd
minor: make clippy happy
dmitry-markin Feb 25, 2026
d388dca
minor: doc
dmitry-markin Feb 25, 2026
6b2e674
minor: doc
dmitry-markin Feb 25, 2026
838542f
minor: typo
dmitry-markin Feb 25, 2026
a6d81a1
Revert enabling all features by default
dmitry-markin Feb 25, 2026
622a349
Separate names for opening/lifetime permits
dmitry-markin Feb 26, 2026
b8d73cf
Make pings async
dmitry-markin Feb 26, 2026
2ab2bf7
Merge remote-tracking branch 'origin/master' into dm-ping-keep-alive
dmitry-markin Feb 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/protocol/libp2p/ping/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::{
codec::ProtocolCodec, protocol::libp2p::ping::PingEvent, types::protocol::ProtocolName,
DEFAULT_CHANNEL_SIZE,
};
use std::time::Duration;

use futures::Stream;
use tokio::sync::mpsc::{channel, Sender};
Expand All @@ -36,6 +37,8 @@ const PING_PAYLOAD_SIZE: usize = 32;
/// Maximum PING failures.
const MAX_FAILURES: usize = 3;

pub const PING_INTERVAL: Duration = Duration::from_secs(15);

/// Ping configuration.
pub struct Config {
/// Protocol name.
Expand All @@ -49,6 +52,8 @@ pub struct Config {

/// TX channel for sending events to the user protocol.
pub(crate) tx_event: Sender<PingEvent>,

pub(crate) ping_interval: Duration,
}

impl Config {
Expand All @@ -61,6 +66,7 @@ impl Config {
(
Self {
tx_event,
ping_interval: PING_INTERVAL,
max_failures: MAX_FAILURES,
protocol: ProtocolName::from(PROTOCOL_NAME),
codec: ProtocolCodec::Identity(PING_PAYLOAD_SIZE),
Expand All @@ -80,6 +86,7 @@ pub struct ConfigBuilder {

/// Maximum failures before the peer is considered unreachable.
max_failures: usize,
ping_interval: Duration,
}

impl Default for ConfigBuilder {
Expand All @@ -92,6 +99,7 @@ impl ConfigBuilder {
/// Create new default [`Config`] which can be modified by the user.
pub fn new() -> Self {
Self {
ping_interval: PING_INTERVAL,
max_failures: MAX_FAILURES,
protocol: ProtocolName::from(PROTOCOL_NAME),
codec: ProtocolCodec::Identity(PING_PAYLOAD_SIZE),
Expand All @@ -104,13 +112,19 @@ impl ConfigBuilder {
self
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Is cargo doc check ignoring missings docs on public methods? We should also document this and state the defaults (same for above in public interfaces)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

litep2p doesn't have #![warn(missing_docs)] set. But this is for another PR :)

pub fn with_ping_interval(mut self, ping_interval: Duration) -> Self {
self.ping_interval = ping_interval;
self
}

/// Build [`Config`].
pub fn build(self) -> (Config, Box<dyn Stream<Item = PingEvent> + Send + Unpin>) {
let (tx_event, rx_event) = channel(DEFAULT_CHANNEL_SIZE);

(
Config {
tx_event,
ping_interval: self.ping_interval,
max_failures: self.max_failures,
protocol: self.protocol,
codec: self.codec,
Expand Down
219 changes: 119 additions & 100 deletions src/protocol/libp2p/ping/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,21 @@
//! [`/ipfs/ping/1.0.0`](https://github.com/libp2p/specs/blob/master/ping/ping.md) implementation.

use crate::{
error::{Error, SubstreamError},
protocol::{Direction, TransportEvent, TransportService},
substream::Substream,
types::SubstreamId,
PeerId,
};

use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
use tokio::sync::mpsc::Sender;

use bytes::Bytes;
use futures::{stream::SplitSink, SinkExt, StreamExt};
use std::{
collections::HashSet,
collections::HashMap,
time::{Duration, Instant},
};
use tokio::sync::mpsc;
use tokio_stream::StreamMap;

pub use config::{Config, ConfigBuilder};

mod config;

// TODO: https://github.com/paritytech/litep2p/issues/132 let the user handle max failures
Expand All @@ -60,23 +58,28 @@ pub enum PingEvent {

/// Ping protocol.
pub(crate) struct Ping {
/// Maximum failures before the peer is considered unreachable.
_max_failures: usize,

// Connection service.
service: TransportService,

/// TX channel for sending events to the user protocol.
tx: Sender<PingEvent>,
tx: mpsc::Sender<PingEvent>,

/// Streams we read Pongs from.
outbound_streams: StreamMap<PeerId, futures::stream::SplitStream<Substream>>,

/// Connected peers.
peers: HashSet<PeerId>,
/// Sinks we write Pings to.
outbound_sinks: HashMap<PeerId, SplitSink<Substream, Bytes>>,

/// Pending outbound substreams.
pending_outbound: FuturesUnordered<BoxFuture<'static, crate::Result<(PeerId, Duration)>>>,
/// Streams we read Pings from.
/// Keyed by PeerId which enforces one stream per peer
inbound_streams: StreamMap<PeerId, futures::stream::SplitStream<Substream>>,

/// Pending inbound substreams.
pending_inbound: FuturesUnordered<BoxFuture<'static, crate::Result<()>>>,
/// Sinks we write Pongs to.
inbound_sinks: HashMap<PeerId, SplitSink<Substream, Bytes>>,
/// We need to track when we sent the ping to calculate the duration.
ping_times: HashMap<PeerId, Instant>,

ping_interval: Duration,
}

impl Ping {
Expand All @@ -85,126 +88,142 @@ impl Ping {
Self {
service,
tx: config.tx_event,
peers: HashSet::new(),
pending_outbound: FuturesUnordered::new(),
pending_inbound: FuturesUnordered::new(),
_max_failures: config.max_failures,
ping_interval: config.ping_interval,
outbound_streams: StreamMap::new(),
outbound_sinks: HashMap::new(),
ping_times: HashMap::new(),
inbound_streams: StreamMap::new(),
inbound_sinks: HashMap::new(),
}
}

/// Connection established to remote peer.
fn on_connection_established(&mut self, peer: PeerId) -> crate::Result<()> {
tracing::trace!(target: LOG_TARGET, ?peer, "connection established");

self.service.open_substream(peer)?;
self.peers.insert(peer);
fn on_connection_established(&mut self, peer: PeerId) {
tracing::debug!(target: LOG_TARGET, ?peer, "connection established, opening ping substream");

Ok(())
if let Err(error) = self.service.open_substream(peer) {
tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to open substream");
}
}

/// Connection closed to remote peer.
fn on_connection_closed(&mut self, peer: PeerId) {
tracing::trace!(target: LOG_TARGET, ?peer, "connection closed");
tracing::debug!(target: LOG_TARGET, ?peer, "connection closed");
self.outbound_streams.remove(&peer);
self.outbound_sinks.remove(&peer);
self.ping_times.remove(&peer);

self.peers.remove(&peer);
self.inbound_streams.remove(&peer);
self.inbound_sinks.remove(&peer);
}

/// Handle outbound substream.
fn on_outbound_substream(
&mut self,
peer: PeerId,
substream_id: SubstreamId,
mut substream: Substream,
) {
tracing::trace!(target: LOG_TARGET, ?peer, "handle outbound substream");

self.pending_outbound.push(Box::pin(async move {
let future = async move {
// TODO: https://github.com/paritytech/litep2p/issues/134 generate random payload and verify it
substream.send_framed(vec![0u8; 32].into()).await?;
let now = Instant::now();
let _ = substream.next().await.ok_or(Error::SubstreamError(
SubstreamError::ReadFailure(Some(substream_id)),
))?;
let _ = substream.close().await;

Ok(now.elapsed())
};

match tokio::time::timeout(Duration::from_secs(10), future).await {
Err(_) => Err(Error::Timeout),
Ok(Err(error)) => Err(error),
Ok(Ok(elapsed)) => Ok((peer, elapsed)),
}
}));
/// Handle outbound substream (We initiated)
/// Registers it into the Outbound pipeline.
fn on_outbound_substream(&mut self, peer: PeerId, substream: Substream) {
tracing::trace!(target: LOG_TARGET, ?peer, "outbound ping substream registered");
let (sink, stream) = substream.split();
self.outbound_streams.insert(peer, stream);
self.outbound_sinks.insert(peer, sink);
}

/// Substream opened to remote peer.
fn on_inbound_substream(&mut self, peer: PeerId, mut substream: Substream) {
tracing::trace!(target: LOG_TARGET, ?peer, "handle inbound substream");

self.pending_inbound.push(Box::pin(async move {
let future = async move {
let payload = substream
.next()
.await
.ok_or(Error::SubstreamError(SubstreamError::ReadFailure(None)))??;
substream.send_framed(payload.freeze()).await?;
let _ = substream.next().await.map(|_| ());

Ok(())
};

match tokio::time::timeout(Duration::from_secs(10), future).await {
Err(_) => Err(Error::Timeout),
Ok(Err(error)) => Err(error),
Ok(Ok(())) => Ok(()),
}
}));
/// Handle inbound substream (They initiated).
/// Registers it into the Inbound pipeline.
fn on_inbound_substream(&mut self, peer: PeerId, substream: Substream) {
tracing::trace!(target: LOG_TARGET, ?peer, "inbound ping substream registered");
let (sink, stream) = substream.split();

self.inbound_streams.insert(peer, stream);
self.inbound_sinks.insert(peer, sink);
}

/// Start [`Ping`] event loop.
pub async fn run(mut self) {
tracing::debug!(target: LOG_TARGET, "starting ping event loop");
let mut interval = tokio::time::interval(self.ping_interval);

loop {
tokio::select! {
event = self.service.next() => match event {
Some(TransportEvent::ConnectionEstablished { peer, .. }) => {
let _ = self.on_connection_established(peer);
self.on_connection_established(peer);
}
Some(TransportEvent::ConnectionClosed { peer }) => {
self.on_connection_closed(peer);
}
Some(TransportEvent::SubstreamOpened {
peer,
substream,
direction,
..
}) => match direction {
Some(TransportEvent::SubstreamOpened { peer, substream, direction,.. }) => match direction {
Direction::Inbound => {
self.on_inbound_substream(peer, substream);
}
Direction::Outbound(substream_id) => {
self.on_outbound_substream(peer, substream_id, substream);
Direction::Outbound(_) => {
self.on_outbound_substream(peer, substream);
}
},
}
Some(_) => {}
None => return,
},
_event = self.pending_inbound.next(), if !self.pending_inbound.is_empty() => {}
event = self.pending_outbound.next(), if !self.pending_outbound.is_empty() => {

_ = interval.tick() => {
for (peer, sink) in self.outbound_sinks.iter_mut() {
// TODO: https://github.com/paritytech/litep2p/issues/134 generate random payload and verify it
let payload = vec![0u8; 32];

tracing::trace!(target: LOG_TARGET, ?peer, "sending ping");

if let Err(error) = sink.send(Bytes::from(payload)).await {
tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to send ping");
} else {
self.ping_times.insert(*peer, Instant::now());
}
}
}

// Handle Outbound Responses (Pong is expected here)
Some((peer, event)) = self.outbound_streams.next() => {
match event {
Some(Ok((peer, elapsed))) => {
let _ = self
.tx
.send(PingEvent::Ping {
peer,
ping: elapsed,
})
.await;
Ok(_payload) => {
if let Some(started) = self.ping_times.remove(&peer) {

let elapsed = started.elapsed();
tracing::trace!(target: LOG_TARGET, ?peer, ?elapsed, "pong received");
let _ = self.tx.send(PingEvent::Ping { peer, ping: elapsed }).await;
}
}
Err(error) => {
tracing::debug!(target: LOG_TARGET, ?peer, ?error, "ping substream closed/error");
self.outbound_streams.remove(&peer);
self.outbound_sinks.remove(&peer);
self.ping_times.remove(&peer);
}
}
}

// Handle Inbound Pings
Some((peer, event)) = self.inbound_streams.next() => {
match event {
Ok(payload) => {
if let Some(sink) = self.inbound_sinks.get_mut(&peer) {
tracing::trace!(target: LOG_TARGET, ?peer, "sending pong");
if let Err(error) = sink.send(payload.freeze()).await {
tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to send pong");
}
} else {
tracing::debug!(
target: LOG_TARGET,
?peer,
"received ping from peer but no sink available to reply"
);
}
}
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
?peer,
?error,
"inbound ping substream error"
);
self.inbound_streams.remove(&peer);
self.inbound_sinks.remove(&peer);
}
event => tracing::debug!(target: LOG_TARGET, "failed to handle ping for an outbound peer: {event:?}"),
}
}
}
Expand Down
Loading