diff --git a/Cargo.lock b/Cargo.lock index d079ad6e78c..e246025635f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1395,8 +1395,10 @@ name = "apollo_l1_endpoint_monitor" version = "0.0.0" dependencies = [ "alloy", + "mockito 1.6.1", "serde", "thiserror 1.0.69", + "tokio", "tracing", "url", ] diff --git a/crates/apollo_l1_endpoint_monitor/Cargo.toml b/crates/apollo_l1_endpoint_monitor/Cargo.toml index 9e1892c4a6b..e5e96e1cb68 100644 --- a/crates/apollo_l1_endpoint_monitor/Cargo.toml +++ b/crates/apollo_l1_endpoint_monitor/Cargo.toml @@ -13,6 +13,8 @@ tracing.workspace = true url = { workspace = true, features = ["serde"] } [dev-dependencies] +mockito.workspace = true +tokio.workspace = true [lints] workspace = true diff --git a/crates/apollo_l1_endpoint_monitor/src/l1_endpoint_monitor_tests.rs b/crates/apollo_l1_endpoint_monitor/src/l1_endpoint_monitor_tests.rs new file mode 100644 index 00000000000..ef174b47152 --- /dev/null +++ b/crates/apollo_l1_endpoint_monitor/src/l1_endpoint_monitor_tests.rs @@ -0,0 +1,138 @@ +use mockito::{Matcher, Server, ServerGuard}; +use url::Url; + +use crate::monitor::{ + HEALTH_CHECK_RPC_METHOD, L1EndpointMonitor, L1EndpointMonitorConfig, L1EndpointMonitorError, +}; + +// Unreachable localhost endpoints for simulating failures. +// Using localhost to prevent IO (so don't switch to example.com in order to avoid port issues). +// Using these ports since they are not well-used ports in unix and privileged (<1024), +// so unless the user runs as root and binds them explicitly, they should be closed. +const BAD_ENDPOINT_1: &str = "http://localhost:1"; +const BAD_ENDPOINT_2: &str = "http://localhost:2"; + +// Helper to assert the active URL and current index in one call. 
+async fn check_get_active_l1_endpoint_success( + monitor: &mut L1EndpointMonitor, + expected_returned_url: &Url, + expected_index_of_returned_url: usize, +) { + let active = monitor.get_active_l1_endpoint().await.unwrap(); + assert_eq!(&active, expected_returned_url); + assert_eq!(monitor.current_l1_endpoint_index, expected_index_of_returned_url); +} + +fn url(url: &str) -> Url { + Url::parse(url).unwrap() +} + +/// Used to mock an L1 endpoint, like infura. +/// This can be replaced by Anvil, but for unit tests it isn't worth the large overhead Anvil +/// entails, given that we only need a valid HTTP response from the given url to test the API. +pub struct MockL1Endpoint { + pub url: Url, + pub endpoint: ServerGuard, +} + +async fn mock_working_l1_endpoint() -> MockL1Endpoint { + // Very simple mock is all we need _for now_: create a thin http server that expect a single + // call to the given API and return a valid response. Note that the validity of the response + // is coupled with the RPC method used. Server is dropped when the guard drops. + let mut server_guard = Server::new_async().await; + server_guard + .mock("POST", "/") + // Catch this specific RPC method. + .match_body(Matcher::PartialJsonString(format!( + r#"{{ "method": "{}"}}"#, + HEALTH_CHECK_RPC_METHOD + ))) + .with_status(200) + // Return 2_u64 as a valid response for the method. + .with_body(r#"{"jsonrpc":"2.0","id":1,"result":"0x2"}"#) + .create_async() + .await; + + let url = Url::parse(&server_guard.url()).unwrap(); + MockL1Endpoint { url, endpoint: server_guard } +} + +#[tokio::test] +async fn non_responsive_skips_to_next() { + // Setup. + let endpoint = mock_working_l1_endpoint().await; + let good_endpoint = endpoint.url.clone(); + + let mut monitor = L1EndpointMonitor { + current_l1_endpoint_index: 0, + config: L1EndpointMonitorConfig { + ordered_l1_endpoint_urls: vec![url(BAD_ENDPOINT_1), good_endpoint.clone()], + }, + }; + + // Test. 
+ check_get_active_l1_endpoint_success(&mut monitor, &good_endpoint, 1).await; +} + +#[tokio::test] +async fn current_endpoint_still_works() { + // Setup. + let endpoint = mock_working_l1_endpoint().await; + let good_endpoint = endpoint.url.clone(); + + let mut monitor = L1EndpointMonitor { + current_l1_endpoint_index: 1, + config: L1EndpointMonitorConfig { + ordered_l1_endpoint_urls: vec![ + url(BAD_ENDPOINT_1), + good_endpoint.clone(), + url(BAD_ENDPOINT_2), + ], + }, + }; + + // Test. + check_get_active_l1_endpoint_success(&mut monitor, &good_endpoint, 1).await; +} + +#[tokio::test] +async fn wrap_around_success() { + // Setup. + let endpoint = mock_working_l1_endpoint().await; + let good_url = endpoint.url.clone(); + + let mut monitor = L1EndpointMonitor { + current_l1_endpoint_index: 2, + config: L1EndpointMonitorConfig { + ordered_l1_endpoint_urls: vec![ + url(BAD_ENDPOINT_1), + good_url.clone(), + url(BAD_ENDPOINT_2), + ], + }, + }; + + // Test. + check_get_active_l1_endpoint_success(&mut monitor, &good_url, 1).await; +} + +#[tokio::test] +async fn all_down_fails() { + // Setup. + let mut monitor = L1EndpointMonitor { + current_l1_endpoint_index: 0, + config: L1EndpointMonitorConfig { + ordered_l1_endpoint_urls: vec![url(BAD_ENDPOINT_1), url(BAD_ENDPOINT_2)], + }, + }; + + // Test. 
+ let result = monitor.get_active_l1_endpoint().await; + assert_eq!(result, Err(L1EndpointMonitorError::NoActiveL1Endpoint)); + assert_eq!(monitor.current_l1_endpoint_index, 0); +} + +#[test] +#[ignore = "Enable once we add a constructor to the monitor (soon) which asserts \ + correct index and returns an error otherwise"] +fn initialized_with_index_out_of_bounds() {} diff --git a/crates/apollo_l1_endpoint_monitor/src/monitor.rs b/crates/apollo_l1_endpoint_monitor/src/monitor.rs index e02dac8c0ba..290c5309d26 100644 --- a/crates/apollo_l1_endpoint_monitor/src/monitor.rs +++ b/crates/apollo_l1_endpoint_monitor/src/monitor.rs @@ -1,11 +1,21 @@ +use alloy::primitives::U64; use alloy::providers::{Provider, ProviderBuilder}; use serde::{Deserialize, Serialize}; use thiserror::Error; use tracing::{error, warn}; use url::Url; +#[cfg(test)] +#[path = "l1_endpoint_monitor_tests.rs"] +pub mod l1_endpoint_monitor_tests; + type L1EndpointMonitorResult<T> = Result<T, L1EndpointMonitorError>; +/// The JSON-RPC method used to check L1 endpoint health. +// Note: is this fast enough? Alternatively, we can just check connectivity, but we already hit +// a bug in infura where the connectivity was fine, but get_block_number() failed. +pub const HEALTH_CHECK_RPC_METHOD: &str = "eth_blockNumber"; + #[derive(Debug, Clone)] pub struct L1EndpointMonitor { pub current_l1_endpoint_index: usize, @@ -47,13 +57,14 @@ impl L1EndpointMonitor { &self.config.ordered_l1_endpoint_urls[index] } + /// Check if the L1 endpoint is operational by sending a carefully-chosen request to it. + // note: Using a raw request instead of just alloy API (like `get_block_number()`) to improve + // high-level readability (through a dedicated const) and to improve testability. async fn is_operational(&self, l1_endpoint_index: usize) -> bool { let l1_endpoint_url = self.get_node_url(l1_endpoint_index); let l1_client = ProviderBuilder::new().on_http(l1_endpoint_url.clone()); - // Is this fast enough? 
we can use something to just check connectivity, but a recent infura - bug failed on this API even though connectivity was fine. Besides, this API is called for - most of our operations anyway. - l1_client.get_block_number().await.is_ok() + // Note: response type annotation is coupled with the rpc method used. + l1_client.client().request_noparams::<U64>(HEALTH_CHECK_RPC_METHOD).await.is_ok() } }