Skip to content

Commit 0bf9dbf

Browse files
authored
Merge pull request #356 from mintlayer/p2p-unbounded-channels
p2p: Start using `tokio::unbounded_channel`
2 parents 283b215 + 9d32873 commit 0bf9dbf

File tree

16 files changed

+160
-207
lines changed

16 files changed

+160
-207
lines changed

p2p/src/constants.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,3 @@ pub const PING_MAX_RETRIES: u32 = 3;
2727

2828
/// Maximum message size
2929
pub const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;
30-
31-
// TODO: think about channel sizes
32-
pub const CHANNEL_SIZE: usize = 64;

p2p/src/lib.rs

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,6 @@ pub mod sync;
3838
/// Result type with P2P errors
3939
pub type Result<T> = core::result::Result<T, P2pError>;
4040

41-
// TODO: figure out proper channel sizes
42-
const CHANNEL_SIZE: usize = 64;
43-
4441
pub struct P2pInterface<T: NetworkingService> {
4542
p2p: P2P<T>,
4643
}
@@ -65,7 +62,6 @@ where
6562
})?,
6663
tx,
6764
))
68-
.await
6965
.map_err(|_| P2pError::ChannelClosed)?;
7066
rx.await.map_err(P2pError::from)?
7167
}
@@ -83,7 +79,6 @@ where
8379
self.p2p
8480
.tx_swarm
8581
.send(event::SwarmEvent::Disconnect(peer_id, tx))
86-
.await
8782
.map_err(|_| P2pError::ChannelClosed)?;
8883
rx.await.map_err(P2pError::from)?
8984
}
@@ -93,7 +88,6 @@ where
9388
self.p2p
9489
.tx_swarm
9590
.send(event::SwarmEvent::GetPeerCount(tx))
96-
.await
9791
.map_err(P2pError::from)?;
9892
rx.await.map_err(P2pError::from)
9993
}
@@ -103,7 +97,6 @@ where
10397
self.p2p
10498
.tx_swarm
10599
.send(event::SwarmEvent::GetBindAddress(tx))
106-
.await
107100
.map_err(P2pError::from)?;
108101
rx.await.map_err(P2pError::from)
109102
}
@@ -113,7 +106,6 @@ where
113106
self.p2p
114107
.tx_swarm
115108
.send(event::SwarmEvent::GetPeerId(tx))
116-
.await
117109
.map_err(P2pError::from)?;
118110
rx.await.map_err(P2pError::from)
119111
}
@@ -123,7 +115,6 @@ where
123115
self.p2p
124116
.tx_swarm
125117
.send(event::SwarmEvent::GetConnectedPeers(tx))
126-
.await
127118
.map_err(P2pError::from)?;
128119
rx.await.map_err(P2pError::from)
129120
}
@@ -132,10 +123,10 @@ where
132123
struct P2P<T: NetworkingService> {
133124
// TODO: add abstration for channels
134125
/// TX channel for sending swarm control events
135-
pub tx_swarm: mpsc::Sender<event::SwarmEvent<T>>,
126+
pub tx_swarm: mpsc::UnboundedSender<event::SwarmEvent<T>>,
136127

137128
/// TX channel for sending syncing/pubsub events
138-
pub _tx_sync: mpsc::Sender<event::SyncEvent>,
129+
pub _tx_sync: mpsc::UnboundedSender<event::SyncEvent>,
139130
}
140131

141132
impl<T> P2P<T>
@@ -177,10 +168,10 @@ where
177168
//
178169
// The difference between these types is that enums that contain the events *can* have
179170
// a `oneshot::channel` object that must be used to send the response.
180-
let (tx_swarm, rx_swarm) = mpsc::channel(CHANNEL_SIZE);
181-
let (tx_p2p_sync, rx_p2p_sync) = mpsc::channel(CHANNEL_SIZE);
182-
let (_tx_sync, _rx_sync) = mpsc::channel(CHANNEL_SIZE);
183-
let (tx_pubsub, rx_pubsub) = mpsc::channel(CHANNEL_SIZE);
171+
let (tx_swarm, rx_swarm) = mpsc::unbounded_channel();
172+
let (tx_p2p_sync, rx_p2p_sync) = mpsc::unbounded_channel();
173+
let (_tx_sync, _rx_sync) = mpsc::unbounded_channel();
174+
let (tx_pubsub, rx_pubsub) = mpsc::unbounded_channel();
184175

185176
{
186177
let chain_config = Arc::clone(&chain_config);

p2p/src/net/libp2p/backend.rs

Lines changed: 35 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,16 @@ pub struct Libp2pBackend {
4242
pub(super) swarm: Swarm<behaviour::Libp2pBehaviour>,
4343

4444
/// Receiver for incoming commands
45-
cmd_rx: mpsc::Receiver<types::Command>,
45+
cmd_rx: mpsc::UnboundedReceiver<types::Command>,
4646

4747
/// Sender for outgoing connectivity events
48-
pub(super) conn_tx: mpsc::Sender<types::ConnectivityEvent>,
48+
pub(super) conn_tx: mpsc::UnboundedSender<types::ConnectivityEvent>,
4949

5050
/// Sender for outgoing gossipsub events
51-
pub(super) gossip_tx: mpsc::Sender<types::PubSubEvent>,
51+
pub(super) gossip_tx: mpsc::UnboundedSender<types::PubSubEvent>,
5252

5353
/// Sender for outgoing syncing events
54-
pub(super) sync_tx: mpsc::Sender<types::SyncingEvent>,
54+
pub(super) sync_tx: mpsc::UnboundedSender<types::SyncingEvent>,
5555

5656
/// Active listen address of the backend
5757
// TODO: cache this inside `Libp2pConnectivityHandle`?
@@ -61,10 +61,10 @@ pub struct Libp2pBackend {
6161
impl Libp2pBackend {
6262
pub fn new(
6363
swarm: Swarm<behaviour::Libp2pBehaviour>,
64-
cmd_rx: mpsc::Receiver<types::Command>,
65-
conn_tx: mpsc::Sender<types::ConnectivityEvent>,
66-
gossip_tx: mpsc::Sender<types::PubSubEvent>,
67-
sync_tx: mpsc::Sender<types::SyncingEvent>,
64+
cmd_rx: mpsc::UnboundedReceiver<types::Command>,
65+
conn_tx: mpsc::UnboundedSender<types::ConnectivityEvent>,
66+
gossip_tx: mpsc::UnboundedSender<types::PubSubEvent>,
67+
sync_tx: mpsc::UnboundedSender<types::SyncingEvent>,
6868
) -> Self {
6969
Self {
7070
swarm,
@@ -90,13 +90,13 @@ impl Libp2pBackend {
9090
self.swarm.behaviour_mut().connmgr.handle_banned_peer(peer_id);
9191
}
9292
SwarmEvent::Behaviour(Libp2pBehaviourEvent::Connectivity(event)) => {
93-
self.conn_tx.send(event).await.map_err(P2pError::from)?;
93+
self.conn_tx.send(event).map_err(P2pError::from)?;
9494
}
9595
SwarmEvent::Behaviour(Libp2pBehaviourEvent::Syncing(event)) => {
96-
self.sync_tx.send(event).await.map_err(P2pError::from)?;
96+
self.sync_tx.send(event).map_err(P2pError::from)?;
9797
}
9898
SwarmEvent::Behaviour(Libp2pBehaviourEvent::PubSub(event)) => {
99-
self.gossip_tx.send(event).await.map_err(P2pError::from)?;
99+
self.gossip_tx.send(event).map_err(P2pError::from)?;
100100
}
101101
SwarmEvent::Behaviour(Libp2pBehaviourEvent::Control(
102102
ControlEvent::CloseConnection { peer_id })
@@ -408,21 +408,19 @@ mod tests {
408408
#[tokio::test]
409409
async fn test_command_listen_success() {
410410
let swarm = make_swarm().await;
411-
let (cmd_tx, cmd_rx) = mpsc::channel(16);
412-
let (gossip_tx, _) = mpsc::channel(64);
413-
let (conn_tx, _) = mpsc::channel(64);
414-
let (sync_tx, _) = mpsc::channel(64);
411+
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
412+
let (gossip_tx, _) = mpsc::unbounded_channel();
413+
let (conn_tx, _) = mpsc::unbounded_channel();
414+
let (sync_tx, _) = mpsc::unbounded_channel();
415415
let mut backend = Libp2pBackend::new(swarm, cmd_rx, conn_tx, gossip_tx, sync_tx);
416416

417417
tokio::spawn(async move { backend.run().await });
418418

419419
let (tx, rx) = oneshot::channel();
420-
let res = cmd_tx
421-
.send(types::Command::Listen {
422-
addr: p2p_test_utils::make_libp2p_addr(),
423-
response: tx,
424-
})
425-
.await;
420+
let res = cmd_tx.send(types::Command::Listen {
421+
addr: p2p_test_utils::make_libp2p_addr(),
422+
response: tx,
423+
});
426424
assert!(res.is_ok());
427425

428426
let res = rx.await;
@@ -435,21 +433,19 @@ mod tests {
435433
#[tokio::test]
436434
async fn test_command_listen_addrinuse() {
437435
let swarm = make_swarm().await;
438-
let (cmd_tx, cmd_rx) = mpsc::channel(16);
439-
let (gossip_tx, _) = mpsc::channel(64);
440-
let (conn_tx, _) = mpsc::channel(64);
441-
let (sync_tx, _) = mpsc::channel(64);
436+
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
437+
let (gossip_tx, _) = mpsc::unbounded_channel();
438+
let (conn_tx, _) = mpsc::unbounded_channel();
439+
let (sync_tx, _) = mpsc::unbounded_channel();
442440
let mut backend = Libp2pBackend::new(swarm, cmd_rx, conn_tx, gossip_tx, sync_tx);
443441

444442
tokio::spawn(async move { backend.run().await });
445443

446444
let (tx, rx) = oneshot::channel();
447-
let res = cmd_tx
448-
.send(types::Command::Listen {
449-
addr: p2p_test_utils::make_libp2p_addr(),
450-
response: tx,
451-
})
452-
.await;
445+
let res = cmd_tx.send(types::Command::Listen {
446+
addr: p2p_test_utils::make_libp2p_addr(),
447+
response: tx,
448+
});
453449
assert!(res.is_ok());
454450

455451
let res = rx.await;
@@ -458,12 +454,10 @@ mod tests {
458454

459455
// try to bind to the same interface again
460456
let (tx, rx) = oneshot::channel();
461-
let res = cmd_tx
462-
.send(types::Command::Listen {
463-
addr: p2p_test_utils::make_libp2p_addr(),
464-
response: tx,
465-
})
466-
.await;
457+
let res = cmd_tx.send(types::Command::Listen {
458+
addr: p2p_test_utils::make_libp2p_addr(),
459+
response: tx,
460+
});
467461
assert!(res.is_ok());
468462

469463
let res = rx.await;
@@ -476,10 +470,10 @@ mod tests {
476470
#[tokio::test]
477471
async fn test_drop_command_tx() {
478472
let swarm = make_swarm().await;
479-
let (cmd_tx, cmd_rx) = mpsc::channel(16);
480-
let (gossip_tx, _) = mpsc::channel(64);
481-
let (conn_tx, _) = mpsc::channel(64);
482-
let (sync_tx, _) = mpsc::channel(64);
473+
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
474+
let (gossip_tx, _) = mpsc::unbounded_channel();
475+
let (conn_tx, _) = mpsc::unbounded_channel();
476+
let (sync_tx, _) = mpsc::unbounded_channel();
483477
let mut backend = Libp2pBackend::new(swarm, cmd_rx, conn_tx, gossip_tx, sync_tx);
484478

485479
drop(cmd_tx);

p2p/src/net/libp2p/constants.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,3 @@ pub const PING_MAX_RETRIES: u32 = 3;
3333
/// Request-response configuration
3434
pub const REQ_RESP_TIMEOUT: Duration = Duration::from_secs(10);
3535
pub const MESSAGE_MAX_SIZE: usize = 10 * 1024 * 1024;
36-
// TODO: think about channel sizes
37-
pub const CHANNEL_SIZE: usize = 64;

p2p/src/net/libp2p/mod.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -103,11 +103,10 @@ impl NetworkingService for Libp2pService {
103103
)
104104
.build();
105105

106-
// TODO: unbounded
107-
let (cmd_tx, cmd_rx) = mpsc::channel(constants::CHANNEL_SIZE);
108-
let (gossip_tx, gossip_rx) = mpsc::channel(constants::CHANNEL_SIZE);
109-
let (conn_tx, conn_rx) = mpsc::channel(constants::CHANNEL_SIZE);
110-
let (sync_tx, sync_rx) = mpsc::channel(constants::CHANNEL_SIZE);
106+
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
107+
let (gossip_tx, gossip_rx) = mpsc::unbounded_channel();
108+
let (conn_tx, conn_rx) = mpsc::unbounded_channel();
109+
let (sync_tx, sync_rx) = mpsc::unbounded_channel();
111110

112111
// run the libp2p backend in a background task
113112
tokio::spawn(async move {
@@ -119,12 +118,10 @@ impl NetworkingService for Libp2pService {
119118
// send listen command to the libp2p backend and if it succeeds,
120119
// create a multiaddress for local peer and return the Libp2pService object
121120
let (tx, rx) = oneshot::channel();
122-
cmd_tx
123-
.send(types::Command::Listen {
124-
addr: bind_addr.clone(),
125-
response: tx,
126-
})
127-
.await?;
121+
cmd_tx.send(types::Command::Listen {
122+
addr: bind_addr.clone(),
123+
response: tx,
124+
})?;
128125
rx.await?
129126
.map_err(|_| P2pError::DialError(DialError::IoError(std::io::ErrorKind::AddrInUse)))?;
130127

p2p/src/net/libp2p/service/connectivity.rs

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -31,18 +31,18 @@ pub struct Libp2pConnectivityHandle<T: NetworkingService> {
3131
peer_id: PeerId,
3232

3333
/// Channel for sending commands to libp2p backend
34-
cmd_tx: mpsc::Sender<types::Command>,
34+
cmd_tx: mpsc::UnboundedSender<types::Command>,
3535

3636
/// Channel for receiving connectivity events from libp2p backend
37-
conn_rx: mpsc::Receiver<types::ConnectivityEvent>,
37+
conn_rx: mpsc::UnboundedReceiver<types::ConnectivityEvent>,
3838
_marker: std::marker::PhantomData<fn() -> T>,
3939
}
4040

4141
impl<T: NetworkingService> Libp2pConnectivityHandle<T> {
4242
pub fn new(
4343
peer_id: PeerId,
44-
cmd_tx: mpsc::Sender<types::Command>,
45-
conn_rx: mpsc::Receiver<types::ConnectivityEvent>,
44+
cmd_tx: mpsc::UnboundedSender<types::Command>,
45+
conn_rx: mpsc::UnboundedReceiver<types::ConnectivityEvent>,
4646
) -> Self {
4747
Self {
4848
peer_id,
@@ -177,13 +177,11 @@ where
177177

178178
// try to connect to remote peer
179179
let (tx, rx) = oneshot::channel();
180-
self.cmd_tx
181-
.send(types::Command::Connect {
182-
peer_id,
183-
peer_addr: addr.clone(),
184-
response: tx,
185-
})
186-
.await?;
180+
self.cmd_tx.send(types::Command::Connect {
181+
peer_id,
182+
peer_addr: addr.clone(),
183+
response: tx,
184+
})?;
187185

188186
rx.await.map_err(P2pError::from)?.map_err(P2pError::from)
189187
}
@@ -192,18 +190,16 @@ where
192190
log::debug!("disconnect peer {:?}", peer_id);
193191

194192
let (tx, rx) = oneshot::channel();
195-
self.cmd_tx
196-
.send(types::Command::Disconnect {
197-
peer_id,
198-
response: tx,
199-
})
200-
.await?;
193+
self.cmd_tx.send(types::Command::Disconnect {
194+
peer_id,
195+
response: tx,
196+
})?;
201197
rx.await.map_err(P2pError::from)?.map_err(P2pError::from)
202198
}
203199

204200
async fn local_addr(&self) -> crate::Result<Option<T::Address>> {
205201
let (tx, rx) = oneshot::channel();
206-
self.cmd_tx.send(types::Command::ListenAddress { response: tx }).await?;
202+
self.cmd_tx.send(types::Command::ListenAddress { response: tx })?;
207203
rx.await.map_err(P2pError::from)
208204
}
209205

@@ -215,12 +211,10 @@ where
215211
log::debug!("ban peer {}", peer_id);
216212

217213
let (tx, rx) = oneshot::channel();
218-
self.cmd_tx
219-
.send(types::Command::BanPeer {
220-
peer_id,
221-
response: tx,
222-
})
223-
.await?;
214+
self.cmd_tx.send(types::Command::BanPeer {
215+
peer_id,
216+
response: tx,
217+
})?;
224218
rx.await.map_err(P2pError::from)?.map_err(P2pError::from)
225219
}
226220

0 commit comments

Comments
 (0)