Skip to content

Commit a153df9

Browse files
StemCllguillaumemichel
authored andcommitted
feat(swarm): allow behaviours to share addresses of peers
Resolves: #4302. Pull-Request: #4371.
1 parent b3bdee8 commit a153df9

File tree

20 files changed

+611
-75
lines changed

20 files changed

+611
-75
lines changed

Cargo.lock

Lines changed: 5 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ libp2p-dcutr = { version = "0.11.0", path = "protocols/dcutr" }
8383
libp2p-dns = { version = "0.41.1", path = "transports/dns" }
8484
libp2p-floodsub = { version = "0.44.0", path = "protocols/floodsub" }
8585
libp2p-gossipsub = { version = "0.46.1", path = "protocols/gossipsub" }
86-
libp2p-identify = { version = "0.44.1", path = "protocols/identify" }
86+
libp2p-identify = { version = "0.44.2", path = "protocols/identify" }
8787
libp2p-identity = { version = "0.2.8" }
8888
libp2p-kad = { version = "0.46.0", path = "protocols/kad" }
8989
libp2p-mdns = { version = "0.45.1", path = "protocols/mdns" }
@@ -99,11 +99,11 @@ libp2p-pnet = { version = "0.24.0", path = "transports/pnet" }
9999
libp2p-quic = { version = "0.10.2", path = "transports/quic" }
100100
libp2p-relay = { version = "0.17.1", path = "protocols/relay" }
101101
libp2p-rendezvous = { version = "0.14.0", path = "protocols/rendezvous" }
102-
libp2p-request-response = { version = "0.26.1", path = "protocols/request-response" }
102+
libp2p-request-response = { version = "0.26.2", path = "protocols/request-response" }
103103
libp2p-server = { version = "0.12.5", path = "misc/server" }
104104
libp2p-stream = { version = "0.1.0-alpha", path = "protocols/stream" }
105-
libp2p-swarm = { version = "0.44.1", path = "swarm" }
106-
libp2p-swarm-derive = { version = "=0.34.2", path = "swarm-derive" } # `libp2p-swarm-derive` may not be compatible with different `libp2p-swarm` non-breaking releases. E.g. `libp2p-swarm` might introduce a new enum variant `FromSwarm` (which is `#[non-exhaustive]`) in a non-breaking release. Older versions of `libp2p-swarm-derive` would not forward this enum variant within the `NetworkBehaviour` hierarchy. Thus the version pinning is required.
105+
libp2p-swarm = { version = "0.44.2", path = "swarm" }
106+
libp2p-swarm-derive = { version = "=0.34.3", path = "swarm-derive" } # `libp2p-swarm-derive` may not be compatible with different `libp2p-swarm` non-breaking releases. E.g. `libp2p-swarm` might introduce a new enum variant `FromSwarm` (which is `#[non-exhaustive]`) in a non-breaking release. Older versions of `libp2p-swarm-derive` would not forward this enum variant within the `NetworkBehaviour` hierarchy. Thus the version pinning is required.
107107
libp2p-swarm-test = { version = "0.3.0", path = "swarm-test" }
108108
libp2p-tcp = { version = "0.41.0", path = "transports/tcp" }
109109
libp2p-tls = { version = "0.3.0", path = "transports/tls" }

protocols/autonat/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ name = "libp2p-autonat"
33
edition = "2021"
44
rust-version = { workspace = true }
55
description = "NAT and firewall detection for libp2p"
6-
version = "0.12.0"
76
authors = ["David Craven <[email protected]>", "Elena Frank <[email protected]>"]
7+
version = "0.12.0"
88
license = "MIT"
99
repository = "https://github.com/libp2p/rust-libp2p"
1010
keywords = ["peer-to-peer", "libp2p", "networking"]

protocols/autonat/src/behaviour.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ impl Behaviour {
268268
pub fn add_server(&mut self, peer: PeerId, address: Option<Multiaddr>) {
269269
self.servers.insert(peer);
270270
if let Some(addr) = address {
271+
#[allow(deprecated)]
271272
self.inner.add_address(&peer, addr);
272273
}
273274
}

protocols/dcutr/tests/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ async fn wait_for_reservation(
203203
SwarmEvent::ExternalAddrConfirmed { address } if !is_renewal => {
204204
assert_eq!(address, client_addr);
205205
}
206+
SwarmEvent::NewExternalAddrOfPeer { .. } => {}
206207
e => panic!("{e:?}"),
207208
}
208209
}

protocols/identify/CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
## 0.44.2
2+
3+
- Emit `ToSwarm::NewExternalAddrOfPeer` for all external addresses of remote peers.
4+
For this work, the address cache must be enabled via `identify::Config::with_cache_size`.
5+
The default is 0, i.e. disabled.
6+
See [PR 4371](https://github.com/libp2p/rust-libp2p/pull/4371).
7+
18
## 0.44.1
29

310
- Ensure `Multiaddr` handled and returned by `Behaviour` are `/p2p` terminated.

protocols/identify/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name = "libp2p-identify"
33
edition = "2021"
44
rust-version = { workspace = true }
55
description = "Nodes identifcation protocol for libp2p"
6-
version = "0.44.1"
6+
version = "0.44.2"
77
authors = ["Parity Technologies <[email protected]>"]
88
license = "MIT"
99
repository = "https://github.com/libp2p/rust-libp2p"

protocols/identify/src/behaviour.rs

Lines changed: 31 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,14 @@ use libp2p_identity::PublicKey;
2626
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
2727
use libp2p_swarm::{
2828
ConnectionDenied, DialError, ExternalAddresses, ListenAddresses, NetworkBehaviour,
29-
NotifyHandler, StreamUpgradeError, THandlerInEvent, ToSwarm,
29+
NotifyHandler, PeerAddresses, StreamUpgradeError, THandlerInEvent, ToSwarm,
3030
};
3131
use libp2p_swarm::{ConnectionId, THandler, THandlerOutEvent};
32-
use lru::LruCache;
32+
3333
use std::collections::hash_map::Entry;
3434
use std::num::NonZeroUsize;
3535
use std::{
3636
collections::{HashMap, HashSet, VecDeque},
37-
iter::FromIterator,
3837
task::Context,
3938
task::Poll,
4039
time::Duration,
@@ -200,9 +199,9 @@ impl Behaviour {
200199
.or_default()
201200
.insert(conn, addr);
202201

203-
if let Some(entry) = self.discovered_peers.get_mut(&peer_id) {
202+
if let Some(cache) = self.discovered_peers.0.as_mut() {
204203
for addr in failed_addresses {
205-
entry.remove(addr);
204+
cache.remove(&peer_id, addr);
206205
}
207206
}
208207
}
@@ -268,13 +267,23 @@ impl NetworkBehaviour for Behaviour {
268267
info.listen_addrs
269268
.retain(|addr| multiaddr_matches_peer_id(addr, &peer_id));
270269

271-
// Replace existing addresses to prevent other peer from filling up our memory.
272-
self.discovered_peers
273-
.put(peer_id, info.listen_addrs.iter().cloned());
274-
275270
let observed = info.observed_addr.clone();
276271
self.events
277-
.push_back(ToSwarm::GenerateEvent(Event::Received { peer_id, info }));
272+
.push_back(ToSwarm::GenerateEvent(Event::Received {
273+
peer_id,
274+
info: info.clone(),
275+
}));
276+
277+
if let Some(ref mut discovered_peers) = self.discovered_peers.0 {
278+
for address in &info.listen_addrs {
279+
if discovered_peers.add(peer_id, address.clone()) {
280+
self.events.push_back(ToSwarm::NewExternalAddrOfPeer {
281+
peer_id,
282+
address: address.clone(),
283+
});
284+
}
285+
}
286+
}
278287

279288
match self.our_observed_addresses.entry(id) {
280289
Entry::Vacant(not_yet_observed) => {
@@ -387,11 +396,11 @@ impl NetworkBehaviour for Behaviour {
387396
self.our_observed_addresses.remove(&connection_id);
388397
}
389398
FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
390-
if let Some(entry) = peer_id.and_then(|id| self.discovered_peers.get_mut(&id)) {
391-
if let DialError::Transport(errors) = error {
392-
for (addr, _error) in errors {
393-
entry.remove(addr);
394-
}
399+
if let (Some(peer_id), Some(cache), DialError::Transport(errors)) =
400+
(peer_id, self.discovered_peers.0.as_mut(), error)
401+
{
402+
for (addr, _error) in errors {
403+
cache.remove(&peer_id, addr);
395404
}
396405
}
397406
}
@@ -445,42 +454,23 @@ fn multiaddr_matches_peer_id(addr: &Multiaddr, peer_id: &PeerId) -> bool {
445454
true
446455
}
447456

448-
struct PeerCache(Option<LruCache<PeerId, HashSet<Multiaddr>>>);
457+
struct PeerCache(Option<PeerAddresses>);
449458

450459
impl PeerCache {
451460
fn disabled() -> Self {
452461
Self(None)
453462
}
454463

455464
fn enabled(size: NonZeroUsize) -> Self {
456-
Self(Some(LruCache::new(size)))
457-
}
458-
459-
fn get_mut(&mut self, peer: &PeerId) -> Option<&mut HashSet<Multiaddr>> {
460-
self.0.as_mut()?.get_mut(peer)
461-
}
462-
463-
fn put(&mut self, peer: PeerId, addresses: impl Iterator<Item = Multiaddr>) {
464-
let cache = match self.0.as_mut() {
465-
None => return,
466-
Some(cache) => cache,
467-
};
468-
469-
let addresses = addresses.filter_map(|a| a.with_p2p(peer).ok());
470-
cache.put(peer, HashSet::from_iter(addresses));
465+
Self(Some(PeerAddresses::new(size)))
471466
}
472467

473468
fn get(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
474-
let cache = match self.0.as_mut() {
475-
None => return Vec::new(),
476-
Some(cache) => cache,
477-
};
478-
479-
cache
480-
.get(peer)
481-
.cloned()
482-
.map(Vec::from_iter)
483-
.unwrap_or_default()
469+
if let Some(cache) = self.0.as_mut() {
470+
cache.get(peer).collect()
471+
} else {
472+
Vec::new()
473+
}
484474
}
485475
}
486476

protocols/identify/tests/smoke.rs

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ async fn only_emits_address_candidate_once_per_connection() {
110110
async_std::task::spawn(swarm2.loop_on_next());
111111

112112
let swarm_events = futures::stream::poll_fn(|cx| swarm1.poll_next_unpin(cx))
113-
.take(5)
113+
.take(8)
114114
.collect::<Vec<_>>()
115115
.await;
116116

@@ -156,6 +156,78 @@ async fn only_emits_address_candidate_once_per_connection() {
156156
);
157157
}
158158

159+
#[async_std::test]
160+
async fn emits_unique_listen_addresses() {
161+
let _ = tracing_subscriber::fmt()
162+
.with_env_filter(EnvFilter::from_default_env())
163+
.try_init();
164+
165+
let mut swarm1 = Swarm::new_ephemeral(|identity| {
166+
identify::Behaviour::new(
167+
identify::Config::new("a".to_string(), identity.public())
168+
.with_agent_version("b".to_string())
169+
.with_interval(Duration::from_secs(1))
170+
.with_cache_size(10),
171+
)
172+
});
173+
let mut swarm2 = Swarm::new_ephemeral(|identity| {
174+
identify::Behaviour::new(
175+
identify::Config::new("c".to_string(), identity.public())
176+
.with_agent_version("d".to_string()),
177+
)
178+
});
179+
180+
let (swarm2_mem_listen_addr, swarm2_tcp_listen_addr) =
181+
swarm2.listen().with_memory_addr_external().await;
182+
let swarm2_peer_id = *swarm2.local_peer_id();
183+
swarm1.connect(&mut swarm2).await;
184+
185+
async_std::task::spawn(swarm2.loop_on_next());
186+
187+
let swarm_events = futures::stream::poll_fn(|cx| swarm1.poll_next_unpin(cx))
188+
.take(8)
189+
.collect::<Vec<_>>()
190+
.await;
191+
192+
let infos = swarm_events
193+
.iter()
194+
.filter_map(|e| match e {
195+
SwarmEvent::Behaviour(identify::Event::Received { info, .. }) => Some(info.clone()),
196+
_ => None,
197+
})
198+
.collect::<Vec<_>>();
199+
200+
assert!(
201+
infos.len() > 1,
202+
"should exchange identify payload more than once"
203+
);
204+
205+
let listen_addrs = infos
206+
.iter()
207+
.map(|i| i.listen_addrs.clone())
208+
.collect::<Vec<_>>();
209+
210+
for addrs in listen_addrs {
211+
assert_eq!(addrs.len(), 2);
212+
assert!(addrs.contains(&swarm2_mem_listen_addr));
213+
assert!(addrs.contains(&swarm2_tcp_listen_addr));
214+
}
215+
216+
let reported_addrs = swarm_events
217+
.iter()
218+
.filter_map(|e| match e {
219+
SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
220+
Some((*peer_id, address.clone()))
221+
}
222+
_ => None,
223+
})
224+
.collect::<Vec<_>>();
225+
226+
assert_eq!(reported_addrs.len(), 2, "To have two addresses of remote");
227+
assert!(reported_addrs.contains(&(swarm2_peer_id, swarm2_mem_listen_addr)));
228+
assert!(reported_addrs.contains(&(swarm2_peer_id, swarm2_tcp_listen_addr)));
229+
}
230+
159231
#[async_std::test]
160232
async fn identify_push() {
161233
let _ = tracing_subscriber::fmt()

protocols/request-response/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
## 0.26.2
2+
3+
- Deprecate `Behaviour::add_address` in favor of `Swarm::add_peer_address`.
4+
See [PR 4371](https://github.com/libp2p/rust-libp2p/pull/4371).
5+
16
## 0.26.1
27

38
- Derive `PartialOrd` and `Ord` for `{Out,In}boundRequestId`.

protocols/request-response/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name = "libp2p-request-response"
33
edition = "2021"
44
rust-version = { workspace = true }
55
description = "Generic Request/Response Protocols"
6-
version = "0.26.1"
6+
version = "0.26.2"
77
authors = ["Parity Technologies <[email protected]>"]
88
license = "MIT"
99
repository = "https://github.com/libp2p/rust-libp2p"
@@ -40,7 +40,7 @@ libp2p-yamux = { workspace = true }
4040
rand = "0.8"
4141
libp2p-swarm-test = { path = "../../swarm-test" }
4242
futures_ringbuf = "0.4.0"
43-
serde = { version = "1.0", features = ["derive"]}
43+
serde = { version = "1.0", features = ["derive"] }
4444
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
4545

4646
# Passing arguments to the docsrs builder in order to properly document cfg's.

0 commit comments

Comments
 (0)