From 9158a129c283578d10c8cd22971a82270e52c019 Mon Sep 17 00:00:00 2001 From: pawan Date: Tue, 21 Sep 2021 20:28:46 +0530 Subject: [PATCH] Disconnect sync peer on rpc HandlerReject --- beacon_node/eth2_libp2p/src/behaviour/mod.rs | 4 +++- beacon_node/network/src/router/mod.rs | 8 +++++--- beacon_node/network/src/router/processor.rs | 4 ++-- beacon_node/network/src/service.rs | 4 ++-- beacon_node/network/src/sync/manager.rs | 15 +++++++++++++-- 5 files changed, 25 insertions(+), 10 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index 5b45a77d5bf..3dd8a1af6f3 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -77,6 +77,8 @@ pub enum BehaviourEvent { id: RequestId, /// The peer to which this request was sent. peer_id: PeerId, + /// The rpc error that was received. + error: RPCError, }, RequestReceived { /// The peer that sent the request. @@ -891,7 +893,7 @@ impl NetworkBehaviourEventProcess> for Behavio ); // inform failures of requests comming outside the behaviour if !matches!(id, RequestId::Behaviour) { - self.add_event(BehaviourEvent::RPCFailed { peer_id, id }); + self.add_event(BehaviourEvent::RPCFailed { peer_id, id, error }); } } } diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index 5096a4bdc84..789f5e828c6 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -11,8 +11,8 @@ use crate::error; use crate::service::NetworkMessage; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::{ - rpc::RequestId, MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, - Response, + rpc::{RPCError, RequestId}, + MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response, }; use futures::prelude::*; use processor::Processor; @@ -59,6 +59,7 @@ pub enum RouterMessage { RPCFailed { peer_id: PeerId, request_id: RequestId, + error: RPCError, }, /// A gossip message has been received. The fields are: message id, the peer that sent us this /// message, the message itself and a bool which indicates if the message should be processed @@ -141,8 +142,9 @@ impl Router { RouterMessage::RPCFailed { peer_id, request_id, + error, } => { - self.processor.on_rpc_error(peer_id, request_id); + self.processor.on_rpc_error(peer_id, request_id, error); } RouterMessage::PubsubMessage(id, peer_id, gossip, should_process) => { self.handle_gossip(id, peer_id, gossip, should_process); diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 01ea98948ba..30e5012b582 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -95,10 +95,10 @@ impl Processor { /// An error occurred during an RPC request. The state is maintained by the sync manager, so /// this function notifies the sync manager of the error. - pub fn on_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId) { + pub fn on_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) { // Check if the failed RPC belongs to sync if let RequestId::Sync(id) = request_id { - self.send_to_sync(SyncMessage::RPCError(peer_id, id)); + self.send_to_sync(SyncMessage::RPCError(peer_id, id, error)); } } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 7badec3d486..ddfe67bf665 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -611,10 +611,10 @@ fn spawn_service( }); } - BehaviourEvent::RPCFailed{id, peer_id} => { + BehaviourEvent::RPCFailed{id, peer_id, error} => { let _ = service .router_send - .send(RouterMessage::RPCFailed{ peer_id, request_id: id}) + .send(RouterMessage::RPCFailed{ peer_id, request_id: id, error}) .map_err(|_| { debug!(service.log, "Failed to send RPC to router"); }); diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 90ff61b41fb..d2747d348c5 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -42,6 +42,7 @@ use crate::beacon_processor::{ProcessId, WorkEvent as BeaconWorkEvent}; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError}; +use eth2_libp2p::rpc::RPCError; use eth2_libp2p::rpc::{methods::MAX_REQUEST_BLOCKS, BlocksByRootRequest, GoodbyeReason}; use eth2_libp2p::types::{NetworkGlobals, SyncState}; use eth2_libp2p::SyncInfo; @@ -103,7 +104,7 @@ pub enum SyncMessage { Disconnect(PeerId), /// An RPC Error has occurred on a request. - RPCError(PeerId, RequestId), + RPCError(PeerId, RequestId, RPCError), /// A batch has been processed by the block processor thread. BatchProcessed { @@ -1020,11 +1021,16 @@ impl SyncManager { SyncMessage::Disconnect(peer_id) => { self.peer_disconnect(&peer_id); } - SyncMessage::RPCError(peer_id, request_id) => { + SyncMessage::RPCError(peer_id, request_id, error) => { // Redirect to a sync mechanism if the error is related to one of their // requests. match self.network.blocks_by_range_response(request_id, true) { Some(SyncRequestType::RangeSync(batch_id, chain_id)) => { + // `HandlerRejected` error signifies that the peer has disconnected from us. + // Remove the peer from any syncing chain mappings. + if let RPCError::HandlerRejected = error { + self.peer_disconnect(&peer_id); + } self.range_sync.inject_error( &mut self.network, peer_id, @@ -1035,6 +1041,11 @@ impl SyncManager { self.update_sync_state(); } Some(SyncRequestType::BackFillSync(batch_id)) => { + // `HandlerRejected` error signifies that the peer has disconnected from us. + // Remove the peer from any syncing chain mappings. + if let RPCError::HandlerRejected = error { + self.peer_disconnect(&peer_id); + } match self.backfill_sync.inject_error( &mut self.network, batch_id,