Skip to content

Commit 0dc265d

Browse files
committed
Disconnect sync peer on rpc HandlerReject
1 parent a73dcb7 commit 0dc265d

File tree

5 files changed

+23
-12
lines changed

5 files changed

+23
-12
lines changed

beacon_node/eth2_libp2p/src/behaviour/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ pub enum BehaviourEvent<TSpec: EthSpec> {
7777
id: RequestId,
7878
/// The peer to which this request was sent.
7979
peer_id: PeerId,
80+
/// The rpc error that was received.
81+
error: RPCError,
8082
},
8183
RequestReceived {
8284
/// The peer that sent the request.
@@ -891,7 +893,7 @@ impl<TSpec: EthSpec> NetworkBehaviourEventProcess<RPCMessage<TSpec>> for Behavio
891893
);
892894
// inform failures of requests comming outside the behaviour
893895
if !matches!(id, RequestId::Behaviour) {
894-
self.add_event(BehaviourEvent::RPCFailed { peer_id, id });
896+
self.add_event(BehaviourEvent::RPCFailed { peer_id, id, error });
895897
}
896898
}
897899
}

beacon_node/network/src/router/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ use crate::error;
1111
use crate::service::NetworkMessage;
1212
use beacon_chain::{BeaconChain, BeaconChainTypes};
1313
use eth2_libp2p::{
14-
rpc::RequestId, MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request,
15-
Response,
14+
rpc::{RPCError, RequestId},
15+
MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response,
1616
};
1717
use futures::prelude::*;
1818
use processor::Processor;
@@ -59,6 +59,7 @@ pub enum RouterMessage<T: EthSpec> {
5959
RPCFailed {
6060
peer_id: PeerId,
6161
request_id: RequestId,
62+
error: RPCError,
6263
},
6364
/// A gossip message has been received. The fields are: message id, the peer that sent us this
6465
/// message, the message itself and a bool which indicates if the message should be processed
@@ -141,8 +142,9 @@ impl<T: BeaconChainTypes> Router<T> {
141142
RouterMessage::RPCFailed {
142143
peer_id,
143144
request_id,
145+
error,
144146
} => {
145-
self.processor.on_rpc_error(peer_id, request_id);
147+
self.processor.on_rpc_error(peer_id, request_id, error);
146148
}
147149
RouterMessage::PubsubMessage(id, peer_id, gossip, should_process) => {
148150
self.handle_gossip(id, peer_id, gossip, should_process);

beacon_node/network/src/router/processor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,10 @@ impl<T: BeaconChainTypes> Processor<T> {
9595

9696
/// An error occurred during an RPC request. The state is maintained by the sync manager, so
9797
/// this function notifies the sync manager of the error.
98-
pub fn on_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId) {
98+
pub fn on_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) {
9999
// Check if the failed RPC belongs to sync
100100
if let RequestId::Sync(id) = request_id {
101-
self.send_to_sync(SyncMessage::RPCError(peer_id, id));
101+
self.send_to_sync(SyncMessage::RPCError(peer_id, id, error));
102102
}
103103
}
104104

beacon_node/network/src/service.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -613,10 +613,10 @@ fn spawn_service<T: BeaconChainTypes>(
613613
});
614614

615615
}
616-
BehaviourEvent::RPCFailed{id, peer_id} => {
616+
BehaviourEvent::RPCFailed{id, peer_id, error} => {
617617
let _ = service
618618
.router_send
619-
.send(RouterMessage::RPCFailed{ peer_id, request_id: id})
619+
.send(RouterMessage::RPCFailed{ peer_id, request_id: id, error})
620620
.map_err(|_| {
621621
debug!(service.log, "Failed to send RPC to router");
622622
});

beacon_node/network/src/sync/manager.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use crate::beacon_processor::{ProcessId, WorkEvent as BeaconWorkEvent};
4141
use crate::service::NetworkMessage;
4242
use crate::status::ToStatusMessage;
4343
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError};
44+
use eth2_libp2p::rpc::RPCError;
4445
use eth2_libp2p::rpc::{methods::MAX_REQUEST_BLOCKS, BlocksByRootRequest, GoodbyeReason};
4546
use eth2_libp2p::types::{NetworkGlobals, SyncState};
4647
use eth2_libp2p::SyncInfo;
@@ -102,7 +103,7 @@ pub enum SyncMessage<T: EthSpec> {
102103
Disconnect(PeerId),
103104

104105
/// An RPC Error has occurred on a request.
105-
RPCError(PeerId, RequestId),
106+
RPCError(PeerId, RequestId, RPCError),
106107

107108
/// A batch has been processed by the block processor thread.
108109
BatchProcessed {
@@ -576,7 +577,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
576577
}
577578
}
578579

579-
fn inject_error(&mut self, peer_id: PeerId, request_id: RequestId) {
580+
fn inject_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) {
580581
trace!(self.log, "Sync manager received a failed RPC");
581582
// remove any single block lookups
582583
if self.single_block_lookups.remove(&request_id).is_some() {
@@ -597,6 +598,12 @@ impl<T: BeaconChainTypes> SyncManager<T> {
597598
return;
598599
}
599600

601+
// `HandlerRejected` error signifies that the peer has disconnected from us.
602+
// Remove the peer from any syncing chain mappings.
603+
if let RPCError::HandlerRejected = error {
604+
self.peer_disconnect(&peer_id);
605+
}
606+
600607
// otherwise, this is a range sync issue, notify the range sync
601608
self.range_sync
602609
.inject_error(&mut self.network, peer_id, request_id);
@@ -912,8 +919,8 @@ impl<T: BeaconChainTypes> SyncManager<T> {
912919
SyncMessage::Disconnect(peer_id) => {
913920
self.peer_disconnect(&peer_id);
914921
}
915-
SyncMessage::RPCError(peer_id, request_id) => {
916-
self.inject_error(peer_id, request_id);
922+
SyncMessage::RPCError(peer_id, request_id, error) => {
923+
self.inject_error(peer_id, request_id, error);
917924
}
918925
SyncMessage::BatchProcessed {
919926
chain_id,

0 commit comments

Comments
 (0)