26 commits
ea45814
using events api for eager start attestation tasks
hopinheimer Aug 18, 2025
6f20083
merge `origin/unstable` into `hopinheimer/eager-send-attestation`
hopinheimer Aug 18, 2025
46fe220
minor nits
hopinheimer Aug 18, 2025
de7226f
clippy chill vibes
hopinheimer Aug 18, 2025
5f2a101
missed something
hopinheimer Aug 18, 2025
ae16705
linty
hopinheimer Aug 18, 2025
c13d0c3
implemented head monitoring service
hopinheimer Sep 19, 2025
8405f01
Merge branch 'unstable' of github.com:sigp/lighthouse into eager-send…
hopinheimer Sep 19, 2025
65e04fb
merge from feature branch
hopinheimer Sep 19, 2025
a1d36e6
fixing some linting issues
hopinheimer Sep 19, 2025
a489d32
comments and removing unwanted code
hopinheimer Sep 20, 2025
2b974db
clippy change
hopinheimer Sep 20, 2025
5600aef
Merge branch 'unstable' into eager-send-attestation
hopinheimer Oct 15, 2025
b65fc30
same data attestation bug solved
hopinheimer Oct 16, 2025
9024931
Merge branch 'unstable' into eager-send-attestation
hopinheimer Oct 20, 2025
b25703f
fixing dangling conditions and amaking head_monitor_service optional
hopinheimer Oct 20, 2025
e68548b
Merge branch 'eager-send-attestation' of github.com:hopinheimer/light…
hopinheimer Oct 20, 2025
c4d851c
fmt
hopinheimer Oct 20, 2025
daf5b2e
update comment
hopinheimer Oct 20, 2025
c49b8eb
Merge branch 'unstable' into eager-send-attestation
hopinheimer Oct 25, 2025
0e35ee5
massive refact
hopinheimer Oct 28, 2025
29867d2
fixes and linting
hopinheimer Oct 29, 2025
fd43876
remove unused code
hopinheimer Oct 31, 2025
b054a10
changes
hopinheimer Nov 1, 2025
eb057d0
Merge branch 'unstable' into eager-send-attestation
hopinheimer Nov 2, 2025
91e5980
Merge branch 'unstable' into eager-send-attestation
hopinheimer Nov 5, 2025
35 changes: 35 additions & 0 deletions validator_client/beacon_node_fallback/src/lib.rs
@@ -646,6 +646,41 @@ impl<T: SlotClock> BeaconNodeFallback<T> {
Err(Errors(errors))
}

/// Try `func` on a specific beacon node by index first, then fall back to the normal order.
/// Returns immediately if the preferred node succeeds; otherwise falls back to
/// `first_success`. The fallback is insurance against potential race conditions.
pub async fn first_success_from_index<F, O, Err, R>(
&self,
preferred_index: Option<usize>,
func: F,
) -> Result<O, Errors<Err>>
where
F: Fn(BeaconNodeHttpClient) -> R + Clone,
R: Future<Output = Result<O, Err>>,
Err: Debug,
{
let candidates = self.candidates.read().await;

// Try the preferred beacon node first if it exists
if let Some(preferred_idx) = preferred_index
&& let Some(preferred_candidate) = candidates.iter().find(|c| c.index == preferred_idx)
{
let preferred_node = preferred_candidate.beacon_node.clone();
drop(candidates);

match Self::run_on_candidate(preferred_node, &func).await {
Ok(val) => return Ok(val),
Err(_) => {
return self.first_success(func).await;
}
}
}

// Fall back to normal first_success behavior
drop(candidates);
self.first_success(func).await
}

/// Run the future `func` on `candidate` while reporting metrics.
async fn run_on_candidate<F, R, Err, O>(
candidate: BeaconNodeHttpClient,
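A quick usage sketch of the new fallback entry point, for review context. The node-version request and error formatting below are convenient stand-ins, not part of this diff:

// Sketch: prefer the candidate whose `index` is 2, if present, then fall
// back to the usual `first_success` ordering. Any fallible request works;
// `get_node_version` is just a no-argument example from the eth2 client.
let result = beacon_nodes
    .first_success_from_index(Some(2), |node| async move {
        node.get_node_version().await.map_err(|e| format!("{e:?}"))
    })
    .await;

match result {
    Ok(response) => println!("responding node runs {}", response.data.version),
    Err(errors) => eprintln!("all candidates failed: {errors}"),
}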
19 changes: 19 additions & 0 deletions validator_client/src/lib.rs
@@ -9,6 +9,7 @@ use metrics::set_gauge;
use monitoring_api::{MonitoringHttpClient, ProcessType};
use sensitive_url::SensitiveUrl;
use slashing_protection::{SLASHING_PROTECTION_FILENAME, SlashingDatabase};
use tokio::sync::Mutex;

use account_utils::validator_definitions::ValidatorDefinitions;
use beacon_node_fallback::{
@@ -42,6 +43,7 @@ use validator_services::{
attestation_service::{AttestationService, AttestationServiceBuilder},
block_service::{BlockService, BlockServiceBuilder},
duties_service::{self, DutiesService, DutiesServiceBuilder},
head_monitor_service::{HeadMonitorService, HeadMonitorServiceBuilder},
latency_service,
preparation_service::{PreparationService, PreparationServiceBuilder},
sync_committee_service::SyncCommitteeService,
@@ -79,6 +81,7 @@ pub struct ProductionValidatorClient<E: EthSpec> {
context: RuntimeContext<E>,
duties_service: Arc<DutiesService<ValidatorStore<E>, SystemTimeSlotClock>>,
block_service: BlockService<ValidatorStore<E>, SystemTimeSlotClock>,
head_monitor_service: HeadMonitorService<ValidatorStore<E>, SystemTimeSlotClock>,
attestation_service: AttestationService<ValidatorStore<E>, SystemTimeSlotClock>,
sync_committee_service: SyncCommitteeService<ValidatorStore<E>, SystemTimeSlotClock>,
doppelganger_service: Option<Arc<DoppelgangerService>>,
@@ -493,15 +496,25 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
block_service_builder = block_service_builder.proposer_nodes(proposer_nodes.clone());
}

let (head_sender, head_receiver) = mpsc::channel(1_024);

let block_service = block_service_builder.build()?;

let head_monitor_service = HeadMonitorServiceBuilder::new()
.executor(context.executor.clone())
.validator_store(validator_store.clone())
.beacon_nodes(beacon_nodes.clone())
.head_monitor_tx(Arc::new(head_sender))
.build()?;

let attestation_service = AttestationServiceBuilder::new()
.duties_service(duties_service.clone())
.slot_clock(slot_clock.clone())
.validator_store(validator_store.clone())
.beacon_nodes(beacon_nodes.clone())
.executor(context.executor.clone())
.chain_spec(context.eth2_config.spec.clone())
.head_monitor_rx(Arc::new(Mutex::new(head_receiver)))
.disable(config.disable_attesting)
.build()?;

@@ -526,6 +539,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
context,
duties_service,
block_service,
head_monitor_service,
attestation_service,
sync_committee_service,
doppelganger_service,
@@ -604,6 +618,11 @@
.start_update_service(&self.context.eth2_config.spec)
.map_err(|e| format!("Unable to start preparation service: {}", e))?;

self.head_monitor_service
.clone()
.start_update_service()
.map_err(|e| format!("Unable to start head monitor service: {}", e))?;

if let Some(doppelganger_service) = self.doppelganger_service.clone() {
DoppelgangerService::start_update_service(
doppelganger_service,
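The wiring above is a single bounded channel: the head monitor holds the sender, and the attestation service holds the receiver behind a `Mutex`. A self-contained sketch of the same pattern; the `HeadEvent` struct here is a stand-in for the real type in `head_monitor_service`:

use std::sync::Arc;
use tokio::sync::{Mutex, mpsc};

// Stand-in for validator_services::head_monitor_service::HeadEvent.
#[derive(Debug)]
struct HeadEvent {
    beacon_node_index: usize,
}

#[tokio::main]
async fn main() {
    // Bounded channel, mirroring `mpsc::channel(1_024)` above.
    let (tx, rx) = mpsc::channel::<HeadEvent>(1_024);
    // The receiver sits behind Arc<Mutex<..>> because the attestation service
    // is cloned into spawned tasks, but only one task should consume events.
    let rx = Arc::new(Mutex::new(rx));

    let producer = tokio::spawn(async move {
        tx.send(HeadEvent { beacon_node_index: 0 }).await.ok();
    });

    let consumer = tokio::spawn({
        let rx = Arc::clone(&rx);
        async move {
            if let Some(event) = rx.lock().await.recv().await {
                println!("head seen first by node {}", event.beacon_node_index);
            }
        }
    });

    let _ = tokio::join!(producer, consumer);
}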
59 changes: 54 additions & 5 deletions validator_client/validator_services/src/attestation_service.rs
@@ -1,4 +1,7 @@
use crate::duties_service::{DutiesService, DutyAndProof};
use tokio::sync::Mutex;

use crate::head_monitor_service::HeadEvent;
use beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
use futures::future::join_all;
use logging::crit;
@@ -7,6 +10,7 @@ use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use tokio::time::{Duration, Instant, sleep, sleep_until};
use tracing::{debug, error, info, trace, warn};
use tree_hash::TreeHash;
@@ -22,6 +26,7 @@ pub struct AttestationServiceBuilder<S: ValidatorStore, T: SlotClock + 'static>
beacon_nodes: Option<Arc<BeaconNodeFallback<T>>>,
executor: Option<TaskExecutor>,
chain_spec: Option<Arc<ChainSpec>>,
head_monitor_rx: Option<Arc<Mutex<mpsc::Receiver<HeadEvent>>>>,
disable: bool,
}

@@ -34,6 +39,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationServiceBuil
beacon_nodes: None,
executor: None,
chain_spec: None,
head_monitor_rx: None,
disable: false,
}
}
@@ -73,6 +79,13 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationServiceBuil
self
}

pub fn head_monitor_rx(
mut self,
head_monitor_rx: Arc<Mutex<mpsc::Receiver<HeadEvent>>>,
) -> Self {
self.head_monitor_rx = Some(head_monitor_rx);
self
}
pub fn build(self) -> Result<AttestationService<S, T>, String> {
Ok(AttestationService {
inner: Arc::new(Inner {
@@ -94,6 +107,9 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationServiceBuil
chain_spec: self
.chain_spec
.ok_or("Cannot build AttestationService without chain_spec")?,
head_monitor_rx: self
.head_monitor_rx
.ok_or("Cannot build AttestationService without head_monitor_rx")?,
disable: self.disable,
}),
})
@@ -108,6 +124,7 @@ pub struct Inner<S, T> {
beacon_nodes: Arc<BeaconNodeFallback<T>>,
executor: TaskExecutor,
chain_spec: Arc<ChainSpec>,
head_monitor_rx: Arc<Mutex<mpsc::Receiver<HeadEvent>>>,
disable: bool,
}

@@ -160,9 +177,17 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
let interval_fut = async move {
loop {
if let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() {
sleep(duration_to_next_slot + slot_duration / 3).await;
let mut beacon_node_index: Option<usize> = None;
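// Race the fixed 1/3-slot deadline against a head event from the head
// monitor: whichever resolves first triggers attestation production, and
// a head event also records which beacon node observed the new head.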
tokio::select! {
_ = sleep(duration_to_next_slot + slot_duration / 3) => {},
head_event = self.poll_for_head_events() => {
if let Ok(event) = head_event {
beacon_node_index = Some(event.beacon_node_index);
}
}
}

if let Err(e) = self.spawn_attestation_tasks(slot_duration) {
if let Err(e) = self.spawn_attestation_tasks(slot_duration, beacon_node_index) {
crit!(error = e, "Failed to spawn attestation tasks")
} else {
trace!("Spawned attestation tasks");
@@ -180,9 +205,25 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
Ok(())
}

async fn poll_for_head_events(&self) -> Result<HeadEvent, String> {
let mut receiver = self.head_monitor_rx.lock().await;
match receiver.recv().await {
Some(head_event) => Ok(head_event),
None => Err("Head monitor channel closed unexpectedly".to_string()),
}
}

/// For each required attestation, spawn a new task that downloads, signs and uploads the
/// attestation to the beacon node.
fn spawn_attestation_tasks(&self, slot_duration: Duration) -> Result<(), String> {
fn spawn_attestation_tasks(
&self,
slot_duration: Duration,
beacon_node_index: Option<usize>,
) -> Result<(), String> {
info!(
?beacon_node_index,
"Processing attestations"
);
let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?;
let duration_to_next_slot = self
.slot_clock
@@ -221,6 +262,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
committee_index,
validator_duties,
aggregate_production_instant,
beacon_node_index,
),
"attestation publish",
);
@@ -249,6 +291,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
committee_index: CommitteeIndex,
validator_duties: Vec<DutyAndProof>,
aggregate_production_instant: Instant,
candidate_beacon_node: Option<usize>,
) -> Result<(), ()> {
let attestations_timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
@@ -265,7 +308,12 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
//
// Download, sign and publish an `Attestation` for each validator.
let attestation_opt = self
.produce_and_publish_attestations(slot, committee_index, &validator_duties)
.produce_and_publish_attestations(
slot,
committee_index,
&validator_duties,
candidate_beacon_node,
)
.await
.map_err(move |e| {
crit!(
@@ -333,6 +381,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
slot: Slot,
committee_index: CommitteeIndex,
validator_duties: &[DutyAndProof],
candidate_beacon_node: Option<usize>,
) -> Result<Option<AttestationData>, String> {
if validator_duties.is_empty() {
return Ok(None);
@@ -346,7 +395,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,

let attestation_data = self
.beacon_nodes
.first_success(|beacon_node| async move {
.first_success_from_index(candidate_beacon_node, |beacon_node| async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS_HTTP_GET],
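The core scheduling change is the `tokio::select!` race in the service loop: attest at the usual 1/3-slot mark, or earlier if the head monitor reports a new head, remembering which node saw it first. A runnable sketch of that race under toy timings, again with a stand-in `HeadEvent`:

use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::sleep;

// Stand-in for the real HeadEvent carried over the monitor channel.
#[derive(Debug)]
struct HeadEvent {
    beacon_node_index: usize,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<HeadEvent>(16);

    // Simulate a head event landing before the 1/3-slot deadline.
    tokio::spawn(async move {
        sleep(Duration::from_millis(100)).await;
        tx.send(HeadEvent { beacon_node_index: 1 }).await.ok();
    });

    // Race the deadline against the head event, as the service loop does.
    let mut beacon_node_index: Option<usize> = None;
    tokio::select! {
        _ = sleep(Duration::from_secs(4)) => {
            // Timer won: attest on schedule with no preferred node.
        }
        event = rx.recv() => {
            // Head event won: attest now, preferring the node that saw it.
            if let Some(event) = event {
                beacon_node_index = Some(event.beacon_node_index);
            }
        }
    }

    println!("spawning attestation tasks, preferred node: {beacon_node_index:?}");
}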