Skip to content

Commit 1306b2d

Browse files
libp2p upgrade + gossipsub interval fix (#3012)
## Issue Addressed

Lighthouse gossiping late messages

## Proposed Changes

Point LH to our fork using tokio interval, which 1) works as expected 2) is more performant than the previous version that actually worked as expected

Upgrade libp2p

## Additional Info

libp2p/rust-libp2p#2497
1 parent 7e38d20 commit 1306b2d

File tree

13 files changed

+340
-256
lines changed

13 files changed

+340
-256
lines changed

Cargo.lock

Lines changed: 217 additions & 164 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

beacon_node/client/src/builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use eth2::{
1818
};
1919
use execution_layer::ExecutionLayer;
2020
use genesis::{interop_genesis_state, Eth1GenesisService, DEFAULT_ETH1_BLOCK_HASH};
21-
use lighthouse_network::{open_metrics_client::registry::Registry, NetworkGlobals};
21+
use lighthouse_network::{prometheus_client::registry::Registry, NetworkGlobals};
2222
use monitoring_api::{MonitoringHttpClient, ProcessType};
2323
use network::{NetworkConfig, NetworkMessage, NetworkService};
2424
use slasher::Slasher;

beacon_node/http_api/tests/common.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ pub async fn create_api_server<T: BeaconChainTypes>(
120120
send_back_addr: EXTERNAL_ADDR.parse().unwrap(),
121121
};
122122
let con_id = ConnectionId::new(1);
123-
pm.inject_connection_established(&peer_id, &con_id, &connected_point, None);
123+
pm.inject_connection_established(&peer_id, &con_id, &connected_point, None, 0);
124124
*network_globals.sync_state.write() = SyncState::Synced;
125125

126126
let eth1_service = eth1::Service::new(eth1::Config::default(), log.clone(), chain.spec.clone());

beacon_node/http_metrics/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
mod metrics;
55

66
use beacon_chain::{BeaconChain, BeaconChainTypes};
7-
use lighthouse_network::open_metrics_client::registry::Registry;
7+
use lighthouse_network::prometheus_client::registry::Registry;
88
use lighthouse_version::version_with_platform;
99
use serde::{Deserialize, Serialize};
1010
use slog::{crit, info, Logger};

beacon_node/http_metrics/src/metrics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::Context;
22
use beacon_chain::BeaconChainTypes;
33
use lighthouse_metrics::{Encoder, TextEncoder};
4-
use lighthouse_network::open_metrics_client::encoding::text::encode;
4+
use lighthouse_network::prometheus_client::encoding::text::encode;
55
use malloc_utils::scrape_allocator_metrics;
66

77
pub use lighthouse_metrics::*;

beacon_node/lighthouse_network/Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,12 @@ directory = { path = "../../common/directory" }
3838
regex = "1.3.9"
3939
strum = { version = "0.21.0", features = ["derive"] }
4040
superstruct = "0.4.0"
41-
open-metrics-client = "0.14.0"
41+
prometheus-client = "0.15.0"
4242

4343
[dependencies.libp2p]
44-
version = "0.42.1"
44+
git = "https://github.com/sigp/rust-libp2p"
45+
# branch libp2p-gossipsub-interval-hotfix
46+
rev = "e213703e616eaba3c482d7714775e0d37c4ae8e5"
4547
default-features = false
4648
features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns-tokio", "tcp-tokio", "plaintext", "secp256k1"]
4749

beacon_node/lighthouse_network/src/discovery/mod.rs

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -927,24 +927,6 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
927927
}
928928
}
929929

930-
fn inject_connected(&mut self, _peer_id: &PeerId) {}
931-
fn inject_disconnected(&mut self, _peer_id: &PeerId) {}
932-
fn inject_connection_established(
933-
&mut self,
934-
_peer_id: &PeerId,
935-
_connection_id: &ConnectionId,
936-
_endpoint: &ConnectedPoint,
937-
_failed_addresses: Option<&Vec<Multiaddr>>,
938-
) {
939-
}
940-
fn inject_connection_closed(
941-
&mut self,
942-
_: &PeerId,
943-
_: &ConnectionId,
944-
_connected_point: &ConnectedPoint,
945-
_handler: Self::ProtocolsHandler,
946-
) {
947-
}
948930
fn inject_event(
949931
&mut self,
950932
_: PeerId,

beacon_node/lighthouse_network/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ pub use crate::types::{
6767
SubnetDiscovery,
6868
};
6969

70-
pub use open_metrics_client;
70+
pub use prometheus_client;
7171

7272
pub use behaviour::{BehaviourEvent, Gossipsub, PeerRequestId, Request, Response};
7373
pub use config::Config as NetworkConfig;

beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
110110
_connection_id: &ConnectionId,
111111
endpoint: &ConnectedPoint,
112112
_failed_addresses: Option<&Vec<Multiaddr>>,
113+
_other_established: usize,
113114
) {
114115
debug!(self.log, "Connection established"; "peer_id" => %peer_id, "connection" => ?endpoint.to_endpoint());
115116
// Check NAT if metrics are enabled
@@ -172,8 +173,18 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
172173
self.update_connected_peer_metrics();
173174
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);
174175
}
176+
fn inject_connection_closed(
177+
&mut self,
178+
peer_id: &PeerId,
179+
_: &ConnectionId,
180+
_: &ConnectedPoint,
181+
_: DummyProtocolsHandler,
182+
remaining_established: usize,
183+
) {
184+
if remaining_established > 0 {
185+
return;
186+
}
175187

176-
fn inject_disconnected(&mut self, peer_id: &PeerId) {
177188
// There are no more connections
178189
if self
179190
.network_globals

beacon_node/lighthouse_network/src/rpc/mod.rs

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -202,36 +202,25 @@ where
202202
}
203203

204204
// Use connection established/closed instead of these currently
205-
fn inject_connected(&mut self, peer_id: &PeerId) {
206-
// find the peer's meta-data
207-
debug!(self.log, "Requesting new peer's metadata"; "peer_id" => %peer_id);
208-
let rpc_event =
209-
RPCSend::Request(RequestId::Behaviour, OutboundRequest::MetaData(PhantomData));
210-
self.events.push(NetworkBehaviourAction::NotifyHandler {
211-
peer_id: *peer_id,
212-
handler: NotifyHandler::Any,
213-
event: rpc_event,
214-
});
215-
}
216-
217-
fn inject_disconnected(&mut self, _peer_id: &PeerId) {}
218-
219205
fn inject_connection_established(
220206
&mut self,
221-
_peer_id: &PeerId,
207+
peer_id: &PeerId,
222208
_connection_id: &ConnectionId,
223209
_endpoint: &ConnectedPoint,
224210
_failed_addresses: Option<&Vec<Multiaddr>>,
211+
other_established: usize,
225212
) {
226-
}
227-
228-
fn inject_connection_closed(
229-
&mut self,
230-
_peer_id: &PeerId,
231-
_: &ConnectionId,
232-
_connected_point: &ConnectedPoint,
233-
_handler: Self::ProtocolsHandler,
234-
) {
213+
if other_established == 0 {
214+
// find the peer's meta-data
215+
debug!(self.log, "Requesting new peer's metadata"; "peer_id" => %peer_id);
216+
let rpc_event =
217+
RPCSend::Request(RequestId::Behaviour, OutboundRequest::MetaData(PhantomData));
218+
self.events.push(NetworkBehaviourAction::NotifyHandler {
219+
peer_id: *peer_id,
220+
handler: NotifyHandler::Any,
221+
event: rpc_event,
222+
});
223+
}
235224
}
236225

237226
fn inject_event(

beacon_node/lighthouse_network/src/service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use libp2p::{
2121
swarm::{SwarmBuilder, SwarmEvent},
2222
PeerId, Swarm, Transport,
2323
};
24-
use open_metrics_client::registry::Registry;
24+
use prometheus_client::registry::Registry;
2525
use slog::{crit, debug, info, o, trace, warn, Logger};
2626
use ssz::Decode;
2727
use std::fs::File;

beacon_node/lighthouse_network/tests/common/behaviour.rs

Lines changed: 87 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,8 @@ where
100100
inner: TInner,
101101

102102
pub addresses_of_peer: Vec<PeerId>,
103-
pub inject_connected: Vec<PeerId>,
104-
pub inject_disconnected: Vec<PeerId>,
105-
pub inject_connection_established: Vec<(PeerId, ConnectionId, ConnectedPoint)>,
106-
pub inject_connection_closed: Vec<(PeerId, ConnectionId, ConnectedPoint)>,
103+
pub inject_connection_established: Vec<(PeerId, ConnectionId, ConnectedPoint, usize)>,
104+
pub inject_connection_closed: Vec<(PeerId, ConnectionId, ConnectedPoint, usize)>,
107105
pub inject_event: Vec<(
108106
PeerId,
109107
ConnectionId,
@@ -128,8 +126,6 @@ where
128126
Self {
129127
inner,
130128
addresses_of_peer: Vec::new(),
131-
inject_connected: Vec::new(),
132-
inject_disconnected: Vec::new(),
133129
inject_connection_established: Vec::new(),
134130
inject_connection_closed: Vec::new(),
135131
inject_event: Vec::new(),
@@ -148,8 +144,6 @@ where
148144
#[allow(dead_code)]
149145
pub fn reset(&mut self) {
150146
self.addresses_of_peer = Vec::new();
151-
self.inject_connected = Vec::new();
152-
self.inject_disconnected = Vec::new();
153147
self.inject_connection_established = Vec::new();
154148
self.inject_connection_closed = Vec::new();
155149
self.inject_event = Vec::new();
@@ -176,7 +170,13 @@ where
176170
expected_disconnections: usize,
177171
) -> bool {
178172
if self.inject_connection_closed.len() == expected_closed_connections {
179-
assert_eq!(self.inject_disconnected.len(), expected_disconnections);
173+
assert_eq!(
174+
self.inject_connection_closed
175+
.iter()
176+
.filter(|(.., remaining_established)| { *remaining_established == 0 })
177+
.count(),
178+
expected_disconnections
179+
);
180180
return true;
181181
}
182182

@@ -193,7 +193,15 @@ where
193193
expected_connections: usize,
194194
) -> bool {
195195
if self.inject_connection_established.len() == expected_established_connections {
196-
assert_eq!(self.inject_connected.len(), expected_connections);
196+
assert_eq!(
197+
self.inject_connection_established
198+
.iter()
199+
.filter(|(.., reported_aditional_connections)| {
200+
*reported_aditional_connections == 0
201+
})
202+
.count(),
203+
expected_connections
204+
);
197205
return true;
198206
}
199207

@@ -219,37 +227,45 @@ where
219227
self.inner.addresses_of_peer(p)
220228
}
221229

222-
fn inject_connected(&mut self, peer: &PeerId) {
223-
assert!(
224-
self.inject_connection_established
225-
.iter()
226-
.any(|(peer_id, _, _)| peer_id == peer),
227-
"`inject_connected` is called after at least one `inject_connection_established`."
228-
);
229-
self.inject_connected.push(*peer);
230-
self.inner.inject_connected(peer);
231-
}
232-
233230
fn inject_connection_established(
234231
&mut self,
235232
p: &PeerId,
236233
c: &ConnectionId,
237234
e: &ConnectedPoint,
238235
errors: Option<&Vec<Multiaddr>>,
236+
other_established: usize,
239237
) {
240-
self.inject_connection_established.push((*p, *c, e.clone()));
241-
self.inner.inject_connection_established(p, c, e, errors);
242-
}
243-
244-
fn inject_disconnected(&mut self, peer: &PeerId) {
245-
assert!(
246-
self.inject_connection_closed
247-
.iter()
248-
.any(|(peer_id, _, _)| peer_id == peer),
249-
"`inject_disconnected` is called after at least one `inject_connection_closed`."
250-
);
251-
self.inject_disconnected.push(*peer);
252-
self.inner.inject_disconnected(peer);
238+
let mut other_peer_connections = self
239+
.inject_connection_established
240+
.iter()
241+
.rev() // take last to first
242+
.filter_map(|(peer, .., other_established)| {
243+
if p == peer {
244+
Some(other_established)
245+
} else {
246+
None
247+
}
248+
})
249+
.take(other_established);
250+
251+
// We are informed that there are `other_established` additional connections. Ensure that the
252+
// number of previous connections is consistent with this
253+
if let Some(&prev) = other_peer_connections.next() {
254+
if prev < other_established {
255+
assert_eq!(
256+
prev,
257+
other_established - 1,
258+
"Inconsistent connection reporting"
259+
)
260+
}
261+
assert_eq!(other_peer_connections.count(), other_established - 1);
262+
} else {
263+
assert_eq!(other_established, 0)
264+
}
265+
self.inject_connection_established
266+
.push((*p, *c, e.clone(), other_established));
267+
self.inner
268+
.inject_connection_established(p, c, e, errors, other_established);
253269
}
254270

255271
fn inject_connection_closed(
@@ -258,15 +274,46 @@ where
258274
c: &ConnectionId,
259275
e: &ConnectedPoint,
260276
handler: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
277+
remaining_established: usize,
261278
) {
262-
let connection = (*p, *c, e.clone());
279+
let mut other_closed_connections = self
280+
.inject_connection_established
281+
.iter()
282+
.rev() // take last to first
283+
.filter_map(|(peer, .., remaining_established)| {
284+
if p == peer {
285+
Some(remaining_established)
286+
} else {
287+
None
288+
}
289+
})
290+
.take(remaining_established);
291+
292+
// We are informed that there are `remaining_established` remaining connections. Ensure that the
293+
// number of previous connections is consistent with this
294+
if let Some(&prev) = other_closed_connections.next() {
295+
if prev < remaining_established {
296+
assert_eq!(
297+
prev,
298+
remaining_established - 1,
299+
"Inconsistent closed connection reporting"
300+
)
301+
}
302+
assert_eq!(other_closed_connections.count(), remaining_established - 1);
303+
} else {
304+
assert_eq!(remaining_established, 0)
305+
}
263306
assert!(
264-
self.inject_connection_established.contains(&connection),
307+
self.inject_connection_established
308+
.iter()
309+
.any(|(peer, conn_id, endpoint, _)| (peer, conn_id, endpoint) == (p, c, e)),
265310
"`inject_connection_closed` is called only for connections for \
266311
which `inject_connection_established` was called first."
267312
);
268-
self.inject_connection_closed.push(connection);
269-
self.inner.inject_connection_closed(p, c, e, handler);
313+
self.inject_connection_closed
314+
.push((*p, *c, e.clone(), remaining_established));
315+
self.inner
316+
.inject_connection_closed(p, c, e, handler, remaining_established);
270317
}
271318

272319
fn inject_event(
@@ -278,14 +325,14 @@ where
278325
assert!(
279326
self.inject_connection_established
280327
.iter()
281-
.any(|(peer_id, conn_id, _)| *peer_id == p && c == *conn_id),
328+
.any(|(peer_id, conn_id, ..)| *peer_id == p && c == *conn_id),
282329
"`inject_event` is called for reported connections."
283330
);
284331
assert!(
285332
!self
286333
.inject_connection_closed
287334
.iter()
288-
.any(|(peer_id, conn_id, _)| *peer_id == p && c == *conn_id),
335+
.any(|(peer_id, conn_id, ..)| *peer_id == p && c == *conn_id),
289336
"`inject_event` is never called for closed connections."
290337
);
291338

beacon_node/network/src/service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use futures::channel::mpsc::Sender;
1111
use futures::future::OptionFuture;
1212
use futures::prelude::*;
1313
use lighthouse_network::{
14-
open_metrics_client::registry::Registry, MessageAcceptance, Service as LibP2PService,
14+
prometheus_client::registry::Registry, MessageAcceptance, Service as LibP2PService,
1515
};
1616
use lighthouse_network::{
1717
rpc::{GoodbyeReason, RPCResponseErrorCode, RequestId},

0 commit comments

Comments
 (0)