Skip to content

Commit a691e1b

Browse files
Merge pull request #308 from mintlayer/fix/p2p_suggestions
Fix: p2p suggestions
2 parents 71c0929 + b205f28 commit a691e1b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+1689
-1343
lines changed

common/src/lib.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,3 @@ mod fixed_hash;
2525

2626
pub use concurrency_impl::*;
2727
pub use uint::{Uint128, Uint256};
28-
29-
#[cfg(test)]
30-
mod tests {
31-
#[test]
32-
#[allow(clippy::eq_op)]
33-
fn it_works() {
34-
assert_eq!(2 + 2, 4);
35-
}
36-
}

p2p/src/error.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ use libp2p::{
2323
};
2424
use thiserror::Error;
2525

26+
/// Errors related to invalid data/peer information that results in connection getting closed
27+
/// and the peer getting banned.
2628
#[derive(Error, Debug, PartialEq, Eq)]
2729
pub enum ProtocolError {
2830
#[error("Peer is in different network. Our network {0:?}, their network {1:?}")]
@@ -31,16 +33,17 @@ pub enum ProtocolError {
3133
InvalidVersion(SemVer, SemVer),
3234
#[error("Peer sent an invalid message")]
3335
InvalidMessage,
34-
#[error("Peer is incompatible")] // TODO: remove?
36+
#[error("Peer is incompatible")]
3537
Incompatible,
3638
#[error("Peer is unresponsive")]
3739
Unresponsive,
38-
#[error("Peer uses an invalid protocol")] // TODO: remove?
40+
#[error("Peer uses an invalid protocol")]
3941
InvalidProtocol,
4042
#[error("Peer state is invalid for this operation. State is {0} but should be {1}")]
4143
InvalidState(&'static str, &'static str),
4244
}
4345

46+
/// Peer state errors (Errors either for an individual peer or for the [`PeerManager`])
4447
#[derive(Error, Debug, PartialEq, Eq)]
4548
pub enum PeerError {
4649
#[error("Peer disconnected")]
@@ -61,6 +64,7 @@ pub enum PeerError {
6164
Pending(String),
6265
}
6366

67+
/// PubSub errors for announcements
6468
#[derive(Error, Debug, PartialEq, Eq)]
6569
pub enum PublishError {
6670
#[error("Message has already been published")]
@@ -75,6 +79,7 @@ pub enum PublishError {
7579
TransformFailed,
7680
}
7781

82+
/// PubSub errors for subscriptions
7883
#[derive(Error, Debug, PartialEq, Eq)]
7984
pub enum SubscriptionError {
8085
#[error("Failed to publish subscription: {0}")]
@@ -83,6 +88,7 @@ pub enum SubscriptionError {
8388
NotAllowed,
8489
}
8590

91+
/// Errors related to establishing a connection with a remote peer
8692
#[derive(Error, Debug, PartialEq, Eq)]
8793
pub enum DialError {
8894
#[error("Peer is banned")]
@@ -107,6 +113,7 @@ pub enum DialError {
107113
Transport,
108114
}
109115

116+
/// Low-level connection errors caused by libp2p
110117
#[derive(Error, Debug, PartialEq, Eq)]
111118
pub enum ConnectionError {
112119
#[error("Timeout")]
@@ -117,6 +124,7 @@ pub enum ConnectionError {
117124
Upgrade,
118125
}
119126

127+
/// Conversion errors
120128
#[derive(Error, Debug, PartialEq, Eq)]
121129
pub enum ConversionError {
122130
#[error("Invalid peer ID: `{0}`")]

p2p/src/event.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,7 @@ pub enum SyncEvent {
4848
}
4949

5050
#[derive(Debug)]
51-
pub enum SyncControlEvent<T>
52-
where
53-
T: NetworkingService,
54-
{
51+
pub enum SyncControlEvent<T: NetworkingService> {
5552
/// Peer connected
5653
Connected(T::PeerId),
5754

p2p/src/lib.rs

Lines changed: 68 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
use crate::{
1717
config::P2pConfig,
1818
error::{ConversionError, P2pError},
19-
net::{ConnectivityService, NetworkingService, PubSubService, SyncingCodecService},
19+
net::{ConnectivityService, NetworkingService, PubSubService, SyncingMessagingService},
2020
};
2121
use chainstate::chainstate_interface;
2222
use common::chain::ChainConfig;
@@ -30,9 +30,9 @@ pub mod error;
3030
pub mod event;
3131
pub mod message;
3232
pub mod net;
33+
pub mod peer_manager;
3334
pub mod pubsub;
3435
pub mod rpc;
35-
pub mod swarm;
3636
pub mod sync;
3737

3838
/// Result type with P2P errors
@@ -45,7 +45,6 @@ pub struct P2pInterface<T: NetworkingService> {
4545
p2p: P2P<T>,
4646
}
4747

48-
// TODO: reduce code duplication?
4948
impl<T> P2pInterface<T>
5049
where
5150
T: NetworkingService,
@@ -143,7 +142,7 @@ impl<T> P2P<T>
143142
where
144143
T: 'static + NetworkingService,
145144
T::ConnectivityHandle: ConnectivityService<T>,
146-
T::SyncingCodecHandle: SyncingCodecService<T>,
145+
T::SyncingMessagingHandle: SyncingMessagingService<T>,
147146
T::PubSubHandle: PubSubService<T>,
148147
{
149148
/// Start the P2P subsystem
@@ -170,63 +169,77 @@ where
170169
)
171170
.await?;
172171

173-
// TODO: think about these channel sizes
172+
// P2P creates its components (such as PeerManager, sync, pubsub, etc) and makes
173+
// communications with them in two possible ways:
174+
//
175+
// 1. Fire-and-forget
176+
// 2. Request and wait for response
177+
//
178+
// The difference between these types is that enums that contain the events *can* have
179+
// a `oneshot::channel` object that must be used to send the response.
174180
let (tx_swarm, rx_swarm) = mpsc::channel(CHANNEL_SIZE);
175181
let (tx_p2p_sync, rx_p2p_sync) = mpsc::channel(CHANNEL_SIZE);
176182
let (_tx_sync, _rx_sync) = mpsc::channel(CHANNEL_SIZE);
177183
let (tx_pubsub, rx_pubsub) = mpsc::channel(CHANNEL_SIZE);
178184

179-
let swarm_config = Arc::clone(&chain_config);
180-
tokio::spawn(async move {
181-
if let Err(e) = swarm::PeerManager::<T>::new(
182-
swarm_config,
183-
Arc::clone(&p2p_config),
184-
conn,
185-
rx_swarm,
186-
tx_p2p_sync,
187-
)
188-
.run()
189-
.await
190-
{
191-
log::error!("PeerManager failed: {:?}", e);
192-
}
193-
});
185+
{
186+
let chain_config = Arc::clone(&chain_config);
187+
tokio::spawn(async move {
188+
if let Err(err) = peer_manager::PeerManager::<T>::new(
189+
chain_config,
190+
Arc::clone(&p2p_config),
191+
conn,
192+
rx_swarm,
193+
tx_p2p_sync,
194+
)
195+
.run()
196+
.await
197+
{
198+
log::error!("PeerManager failed: {err}");
199+
}
200+
});
201+
}
202+
{
203+
let consensus_handle = consensus_handle.clone();
204+
let tx_swarm = tx_swarm.clone();
205+
let chain_config = Arc::clone(&chain_config);
194206

195-
let sync_handle = consensus_handle.clone();
196-
let tx_swarm_sync = tx_swarm.clone();
197-
let sync_config = Arc::clone(&chain_config);
198-
tokio::spawn(async move {
199-
if let Err(e) = sync::SyncManager::<T>::new(
200-
sync_config,
201-
sync,
202-
sync_handle,
203-
rx_p2p_sync,
204-
tx_swarm_sync,
205-
tx_pubsub,
206-
)
207-
.run()
208-
.await
209-
{
210-
log::error!("SyncManager failed: {:?}", e);
211-
}
212-
});
207+
tokio::spawn(async move {
208+
if let Err(err) = sync::BlockSyncManager::<T>::new(
209+
chain_config,
210+
sync,
211+
consensus_handle,
212+
rx_p2p_sync,
213+
tx_swarm,
214+
tx_pubsub,
215+
)
216+
.run()
217+
.await
218+
{
219+
log::error!("SyncManager failed: {err}");
220+
}
221+
});
222+
}
213223

214-
let tx_swarm_pubsub = tx_swarm.clone();
215-
tokio::spawn(async move {
216-
if let Err(e) = pubsub::PubSubMessageHandler::<T>::new(
217-
chain_config,
218-
pubsub,
219-
consensus_handle,
220-
tx_swarm_pubsub,
221-
rx_pubsub,
222-
&[net::types::PubSubTopic::Blocks],
223-
)
224-
.run()
225-
.await
226-
{
227-
log::error!("PubSubMessageHandler failed: {:?}", e);
228-
}
229-
});
224+
{
225+
let tx_swarm = tx_swarm.clone();
226+
227+
tokio::spawn(async move {
228+
if let Err(err) = pubsub::PubSubMessageHandler::<T>::new(
229+
chain_config,
230+
pubsub,
231+
consensus_handle,
232+
tx_swarm,
233+
rx_pubsub,
234+
&[net::types::PubSubTopic::Blocks],
235+
)
236+
.run()
237+
.await
238+
{
239+
log::error!("PubSubMessageHandler failed: {err}");
240+
}
241+
});
242+
}
230243

231244
Ok(Self { tx_swarm, _tx_sync })
232245
}
@@ -244,7 +257,7 @@ pub async fn make_p2p<T>(
244257
where
245258
T: NetworkingService + 'static,
246259
T::ConnectivityHandle: ConnectivityService<T>,
247-
T::SyncingCodecHandle: SyncingCodecService<T>,
260+
T::SyncingMessagingHandle: SyncingMessagingService<T>,
248261
T::PubSubHandle: PubSubService<T>,
249262
<T as NetworkingService>::Address: FromStr,
250263
<<T as NetworkingService>::Address as FromStr>::Err: Debug,

p2p/src/message.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@ use common::{
2121
use serialization::{Decode, Encode};
2222

2323
#[derive(Debug, Encode, Decode, Clone, PartialEq, Eq)]
24-
pub struct HeaderRequest {
24+
pub struct HeaderListRequest {
2525
locator: Locator,
2626
}
2727

28-
impl HeaderRequest {
28+
impl HeaderListRequest {
2929
pub fn new(locator: Locator) -> Self {
30-
HeaderRequest { locator }
30+
HeaderListRequest { locator }
3131
}
3232

3333
pub fn locator(&self) -> &Locator {
@@ -40,11 +40,11 @@ impl HeaderRequest {
4040
}
4141

4242
#[derive(Debug, Encode, Decode, Clone, PartialEq, Eq)]
43-
pub struct BlockRequest {
43+
pub struct BlockListRequest {
4444
block_ids: Vec<Id<Block>>,
4545
}
4646

47-
impl BlockRequest {
47+
impl BlockListRequest {
4848
pub fn new(block_ids: Vec<Id<Block>>) -> Self {
4949
Self { block_ids }
5050
}
@@ -61,17 +61,17 @@ impl BlockRequest {
6161
#[derive(Debug, Encode, Decode, Clone, PartialEq, Eq)]
6262
pub enum Request {
6363
#[codec(index = 0)]
64-
HeaderRequest(HeaderRequest),
64+
HeaderListRequest(HeaderListRequest),
6565
#[codec(index = 1)]
66-
BlockRequest(BlockRequest),
66+
BlockListRequest(BlockListRequest),
6767
}
6868

6969
#[derive(Debug, Encode, Decode, Clone, PartialEq, Eq)]
70-
pub struct HeaderResponse {
70+
pub struct HeaderListResponse {
7171
headers: Vec<BlockHeader>,
7272
}
7373

74-
impl HeaderResponse {
74+
impl HeaderListResponse {
7575
pub fn new(headers: Vec<BlockHeader>) -> Self {
7676
Self { headers }
7777
}
@@ -86,11 +86,11 @@ impl HeaderResponse {
8686
}
8787

8888
#[derive(Debug, Encode, Decode, Clone, PartialEq, Eq)]
89-
pub struct BlockResponse {
89+
pub struct BlockListResponse {
9090
blocks: Vec<Block>,
9191
}
9292

93-
impl BlockResponse {
93+
impl BlockListResponse {
9494
pub fn new(blocks: Vec<Block>) -> Self {
9595
Self { blocks }
9696
}
@@ -107,9 +107,9 @@ impl BlockResponse {
107107
#[derive(Debug, Encode, Decode, Clone, PartialEq, Eq)]
108108
pub enum Response {
109109
#[codec(index = 0)]
110-
HeaderResponse(HeaderResponse),
110+
HeaderListResponse(HeaderListResponse),
111111
#[codec(index = 1)]
112-
BlockResponse(BlockResponse),
112+
BlockListResponse(BlockListResponse),
113113
}
114114

115115
#[derive(Debug, Encode, Decode, Clone, PartialEq, Eq)]

0 commit comments

Comments
 (0)