diff --git a/validator_client/beacon_node_fallback/src/beacon_head_monitor.rs b/validator_client/beacon_node_fallback/src/beacon_head_monitor.rs
new file mode 100644
index 00000000000..16749bee873
--- /dev/null
+++ b/validator_client/beacon_node_fallback/src/beacon_head_monitor.rs
@@ -0,0 +1,148 @@
+use crate::BeaconNodeFallback;
+use eth2::types::EthSpec;
+use eth2::types::SseHead;
+use std::collections::HashMap;
+use tokio::sync::RwLock;
+
+use eth2::types::EventTopic;
+use tracing::{info, warn};
+
+use eth2::types::EventKind;
+use futures::StreamExt;
+use slot_clock::SlotClock;
+use std::sync::Arc;
+
+type CacheHashMap = HashMap<usize, SseHead>;
+
+// This is used to send the index derived from `CandidateBeaconNode` to the
+// `AttestationService` for further processing.
+#[derive(Debug)]
+pub struct HeadEvent {
+    pub beacon_node_index: usize,
+}
+
+/// Cache to maintain the latest head received from each of the beacon nodes
+/// in the `BeaconNodeFallback`.
+#[derive(Debug)]
+pub struct BeaconHeadCache {
+    cache: RwLock<CacheHashMap>,
+}
+
+impl BeaconHeadCache {
+    pub fn new() -> Self {
+        Self {
+            cache: RwLock::new(HashMap::new()),
+        }
+    }
+
+    pub async fn get(&self, beacon_node_index: usize) -> Option<SseHead> {
+        self.cache.read().await.get(&beacon_node_index).cloned()
+    }
+
+    pub async fn insert(&self, beacon_node_index: usize, head: SseHead) {
+        self.cache.write().await.insert(beacon_node_index, head);
+    }
+
+    pub async fn is_latest(&self, head: &SseHead) -> bool {
+        let cache = self.cache.read().await;
+        cache
+            .values()
+            .all(|cache_head| head.slot >= cache_head.slot)
+    }
+
+    pub async fn purge_cache(&self) {
+        self.cache.write().await.clear();
+    }
+}
+
+impl Default for BeaconHeadCache {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+// Runs a non-terminating loop to update the `BeaconHeadCache` with the latest head received
+// from the candidate beacon nodes. Streaming head events from the beacon nodes lets
+// attestation duties potentially start as soon as the latest head is received from any
+// beacon node, instead of always waiting until the 1/3rd mark of the slot.
+//
+// The cache and the candidate BN list are refreshed/purged to avoid dangling references
+// that arise due to `update_candidates_list`.
+//
+// Starts the service to perpetually stream head events from the connected beacon nodes.
+pub async fn poll_head_event_from_beacon_nodes<T: SlotClock + 'static, E: EthSpec>(
+    beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
+) -> Result<(), String> {
+    let head_cache = beacon_nodes
+        .beacon_head_cache
+        .clone()
+        .expect("Unable to start head monitor without beacon_head_cache");
+    let head_monitor_send = beacon_nodes
+        .head_monitor_send
+        .clone()
+        .expect("Unable to start head monitor without head_monitor_send");
+
+    info!("Starting head monitoring service");
+    let candidates = {
+        let candidates_guard = beacon_nodes.candidates.read().await;
+        candidates_guard.clone()
+    };
+    let mut tasks = vec![];
+
+    for candidate in candidates.iter() {
+        let head_event_stream = candidate
+            .beacon_node
+            .get_events::<E>(&[EventTopic::Head])
+            .await;
+
+        let mut head_event_stream = match head_event_stream {
+            Ok(stream) => stream,
+            Err(e) => {
+                warn!("Failed to get head event stream: {:?}", e);
+                continue;
+            }
+        };
+
+        let sender_tx = head_monitor_send.clone();
+        let head_cache_ref = head_cache.clone();
+
+        let stream_fut = async move {
+            while let Some(event_result) = head_event_stream.next().await {
+                if let Ok(EventKind::Head(head)) = event_result {
+                    head_cache_ref.insert(candidate.index, head.clone()).await;
+
+                    if !head_cache_ref.is_latest(&head).await {
+                        continue;
+                    }
+
+                    if sender_tx
+                        .send(HeadEvent {
+                            beacon_node_index: candidate.index,
+                        })
+                        .await
+                        .is_err()
+                    {
+                        warn!("Head monitoring service channel closed");
+                    }
+                }
+            }
+        };
+
+        tasks.push(stream_fut);
+    }
+
+    if tasks.is_empty() {
+        head_cache.purge_cache().await;
+        return Err(
+            "No beacon nodes available for head event streaming, will retry".to_string(),
+        );
+    }
+
+    futures::future::join_all(tasks).await;
+
+    drop(candidates);
+    head_cache.purge_cache().await;
+
+    Ok(())
+}
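
One subtlety worth noting in `poll_head_event_from_beacon_nodes`: each incoming head is inserted into the cache *before* `is_latest` is checked, so the event's own entry trivially satisfies the `>=` comparison and the check effectively asks whether any *other* node has already reported a strictly newer slot. A minimal, self-contained sketch of those semantics, with a plain `u64` and a simplified `Head` standing in for `types::Slot` and `SseHead`:

```rust
use std::collections::HashMap;

#[derive(Clone, Debug)]
struct Head {
    slot: u64,
}

#[derive(Default)]
struct HeadCache {
    cache: HashMap<usize, Head>,
}

impl HeadCache {
    fn insert(&mut self, index: usize, head: Head) {
        self.cache.insert(index, head);
    }

    /// A head is "latest" only if no cached head has a strictly newer slot.
    fn is_latest(&self, head: &Head) -> bool {
        self.cache.values().all(|cached| head.slot >= cached.slot)
    }
}

fn main() {
    let mut cache = HeadCache::default();

    // Node 1 saw slot 101 earlier.
    cache.insert(1, Head { slot: 101 });

    // Node 0 reports slot 100: inserted first (mirroring the real flow),
    // then checked. Some other node already saw slot 101, so it is stale.
    cache.insert(0, Head { slot: 100 });
    assert!(!cache.is_latest(&Head { slot: 100 }));

    // Node 0 catches up to slot 101: ties pass the `>=` check, so this
    // would still trigger a `HeadEvent` even though node 1 saw it first.
    cache.insert(0, Head { slot: 101 });
    assert!(cache.is_latest(&Head { slot: 101 }));
}
```

Because equal slots pass the `>=` check, two nodes reporting the same head can both emit a `HeadEvent`; the `latest_attested_slot` guard in the attestation service (below) deduplicates on the consumer side.
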
diff --git a/validator_client/beacon_node_fallback/src/lib.rs b/validator_client/beacon_node_fallback/src/lib.rs
index 0f13d8c8b7b..baf4e3a7c0b 100644
--- a/validator_client/beacon_node_fallback/src/lib.rs
+++ b/validator_client/beacon_node_fallback/src/lib.rs
@@ -2,7 +2,10 @@
 //! "fallback" behaviour; it will try a request on all of the nodes until one or none of them
 //! succeed.
 
+pub mod beacon_head_monitor;
 pub mod beacon_node_health;
+
+use beacon_head_monitor::{BeaconHeadCache, HeadEvent, poll_head_event_from_beacon_nodes};
 use beacon_node_health::{
     BeaconNodeHealth, BeaconNodeSyncDistanceTiers, ExecutionEngineHealth, IsOptimistic,
     SyncDistanceTier, check_node_health,
@@ -22,7 +25,11 @@ use std::time::{Duration, Instant};
 use std::vec::Vec;
 use strum::EnumVariantNames;
 use task_executor::TaskExecutor;
-use tokio::{sync::RwLock, time::sleep};
+
+use tokio::{
+    sync::{RwLock, mpsc},
+    time::sleep,
+};
 use tracing::{debug, error, warn};
 use types::{ChainSpec, Config as ConfigSpec, EthSpec, Slot};
 use validator_metrics::{ENDPOINT_ERRORS, ENDPOINT_REQUESTS, inc_counter_vec};
@@ -68,6 +75,20 @@ pub fn start_fallback_updater_service<T: SlotClock + 'static, E: EthSpec>(
         return Err("Cannot start fallback updater without slot clock");
     }
 
+    let beacon_nodes_ref = beacon_nodes.clone();
+
+    let head_monitor_future = async move {
+        loop {
+            if let Err(err) =
+                poll_head_event_from_beacon_nodes::<T, E>(beacon_nodes_ref.clone()).await
+            {
+                warn!(error = ?err, "Head service failed");
+            }
+        }
+    };
+
+    executor.spawn(head_monitor_future, "head_monitoring");
+
     let future = async move {
         loop {
             beacon_nodes.update_all_candidates::<E>().await;
@@ -380,6 +401,8 @@ pub struct BeaconNodeFallback<T, E> {
     pub candidates: Arc<RwLock<Vec<CandidateBeaconNode<E>>>>,
     distance_tiers: BeaconNodeSyncDistanceTiers,
     slot_clock: Option<T>,
+    beacon_head_cache: Option<Arc<BeaconHeadCache>>,
+    head_monitor_send: Option<Arc<mpsc::Sender<HeadEvent>>>,
     broadcast_topics: Vec<ApiTopic>,
     spec: Arc<ChainSpec>,
 }
@@ -396,6 +419,8 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
             candidates: Arc::new(RwLock::new(candidates)),
             distance_tiers,
             slot_clock: None,
+            beacon_head_cache: Some(Arc::new(BeaconHeadCache::new())),
+            head_monitor_send: None,
             broadcast_topics,
             spec,
         }
@@ -415,6 +440,10 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
         self.candidates.read().await.len()
     }
 
+    pub fn set_head_send(&mut self, head_monitor_send: Arc<mpsc::Sender<HeadEvent>>) {
+        self.head_monitor_send = Some(head_monitor_send);
+    }
+
     /// The count of candidates that are online and compatible, but not necessarily synced.
     pub async fn num_available(&self) -> usize {
         let mut n = 0;
@@ -493,6 +522,10 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
         let mut candidates = self.candidates.write().await;
         *candidates = new_candidates;
 
+        if let Some(cache) = &self.beacon_head_cache {
+            cache.purge_cache().await;
+        }
+
         Ok(new_list)
     }
@@ -646,6 +679,41 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
         Err(Errors(errors))
     }
 
+    /// Try `func` on a specific beacon node by index first, then fall back to the normal order.
+    /// Returns immediately if the preferred node succeeds, otherwise falls back to
+    /// `first_success`. This is insurance against potential race conditions that may arise.
+    pub async fn first_success_from_index<F, R, Err, O>(
+        &self,
+        preferred_index: Option<usize>,
+        func: F,
+    ) -> Result<O, Errors<Err>>
+    where
+        F: Fn(BeaconNodeHttpClient) -> R + Clone,
+        R: Future<Output = Result<O, Err>>,
+        Err: Debug,
+    {
+        let candidates = self.candidates.read().await;
+
+        // Try the preferred beacon node first if it exists.
+        if let Some(preferred_idx) = preferred_index
+            && let Some(preferred_candidate) =
+                candidates.iter().find(|c| c.index == preferred_idx)
+        {
+            let preferred_node = preferred_candidate.beacon_node.clone();
+            drop(candidates);
+
+            match Self::run_on_candidate(preferred_node, &func).await {
+                Ok(val) => return Ok(val),
+                Err(_) => {
+                    return self.first_success(func).await;
+                }
+            }
+        }
+
+        // Fall back to the normal `first_success` behaviour.
+        drop(candidates);
+        self.first_success(func).await
+    }
+
     /// Run the future `func` on `candidate` while reporting metrics.
     async fn run_on_candidate<F, R, Err, O>(
         candidate: BeaconNodeHttpClient,
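
To illustrate how `first_success_from_index` is meant to be driven, here is a hypothetical call site (not part of this diff): the attestation path prefers the node that delivered the winning head event, since that node has certainly imported the new head block, while the other candidates may still be serving the previous head. The helper `fetch_attestation_data` and its parameters are illustrative; `get_validator_attestation_data` is the existing `eth2` client method used for attestation production:

```rust
use beacon_node_fallback::BeaconNodeFallback;
use slot_clock::SlotClock;
use types::{AttestationData, CommitteeIndex, EthSpec, Slot};

// Hypothetical helper: fetch attestation data, preferring the node that
// delivered the head event (`candidate_beacon_node`), then falling back to
// the health-ordered candidate list. Errors are simplified to String.
async fn fetch_attestation_data<T: SlotClock, E: EthSpec>(
    beacon_nodes: &BeaconNodeFallback<T, E>,
    candidate_beacon_node: Option<usize>,
    slot: Slot,
    committee_index: CommitteeIndex,
) -> Result<AttestationData, String> {
    beacon_nodes
        .first_success_from_index(candidate_beacon_node, |beacon_node| async move {
            beacon_node
                .get_validator_attestation_data(slot, committee_index)
                .await
                .map(|response| response.data)
                .map_err(|e| format!("Failed to produce attestation data: {e:?}"))
        })
        .await
        .map_err(|e| e.to_string())
}
```

Note the failure path: if the preferred node errors, the call degrades to plain `first_success`, so the worst case is one extra request rather than a missed attestation.
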
diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs
index 71bdde10b02..34f26a1ac2e 100644
--- a/validator_client/src/lib.rs
+++ b/validator_client/src/lib.rs
@@ -9,6 +9,7 @@ use metrics::set_gauge;
 use monitoring_api::{MonitoringHttpClient, ProcessType};
 use sensitive_url::SensitiveUrl;
 use slashing_protection::{SLASHING_PROTECTION_FILENAME, SlashingDatabase};
+use tokio::sync::Mutex;
 
 use account_utils::validator_definitions::ValidatorDefinitions;
 use beacon_node_fallback::{
@@ -72,6 +73,8 @@ pub const AGGREGATION_PRE_COMPUTE_EPOCHS: u64 = 2;
 /// Number of slots in advance to compute sync selection proofs when in `distributed` mode.
 pub const AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED: u64 = 1;
 
+const MAX_HEAD_EVENT_QUEUE_LEN: usize = 1_024;
+
 type ValidatorStore<E> = LighthouseValidatorStore<SystemTimeSlotClock, E>;
 
 #[derive(Clone)]
@@ -383,9 +386,16 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
             Duration::from_secs(context.eth2_config.spec.seconds_per_slot),
         );
 
+        let (head_send, head_receiver) = mpsc::channel(MAX_HEAD_EVENT_QUEUE_LEN);
+
+        let head_send_ref = Arc::new(head_send);
+
         beacon_nodes.set_slot_clock(slot_clock.clone());
         proposer_nodes.set_slot_clock(slot_clock.clone());
 
+        beacon_nodes.set_head_send(head_send_ref.clone());
+        proposer_nodes.set_head_send(head_send_ref.clone());
+
         let beacon_nodes = Arc::new(beacon_nodes);
         start_fallback_updater_service::<_, E>(context.executor.clone(), beacon_nodes.clone())?;
@@ -502,6 +512,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
             .beacon_nodes(beacon_nodes.clone())
             .executor(context.executor.clone())
             .chain_spec(context.eth2_config.spec.clone())
+            .head_monitor_rx(Arc::new(Mutex::new(head_receiver)))
            .disable(config.disable_attesting)
             .build()?;
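
The channel wiring above follows the standard tokio shape for a bounded multi-producer, single-consumer queue: `mpsc::Sender` is cheaply cloneable, so one `Arc` is shared between the `beacon_nodes` and `proposer_nodes` fallbacks, while the single `mpsc::Receiver` is not `Clone` and is parked behind `Arc<Mutex<_>>` for the attestation service to lock. A standalone sketch of the same shape (names illustrative):

```rust
use std::sync::Arc;
use tokio::sync::{Mutex, mpsc};

#[derive(Debug)]
struct HeadEvent {
    beacon_node_index: usize,
}

#[tokio::main]
async fn main() {
    // Bounded queue: senders apply backpressure instead of growing unboundedly.
    let (tx, rx) = mpsc::channel::<HeadEvent>(1_024);

    // The Sender is Clone, so one Arc is shared by every producer.
    let tx = Arc::new(tx);
    // The Receiver is not Clone; a single consumer takes it behind Arc<Mutex<_>>.
    let rx = Arc::new(Mutex::new(rx));

    // Two producers, standing in for the beacon_nodes and proposer_nodes fallbacks.
    for index in 0..2 {
        let tx = tx.clone();
        tokio::spawn(async move {
            let _ = tx.send(HeadEvent { beacon_node_index: index }).await;
        });
    }
    drop(tx); // once all producers finish, recv() below returns None

    // One consumer, standing in for the attestation service.
    let mut receiver = rx.lock().await;
    while let Some(event) = receiver.recv().await {
        println!("head event from node {}", event.beacon_node_index);
    }
}
```

With `MAX_HEAD_EVENT_QUEUE_LEN = 1_024`, producers await rather than queueing unboundedly if the consumer stalls; head events arrive at most a few times per slot per node, so the bound should rarely be hit.
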
diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs
index da6e8f35886..429621332e8 100644
--- a/validator_client/validator_services/src/attestation_service.rs
+++ b/validator_client/validator_services/src/attestation_service.rs
@@ -1,5 +1,7 @@
 use crate::duties_service::{DutiesService, DutyAndProof};
-use beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
+use tokio::sync::Mutex;
+
+use beacon_node_fallback::{ApiTopic, BeaconNodeFallback, beacon_head_monitor::HeadEvent};
 use futures::future::join_all;
 use logging::crit;
 use slot_clock::SlotClock;
@@ -7,6 +9,7 @@ use std::collections::HashMap;
 use std::ops::Deref;
 use std::sync::Arc;
 use task_executor::TaskExecutor;
+use tokio::sync::mpsc;
 use tokio::time::{Duration, Instant, sleep, sleep_until};
 use tracing::{debug, error, info, trace, warn};
 use tree_hash::TreeHash;
@@ -22,6 +25,7 @@ pub struct AttestationServiceBuilder<T, E: EthSpec> {
     beacon_nodes: Option<Arc<BeaconNodeFallback<T, E>>>,
     executor: Option<TaskExecutor>,
     chain_spec: Option<Arc<ChainSpec>>,
+    head_monitor_rx: Option<Arc<Mutex<mpsc::Receiver<HeadEvent>>>>,
     disable: bool,
 }
@@ -34,6 +38,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationServiceBuilder<T, E> {
             beacon_nodes: None,
             executor: None,
             chain_spec: None,
+            head_monitor_rx: None,
             disable: false,
         }
     }
@@ -73,6 +78,13 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationServiceBuilder<T, E> {
         self
     }
 
+    pub fn head_monitor_rx(
+        mut self,
+        head_monitor_rx: Arc<Mutex<mpsc::Receiver<HeadEvent>>>,
+    ) -> Self {
+        self.head_monitor_rx = Some(head_monitor_rx);
+        self
+    }
     pub fn build(self) -> Result<AttestationService<T, E>, String> {
         Ok(AttestationService {
             inner: Arc::new(Inner {
@@ -94,7 +106,11 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationServiceBuilder<T, E> {
                 chain_spec: self
                     .chain_spec
                     .ok_or("Cannot build AttestationService without chain_spec")?,
+                head_monitor_rx: self
+                    .head_monitor_rx
+                    .ok_or("Cannot build AttestationService without head_monitor_rx")?,
                 disable: self.disable,
+                latest_attested_slot: Mutex::new(Slot::default()),
             }),
         })
     }
@@ -108,10 +124,13 @@ pub struct Inner<T, E: EthSpec> {
     beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
     executor: TaskExecutor,
     chain_spec: Arc<ChainSpec>,
+    head_monitor_rx: Arc<Mutex<mpsc::Receiver<HeadEvent>>>,
     disable: bool,
+    latest_attested_slot: Mutex<Slot>,
 }
 
-/// Attempts to produce attestations for all known validators 1/3rd of the way through each slot.
+/// Attempts to produce attestations for all known validators 1/3rd of the way through each slot,
+/// or as soon as a head event is received from one of the BNs.
 ///
 /// If any validators are on the same committee, a single attestation will be downloaded and
 /// returned to the beacon node. This attestation will have a signature from each of the
@@ -159,19 +178,38 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
                 if let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() {
+                    let beacon_node_index = tokio::select! {
+                        _ = sleep(duration_to_next_slot + slot_duration / 3) => None,
+                        Ok(event) = self.poll_for_head_events() => Some(event.beacon_node_index),
+                        else => None
+                    };
+
+                    let Some(current_slot) = self.slot_clock.now() else {
+                        error!("Failed to read slot clock after trigger");
+                        continue;
+                    };
+
+                    let mut last_slot = self.latest_attested_slot.lock().await;
+
+                    if current_slot <= *last_slot {
+                        debug!(?current_slot, "Attestation already initiated for the slot");
+                        continue;
+                    }
+
+                    match self.spawn_attestation_tasks(slot_duration, beacon_node_index) {
+                        Ok(_) => {
+                            *last_slot = current_slot;
+                            trace!(?current_slot, "Spawned attestation tasks");
+                        }
+                        Err(e) => {
+                            crit!(error = ?e, "Failed to spawn attestation tasks")
+                        }
+                    }
                 }
             }
         };
@@ -180,9 +218,26 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
+    async fn poll_for_head_events(&self) -> Result<HeadEvent, String> {
+        let mut receiver = self.head_monitor_rx.lock().await;
+        match receiver.recv().await {
+            Some(head_event) => Ok(head_event),
+            None => Err("Head monitor channel closed unexpectedly".to_string()),
+        }
+    }
+
     /// For each each required attestation, spawn a new task that downloads, signs and uploads the
     /// attestation to the beacon node.
-    fn spawn_attestation_tasks(&self, slot_duration: Duration) -> Result<(), String> {
+    fn spawn_attestation_tasks(
+        &self,
+        slot_duration: Duration,
+        beacon_node_index: Option<usize>,
+    ) -> Result<(), String> {
+        info!(
+            ?beacon_node_index,
+            "Processing attestation trigger from beacon node"
+        );
+
         let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?;
         let duration_to_next_slot = self
             .slot_clock
@@ -221,6 +276,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
         attestation_service: AttestationService<T, E>,
         aggregate_production_instant: Instant,
+        candidate_beacon_node: Option<usize>,
     ) -> Result<(), ()> {
         let attestations_timer = validator_metrics::start_timer_vec(
             &validator_metrics::ATTESTATION_SERVICE_TIMES,
@@ -265,7 +322,12 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
         attestation_service: AttestationService<T, E>,
     ) -> Result<Option<AttestationData>, String> {
         if validator_duties.is_empty() {
             return Ok(None);
@@ -346,7 +409,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
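
Reduced to its essentials, the modified run loop races the 1/3-slot timer against the head-event channel and relies on `latest_attested_slot` to make the two triggers idempotent per slot. The sketch below is an illustrative reduction, not the PR's exact code: a toy wall-clock slot counter replaces `SlotClock`, and the timer arm is simplified:

```rust
use tokio::sync::mpsc;
use tokio::time::{Duration, Instant, sleep, timeout};

struct HeadEvent {
    beacon_node_index: usize,
}

// Toy slot clock: slots derived from elapsed wall time.
fn current_slot(genesis: Instant, slot_duration: Duration) -> u64 {
    (genesis.elapsed().as_millis() / slot_duration.as_millis()) as u64
}

async fn run(mut head_rx: mpsc::Receiver<HeadEvent>, slot_duration: Duration) {
    let genesis = Instant::now();
    let mut latest_attested_slot = 0u64;

    loop {
        // Race the two triggers; the timer path carries no preferred node,
        // while a head event names the node that delivered it. If the
        // channel closes, the recv arm is disabled and only the timer fires.
        let beacon_node_index = tokio::select! {
            _ = sleep(slot_duration / 3) => None,
            Some(event) = head_rx.recv() => Some(event.beacon_node_index),
        };

        let slot = current_slot(genesis, slot_duration);

        // Idempotency guard: the timer may fire after a head event has
        // already triggered attestation for this slot, and vice versa.
        if slot <= latest_attested_slot {
            continue;
        }

        latest_attested_slot = slot;
        println!("attest for slot {slot}, preferred node: {beacon_node_index:?}");
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(16);
    tokio::spawn(async move {
        sleep(Duration::from_millis(500)).await;
        let _ = tx.send(HeadEvent { beacon_node_index: 0 }).await;
    });
    // The real service never returns; bound the demo to a few seconds.
    let _ = timeout(Duration::from_secs(3), run(rx, Duration::from_millis(1_200))).await;
}
```

Whichever trigger loses the race for a given slot simply falls through the `latest_attested_slot` check, so a fast head event moves attestation earlier without ever producing a second attestation for the same slot.
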