Skip to content

Commit fcc87f4

Browse files
authored
Merge pull request #347 from mintlayer/p2p-test-conn-improvement
Add timeout for connection establishment in P2P tests
2 parents ee40a6b + 753b145 commit fcc87f4

10 files changed

Lines changed: 171 additions & 219 deletions

File tree

p2p/p2p-test-utils/src/lib.rs

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,12 @@ use common::{
3232
};
3333
use crypto::random::SliceRandom;
3434
use libp2p::Multiaddr;
35-
use std::net::SocketAddr;
36-
use std::sync::Arc;
37-
use tokio::net::{TcpListener, TcpStream};
35+
use p2p::net::{types::ConnectivityEvent, ConnectivityService, NetworkingService};
36+
use std::{net::SocketAddr, sync::Arc, time::Duration};
37+
use tokio::{
38+
net::{TcpListener, TcpStream},
39+
time::timeout,
40+
};
3841

3942
pub fn make_libp2p_addr() -> Multiaddr {
4043
"/ip6/::1/tcp/0".parse().unwrap()
@@ -67,6 +70,37 @@ pub async fn get_tcp_socket() -> TcpStream {
6770
TcpStream::connect(addr).await.unwrap()
6871
}
6972

73+
pub async fn connect_services<T>(
74+
conn1: &mut T::ConnectivityHandle,
75+
conn2: &mut T::ConnectivityHandle,
76+
) where
77+
T: NetworkingService + std::fmt::Debug,
78+
T::ConnectivityHandle: ConnectivityService<T>,
79+
{
80+
let addr = timeout(Duration::from_secs(5), conn2.local_addr())
81+
.await
82+
.expect("local address fetch not to timeout")
83+
.unwrap()
84+
.unwrap();
85+
conn1.connect(addr).await.expect("dial to succeed");
86+
87+
match timeout(Duration::from_secs(5), conn2.poll_next()).await {
88+
Ok(event) => match event.unwrap() {
89+
ConnectivityEvent::InboundAccepted { .. } => {}
90+
event => panic!("expected `InboundAccepted`, got {event:?}"),
91+
},
92+
Err(_err) => panic!("did not receive `InboundAccepted` in time"),
93+
}
94+
95+
match timeout(Duration::from_secs(5), conn1.poll_next()).await {
96+
Ok(event) => match event.unwrap() {
97+
ConnectivityEvent::OutboundAccepted { .. } => {}
98+
event => panic!("expected `OutboundAccepted`, got {event:?}"),
99+
},
100+
Err(_err) => panic!("did not receive `OutboundAccepted` in time"),
101+
}
102+
}
103+
70104
pub type ChainstateHandle = subsystem::Handle<Box<dyn ChainstateInterface + 'static>>;
71105

72106
#[derive(Debug, Clone, PartialEq, Eq)]

p2p/src/net/libp2p/behaviour.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,12 @@ use std::{
6161
poll_method = "poll"
6262
)]
6363
pub struct Libp2pBehaviour {
64+
pub connmgr: connectivity::ConnectionManager,
65+
pub identify: identify::Identify,
66+
pub discovery: discovery::DiscoveryManager,
6467
pub gossipsub: Gossipsub,
6568
pub ping: ping::Behaviour,
66-
pub identify: identify::Identify,
6769
pub sync: RequestResponse<SyncingCodec>,
68-
pub connmgr: connectivity::ConnectionManager,
69-
pub discovery: discovery::DiscoveryManager,
7070

7171
#[behaviour(ignore)]
7272
pub events: VecDeque<Libp2pBehaviourEvent>,

p2p/src/net/libp2p/tests/mod.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use std::{
3838
sync::Arc,
3939
time::Duration,
4040
};
41-
use tokio::sync::mpsc;
41+
use tokio::{sync::mpsc, time::timeout};
4242

4343
#[cfg(test)]
4444
mod frontend;
@@ -247,7 +247,11 @@ pub async fn make_libp2p_with_ping(
247247

248248
async fn get_address<T: NetworkBehaviour>(swarm: &mut Swarm<T>) -> Multiaddr {
249249
loop {
250-
if let SwarmEvent::NewListenAddr { address, .. } = swarm.select_next_some().await {
250+
if let SwarmEvent::NewListenAddr { address, .. } =
251+
timeout(Duration::from_secs(5), swarm.select_next_some())
252+
.await
253+
.expect("event to be received")
254+
{
251255
return address;
252256
}
253257
}
@@ -262,7 +266,7 @@ where
262266
let addr = get_address::<A>(swarm1).await;
263267

264268
for _ in 0..3 {
265-
swarm2.dial(addr.clone()).expect("swarm dial failed");
269+
swarm2.dial(addr.clone()).expect("dial to succeed");
266270

267271
loop {
268272
tokio::select! {

p2p/src/swarm/tests/ban.rs

Lines changed: 10 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
use crate::{
1717
error::{P2pError, PeerError},
1818
net::{self, libp2p::Libp2pService, mock::MockService, ConnectivityService, NetworkingService},
19-
swarm::tests::make_peer_manager,
19+
swarm::tests::{connect_services, make_peer_manager},
2020
};
2121
use common::chain::config;
2222
use libp2p::Multiaddr;
@@ -26,7 +26,7 @@ use std::sync::Arc;
2626
// ban peer whose connected to us
2727
async fn ban_connected_peer<T>(addr1: T::Address, addr2: T::Address)
2828
where
29-
T: NetworkingService + 'static,
29+
T: NetworkingService + 'static + std::fmt::Debug,
3030
T::ConnectivityHandle: ConnectivityService<T>,
3131
<T as net::NetworkingService>::Address: std::str::FromStr,
3232
<<T as net::NetworkingService>::Address as std::str::FromStr>::Err: std::fmt::Debug,
@@ -35,13 +35,8 @@ where
3535
let mut swarm1 = make_peer_manager::<T>(addr1, Arc::clone(&config)).await;
3636
let mut swarm2 = make_peer_manager::<T>(addr2, config).await;
3737

38-
let addr = swarm2.handle.local_addr().await.unwrap().unwrap();
39-
let (_conn1_res, conn2_res) =
40-
tokio::join!(swarm1.handle.connect(addr), swarm2.handle.poll_next(),);
41-
42-
if let Ok(net::types::ConnectivityEvent::InboundAccepted { peer_info, address }) = conn2_res {
43-
swarm2.accept_inbound_connection(address, peer_info).await.unwrap();
44-
}
38+
let (address, peer_info) = connect_services::<T>(&mut swarm1.handle, &mut swarm2.handle).await;
39+
swarm2.accept_inbound_connection(address, peer_info).await.unwrap();
4540

4641
let peer_id = *swarm1.handle_mut().peer_id();
4742
assert_eq!(swarm2.adjust_peer_score(peer_id, 1000).await, Ok(()));
@@ -74,13 +69,8 @@ where
7469
let mut swarm1 = make_peer_manager::<T>(addr1, Arc::clone(&config)).await;
7570
let mut swarm2 = make_peer_manager::<T>(addr2, config).await;
7671

77-
let addr = swarm2.handle.local_addr().await.unwrap().unwrap();
78-
let (_conn1_res, conn2_res) =
79-
tokio::join!(swarm1.handle.connect(addr), swarm2.handle.poll_next(),);
80-
81-
if let Ok(net::types::ConnectivityEvent::InboundAccepted { peer_info, address }) = conn2_res {
82-
swarm2.accept_inbound_connection(address, peer_info).await.unwrap();
83-
}
72+
let (address, peer_info) = connect_services::<T>(&mut swarm1.handle, &mut swarm2.handle).await;
73+
swarm2.accept_inbound_connection(address, peer_info).await.unwrap();
8474

8575
let peer_id = *swarm1.handle_mut().peer_id();
8676
assert_eq!(swarm2.adjust_peer_score(peer_id, 1000).await, Ok(()));
@@ -116,7 +106,7 @@ async fn banned_peer_attempts_to_connect_mock() {
116106
// attempt to connect to banned peer
117107
async fn connect_to_banned_peer<T>(addr1: T::Address, addr2: T::Address)
118108
where
119-
T: NetworkingService + 'static,
109+
T: NetworkingService + 'static + std::fmt::Debug,
120110
T::ConnectivityHandle: ConnectivityService<T>,
121111
<T as net::NetworkingService>::Address: std::str::FromStr,
122112
<<T as net::NetworkingService>::Address as std::str::FromStr>::Err: std::fmt::Debug,
@@ -125,13 +115,8 @@ where
125115
let mut swarm1 = make_peer_manager::<T>(addr1, Arc::clone(&config)).await;
126116
let mut swarm2 = make_peer_manager::<T>(addr2, config).await;
127117

128-
let addr = swarm2.handle.local_addr().await.unwrap().unwrap();
129-
let (_conn1_res, conn2_res) =
130-
tokio::join!(swarm1.handle.connect(addr), swarm2.handle.poll_next(),);
131-
132-
if let Ok(net::types::ConnectivityEvent::InboundAccepted { peer_info, address }) = conn2_res {
133-
swarm2.accept_inbound_connection(address, peer_info).await.unwrap();
134-
}
118+
let (address, peer_info) = connect_services::<T>(&mut swarm1.handle, &mut swarm2.handle).await;
119+
swarm2.accept_inbound_connection(address, peer_info).await.unwrap();
135120

136121
let peer_id = *swarm1.handle_mut().peer_id();
137122
assert_eq!(swarm2.adjust_peer_score(peer_id, 1000).await, Ok(()));
@@ -391,10 +376,7 @@ where
391376
)
392377
.await;
393378

394-
let addr = swarm2.handle.local_addr().await.unwrap().unwrap();
395-
let (_conn1_res, conn2_res) =
396-
tokio::join!(swarm1.handle.connect(addr), swarm2.handle.poll_next());
397-
let _conn2_res: net::types::ConnectivityEvent<T> = conn2_res.unwrap();
379+
connect_services::<T>(&mut swarm1.handle, &mut swarm2.handle).await;
398380
let swarm1_id = *swarm1.handle.peer_id();
399381

400382
// run the first peer manager in the background and poll events from the peer manager

p2p/src/swarm/tests/connections.rs

Lines changed: 31 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ use crate::{
2121
mock::{types::MockPeerId, MockService},
2222
ConnectivityService, NetworkingService,
2323
},
24-
swarm::{self, tests::make_peer_manager},
24+
swarm::{
25+
self,
26+
tests::{connect_services, make_peer_manager},
27+
},
2528
};
2629
use common::chain::config;
2730
use libp2p::{Multiaddr, PeerId};
@@ -31,7 +34,7 @@ use std::{net::SocketAddr, sync::Arc};
3134
// try to connect to an address that no one listening on and verify it fails
3235
async fn test_swarm_connect<T: NetworkingService>(bind_addr: T::Address, remote_addr: T::Address)
3336
where
34-
T: NetworkingService + 'static,
37+
T: NetworkingService + 'static + std::fmt::Debug,
3538
T::ConnectivityHandle: ConnectivityService<T>,
3639
<T as net::NetworkingService>::Address: std::str::FromStr,
3740
<<T as net::NetworkingService>::Address as std::str::FromStr>::Err: std::fmt::Debug,
@@ -73,7 +76,7 @@ async fn test_swarm_connect_libp2p() {
7376
// is below the desired threshold and there are idle peers in the peerdb
7477
async fn test_auto_connect<T>(addr1: T::Address, addr2: T::Address)
7578
where
76-
T: NetworkingService + 'static,
79+
T: NetworkingService + 'static + std::fmt::Debug,
7780
T::ConnectivityHandle: ConnectivityService<T>,
7881
<T as net::NetworkingService>::Address: std::str::FromStr,
7982
<<T as net::NetworkingService>::Address as std::str::FromStr>::Err: std::fmt::Debug,
@@ -118,7 +121,7 @@ async fn test_auto_connect_mock() {
118121

119122
async fn connect_outbound_same_network<T>(addr1: T::Address, addr2: T::Address)
120123
where
121-
T: NetworkingService + 'static,
124+
T: NetworkingService + 'static + std::fmt::Debug,
122125
T::ConnectivityHandle: ConnectivityService<T>,
123126
<T as net::NetworkingService>::Address: std::str::FromStr,
124127
<<T as net::NetworkingService>::Address as std::str::FromStr>::Err: std::fmt::Debug,
@@ -127,14 +130,7 @@ where
127130
let mut swarm1 = make_peer_manager::<T>(addr1, Arc::clone(&config)).await;
128131
let mut swarm2 = make_peer_manager::<T>(addr2, config).await;
129132

130-
let addr = swarm2.handle.local_addr().await.unwrap().unwrap();
131-
let (_conn1_res, _conn2_res) =
132-
tokio::join!(swarm1.handle.connect(addr), swarm2.handle.poll_next());
133-
134-
assert!(std::matches!(
135-
swarm1.handle.poll_next().await,
136-
Ok(net::types::ConnectivityEvent::OutboundAccepted { .. })
137-
));
133+
connect_services::<T>(&mut swarm1.handle, &mut swarm2.handle).await;
138134
}
139135

140136
#[tokio::test]
@@ -196,7 +192,7 @@ async fn test_validate_supported_protocols() {
196192

197193
async fn connect_outbound_different_network<T>(addr1: T::Address, addr2: T::Address)
198194
where
199-
T: NetworkingService + 'static,
195+
T: NetworkingService + 'static + std::fmt::Debug,
200196
T::ConnectivityHandle: ConnectivityService<T>,
201197
<T as net::NetworkingService>::Address: std::str::FromStr,
202198
<<T as net::NetworkingService>::Address as std::str::FromStr>::Err: std::fmt::Debug,
@@ -209,17 +205,8 @@ where
209205
)
210206
.await;
211207

212-
let addr = swarm2.handle.local_addr().await.unwrap().unwrap();
213-
tokio::spawn(async move { swarm2.handle.poll_next().await.unwrap() });
214-
swarm1.handle.connect(addr).await.unwrap();
215-
216-
if let Ok(net::types::ConnectivityEvent::OutboundAccepted {
217-
peer_info,
218-
address: _,
219-
}) = swarm1.handle.poll_next().await
220-
{
221-
assert_ne!(peer_info.magic_bytes, *config.magic_bytes());
222-
}
208+
let (_address, peer_info) = connect_services::<T>(&mut swarm2.handle, &mut swarm1.handle).await;
209+
assert_ne!(peer_info.magic_bytes, *config.magic_bytes());
223210
}
224211

225212
#[tokio::test]
@@ -235,7 +222,7 @@ async fn connect_outbound_different_network_mock() {
235222

236223
async fn connect_inbound_same_network<T>(addr1: T::Address, addr2: T::Address)
237224
where
238-
T: NetworkingService + 'static,
225+
T: NetworkingService + 'static + std::fmt::Debug,
239226
T::ConnectivityHandle: ConnectivityService<T>,
240227
<T as net::NetworkingService>::Address: std::str::FromStr,
241228
<<T as net::NetworkingService>::Address as std::str::FromStr>::Err: std::fmt::Debug,
@@ -244,19 +231,11 @@ where
244231
let mut swarm1 = make_peer_manager::<T>(addr1, Arc::clone(&config)).await;
245232
let mut swarm2 = make_peer_manager::<T>(addr2, config).await;
246233

247-
let (_conn1_res, conn2_res) = tokio::join!(
248-
swarm1.handle.connect(swarm2.handle.local_addr().await.unwrap().unwrap()),
249-
swarm2.handle.poll_next()
234+
let (address, peer_info) = connect_services::<T>(&mut swarm1.handle, &mut swarm2.handle).await;
235+
assert_eq!(
236+
swarm2.accept_inbound_connection(address, peer_info).await,
237+
Ok(())
250238
);
251-
let conn2_res: net::types::ConnectivityEvent<T> = conn2_res.unwrap();
252-
if let net::types::ConnectivityEvent::InboundAccepted { peer_info, address } = conn2_res {
253-
assert_eq!(
254-
swarm2.accept_inbound_connection(address, peer_info).await,
255-
Ok(())
256-
);
257-
} else {
258-
panic!("invalid event received");
259-
}
260239
}
261240

262241
#[tokio::test]
@@ -272,7 +251,7 @@ async fn connect_inbound_same_network_mock() {
272251

273252
async fn connect_inbound_different_network<T>(addr1: T::Address, addr2: T::Address)
274253
where
275-
T: NetworkingService + 'static,
254+
T: NetworkingService + 'static + std::fmt::Debug,
276255
T::ConnectivityHandle: ConnectivityService<T>,
277256
<T as net::NetworkingService>::Address: std::str::FromStr,
278257
<<T as net::NetworkingService>::Address as std::str::FromStr>::Err: std::fmt::Debug,
@@ -284,23 +263,15 @@ where
284263
)
285264
.await;
286265

287-
let (_conn1_res, conn2_res) = tokio::join!(
288-
swarm1.handle.connect(swarm2.handle.local_addr().await.unwrap().unwrap()),
289-
swarm2.handle.poll_next()
266+
let (address, peer_info) = connect_services::<T>(&mut swarm1.handle, &mut swarm2.handle).await;
267+
268+
assert_eq!(
269+
swarm2.accept_inbound_connection(address, peer_info).await,
270+
Err(P2pError::ProtocolError(ProtocolError::DifferentNetwork(
271+
[1, 2, 3, 4],
272+
*config::create_mainnet().magic_bytes(),
273+
)))
290274
);
291-
let conn2_res: net::types::ConnectivityEvent<T> = conn2_res.unwrap();
292-
293-
if let net::types::ConnectivityEvent::InboundAccepted { peer_info, address } = conn2_res {
294-
assert_eq!(
295-
swarm2.accept_inbound_connection(address, peer_info).await,
296-
Err(P2pError::ProtocolError(ProtocolError::DifferentNetwork(
297-
[1, 2, 3, 4],
298-
*config::create_mainnet().magic_bytes(),
299-
)))
300-
);
301-
} else {
302-
panic!("invalid event received");
303-
}
304275
}
305276

306277
#[tokio::test]
@@ -316,28 +287,16 @@ async fn connect_inbound_different_network_mock() {
316287

317288
async fn remote_closes_connection<T>(addr1: T::Address, addr2: T::Address)
318289
where
319-
T: NetworkingService + 'static,
290+
T: NetworkingService + 'static + std::fmt::Debug,
320291
T::ConnectivityHandle: ConnectivityService<T>,
321292
<T as net::NetworkingService>::Address: std::str::FromStr,
322293
<<T as net::NetworkingService>::Address as std::str::FromStr>::Err: std::fmt::Debug,
323294
{
324295
let mut swarm1 = make_peer_manager::<T>(addr1, Arc::new(config::create_mainnet())).await;
325296
let mut swarm2 = make_peer_manager::<T>(addr2, Arc::new(config::create_mainnet())).await;
326297

327-
let (_conn1_res, conn2_res) = tokio::join!(
328-
swarm1.handle.connect(swarm2.handle.local_addr().await.unwrap().unwrap()),
329-
swarm2.handle.poll_next()
330-
);
331-
let conn2_res: net::types::ConnectivityEvent<T> = conn2_res.unwrap();
332-
333-
assert!(std::matches!(
334-
conn2_res,
335-
net::types::ConnectivityEvent::InboundAccepted { .. }
336-
));
337-
assert!(std::matches!(
338-
swarm1.handle.poll_next().await,
339-
Ok(net::types::ConnectivityEvent::OutboundAccepted { .. })
340-
));
298+
let (_address, _peer_info) =
299+
connect_services::<T>(&mut swarm1.handle, &mut swarm2.handle).await;
341300

342301
assert_eq!(
343302
swarm2.handle.disconnect(*swarm1.handle.peer_id()).await,
@@ -366,7 +325,7 @@ async fn inbound_connection_too_many_peers<T>(
366325
default_addr: T::Address,
367326
peers: Vec<net::types::PeerInfo<T>>,
368327
) where
369-
T: NetworkingService + 'static,
328+
T: NetworkingService + 'static + std::fmt::Debug,
370329
T::ConnectivityHandle: ConnectivityService<T>,
371330
<T as net::NetworkingService>::Address: std::str::FromStr,
372331
<<T as net::NetworkingService>::Address as std::str::FromStr>::Err: std::fmt::Debug,
@@ -383,10 +342,8 @@ async fn inbound_connection_too_many_peers<T>(
383342
swarm::MAX_ACTIVE_CONNECTIONS
384343
);
385344

386-
let addr = swarm2.handle.local_addr().await.unwrap().unwrap();
387-
let (_conn1_res, conn2_res) =
388-
tokio::join!(swarm1.handle.connect(addr), swarm2.handle.poll_next());
389-
let _conn2_res: net::types::ConnectivityEvent<T> = conn2_res.unwrap();
345+
let (_address, _peer_info) =
346+
connect_services::<T>(&mut swarm1.handle, &mut swarm2.handle).await;
390347
let swarm1_id = *swarm1.handle.peer_id();
391348

392349
// run the first peer manager in the background and poll events from the peer manager

0 commit comments

Comments
 (0)