From ea2621ed01a6ab27ccf03fa4b6597c78c5075362 Mon Sep 17 00:00:00 2001 From: Gilad Chase Date: Mon, 5 May 2025 15:16:43 +0300 Subject: [PATCH] chore(apollo_l1_endpoint_monitor): implement the core of l1 endpoint monitor Constructors and other boilerplate stuff will come later. --- Cargo.lock | 7 ++ crates/apollo_l1_endpoint_monitor/Cargo.toml | 5 ++ crates/apollo_l1_endpoint_monitor/src/lib.rs | 2 +- .../apollo_l1_endpoint_monitor/src/monitor.rs | 69 +++++++++++++++++++ 4 files changed, 82 insertions(+), 1 deletion(-) create mode 100644 crates/apollo_l1_endpoint_monitor/src/monitor.rs diff --git a/Cargo.lock b/Cargo.lock index 6e3a0338119..d079ad6e78c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1393,6 +1393,13 @@ dependencies = [ [[package]] name = "apollo_l1_endpoint_monitor" version = "0.0.0" +dependencies = [ + "alloy", + "serde", + "thiserror 1.0.69", + "tracing", + "url", +] [[package]] name = "apollo_l1_gas_price" diff --git a/crates/apollo_l1_endpoint_monitor/Cargo.toml b/crates/apollo_l1_endpoint_monitor/Cargo.toml index 88950457f88..9e1892c4a6b 100644 --- a/crates/apollo_l1_endpoint_monitor/Cargo.toml +++ b/crates/apollo_l1_endpoint_monitor/Cargo.toml @@ -6,6 +6,11 @@ repository.workspace = true license.workspace = true [dependencies] +alloy.workspace = true +serde.workspace = true +thiserror.workspace = true +tracing.workspace = true +url = { workspace = true, features = ["serde"] } [dev-dependencies] diff --git a/crates/apollo_l1_endpoint_monitor/src/lib.rs b/crates/apollo_l1_endpoint_monitor/src/lib.rs index 8b137891791..cb4171bb448 100644 --- a/crates/apollo_l1_endpoint_monitor/src/lib.rs +++ b/crates/apollo_l1_endpoint_monitor/src/lib.rs @@ -1 +1 @@ - +pub mod monitor; diff --git a/crates/apollo_l1_endpoint_monitor/src/monitor.rs b/crates/apollo_l1_endpoint_monitor/src/monitor.rs new file mode 100644 index 00000000000..e02dac8c0ba --- /dev/null +++ b/crates/apollo_l1_endpoint_monitor/src/monitor.rs @@ -0,0 +1,69 @@ +use alloy::providers::{Provider, ProviderBuilder}; +use serde::{Deserialize, Serialize}; +use thiserror::Error; +use tracing::{error, warn}; +use url::Url; + +type L1EndpointMonitorResult = Result; + +#[derive(Debug, Clone)] +pub struct L1EndpointMonitor { + pub current_l1_endpoint_index: usize, + pub config: L1EndpointMonitorConfig, +} + +impl L1EndpointMonitor { + /// Returns a functional L1 endpoint, or fails if all configured endpoints are non-operational. + /// The method cycles through the configured endpoints, starting from the currently selected one + /// and returns the first one that is operational. + pub async fn get_active_l1_endpoint(&mut self) -> L1EndpointMonitorResult { + let current_l1_endpoint_index = self.current_l1_endpoint_index; + // This check can be done async, instead of blocking the user, but this requires an + // additional "active" component or async task in our infra. + if self.is_operational(current_l1_endpoint_index).await { + return Ok(self.get_node_url(current_l1_endpoint_index).clone()); + } + + let n_urls = self.config.ordered_l1_endpoint_urls.len(); + for offset in 1..n_urls { + let idx = (current_l1_endpoint_index + offset) % n_urls; + if self.is_operational(idx).await { + warn!( + "L1 endpoint {} down; switched to {}", + self.get_node_url(current_l1_endpoint_index), + self.get_node_url(idx) + ); + + self.current_l1_endpoint_index = idx; + return Ok(self.get_node_url(idx).clone()); + } + } + + error!("No operational L1 endpoints found in {:?}", self.config.ordered_l1_endpoint_urls); + Err(L1EndpointMonitorError::NoActiveL1Endpoint) + } + + fn get_node_url(&self, index: usize) -> &Url { + &self.config.ordered_l1_endpoint_urls[index] + } + + async fn is_operational(&self, l1_endpoint_index: usize) -> bool { + let l1_endpoint_url = self.get_node_url(l1_endpoint_index); + let l1_client = ProviderBuilder::new().on_http(l1_endpoint_url.clone()); + // Is this fast enough? we can use something to just check connectivity, but a recent infura + // bug failed on this API even though connectivity was fine. Besides, this API is called for + // most of our operations anyway. + l1_client.get_block_number().await.is_ok() + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct L1EndpointMonitorConfig { + pub ordered_l1_endpoint_urls: Vec, +} + +#[derive(Debug, Clone, Error, Serialize, Deserialize, PartialEq, Eq)] +pub enum L1EndpointMonitorError { + #[error("All L1 endpoints are non-operational")] + NoActiveL1Endpoint, +}