Skip to content

Commit 7595c68

Browse files
author
Gilad Chase
committed
chore(apollo_l1_endpoint_monitor): implement the core of l1 endpoint monitor
Constructors and other boilerplate stuff will come later.
1 parent 6cb02b4 commit 7595c68

File tree

3 files changed

+77
-1
lines changed

3 files changed

+77
-1
lines changed

crates/apollo_l1_endpoint_monitor/Cargo.toml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,16 @@ repository.workspace = true
66
license.workspace = true
77

88
[dependencies]
9+
alloy.workspace = true
10+
apollo_config.workspace = true
11+
apollo_infra.workspace = true
12+
apollo_l1_endpoint_monitor_types.workspace = true
13+
async-trait.workspace = true
14+
serde.workspace = true
15+
thiserror.workspace = true
16+
tokio.workspace = true
17+
tracing.workspace = true
18+
url = { workspace = true, features = ["serde"] }
919

1020
[dev-dependencies]
1121

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1+
pub mod monitor;
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use std::collections::BTreeMap;
2+
3+
use alloy::providers::{Provider, ProviderBuilder};
4+
use apollo_config::dumping::{ser_param, SerializeConfig};
5+
use apollo_config::{ParamPath, ParamPrivacyInput, SerializedParam};
6+
use apollo_infra::component_definitions::ComponentStarter;
7+
use apollo_l1_endpoint_monitor_types::{L1EndpointMonitorError, L1EndpointMonitorResult};
8+
use serde::{Deserialize, Serialize};
9+
use tracing::{error, warn};
10+
use url::Url;
11+
12+
#[derive(Debug, Clone)]
13+
pub struct L1EndpointMonitor {
14+
pub current_l1_endpoint_index: usize,
15+
pub config: L1EndpointMonitorConfig,
16+
}
17+
18+
impl L1EndpointMonitor {
19+
/// Returns a functional L1 endpoint, or fails if all configured endpoints are non-operational.
20+
/// The method cycles through the configured endpoints, starting from the currently selected one
21+
/// and returns the first one that is operational.
22+
pub async fn get_active_l1_endpoint(&mut self) -> L1EndpointMonitorResult<Url> {
23+
let current_l1_endpoint_index = self.current_l1_endpoint_index;
24+
// This check can be done async, instead of blocking the user, but this requires an
25+
// additional "active" component or async task in our infra.
26+
if self.is_operational(current_l1_endpoint_index).await {
27+
return Ok(self.get_node_url(current_l1_endpoint_index).clone());
28+
}
29+
30+
let n_urls = self.config.ordered_l1_endpoint_urls.len();
31+
for offset in 1..n_urls {
32+
let idx = (current_l1_endpoint_index + offset) % n_urls;
33+
if self.is_operational(idx).await {
34+
warn!(
35+
"L1 endpoint {} down; switched to {}",
36+
self.get_node_url(current_l1_endpoint_index),
37+
self.get_node_url(idx)
38+
);
39+
40+
self.current_l1_endpoint_index = idx;
41+
return Ok(self.get_node_url(idx).clone());
42+
}
43+
}
44+
45+
error!("No operational L1 endpoints found in {:?}", self.config.ordered_l1_endpoint_urls);
46+
Err(L1EndpointMonitorError::NoActiveL1Endpoint)
47+
}
48+
49+
fn get_node_url(&self, index: usize) -> &Url {
50+
&self.config.ordered_l1_endpoint_urls[index]
51+
}
52+
53+
async fn is_operational(&self, l1_endpoint_index: usize) -> bool {
54+
let l1_endpoint_url = self.get_node_url(l1_endpoint_index);
55+
let l1_client = ProviderBuilder::new().on_http(l1_endpoint_url.clone());
56+
// Is this fast enough? we can use something to just check connectivity, but a recent infura
57+
// bug failed on this API even though connectivity was fine. Besides, this API is called for
58+
// most of our operations anyway.
59+
l1_client.get_block_number().await.is_ok()
60+
}
61+
}
62+
63+
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
64+
pub struct L1EndpointMonitorConfig {
65+
pub ordered_l1_endpoint_urls: Vec<Url>,
66+
}

0 commit comments

Comments
 (0)