diff --git a/Cargo.lock b/Cargo.lock
index 5182a760..63de92dd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1485,6 +1485,7 @@ dependencies = [
  "base64 0.22.1",
  "bimap",
  "blst",
+ "bytes",
  "cipher 0.4.4",
  "ctr 0.9.2",
  "derive_more 2.0.1",
@@ -1494,6 +1495,7 @@ dependencies = [
  "ethereum_ssz 0.8.3",
  "ethereum_ssz_derive",
  "eyre",
+ "futures",
  "jsonwebtoken",
  "pbkdf2 0.12.2",
  "rand 0.9.0",
diff --git a/Cargo.toml b/Cargo.toml
index ff49bdea..5102238b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -25,6 +25,7 @@ base64 = "0.22.1"
 bimap = { version = "0.6.3", features = ["serde"] }
 blsful = "2.5"
 blst = "0.3.11"
+bytes = "1.10.1"
 cb-cli = { path = "crates/cli" }
 cb-common = { path = "crates/common" }
 cb-metrics = { path = "crates/metrics" }
diff --git a/config.example.toml b/config.example.toml
index f6784d19..af84fdce 100644
--- a/config.example.toml
+++ b/config.example.toml
@@ -55,6 +55,9 @@ extra_validation_enabled = false
 # Execution Layer RPC url to use for extra validation
 # OPTIONAL
 rpc_url = "https://ethereum-holesky-rpc.publicnode.com"
+# Timeout for any HTTP requests sent from the PBS module to other services, in seconds
+# OPTIONAL, DEFAULT: 10
+http_timeout_seconds = 10
 # Maximum number of retries for validator registrations per relay
 # OPTIONAL, DEFAULT: 3
 register_validator_retry_limit = 3
diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml
index 6bcee533..fe2f9aec 100644
--- a/crates/common/Cargo.toml
+++ b/crates/common/Cargo.toml
@@ -13,6 +13,7 @@ axum.workspace = true
 base64.workspace = true
 bimap.workspace = true
 blst.workspace = true
+bytes.workspace = true
 cipher.workspace = true
 ctr.workspace = true
 derive_more.workspace = true
@@ -22,6 +23,7 @@ ethereum_serde_utils.workspace = true
 ethereum_ssz.workspace = true
 ethereum_ssz_derive.workspace = true
 eyre.workspace = true
+futures.workspace = true
 jsonwebtoken.workspace = true
 pbkdf2.workspace = true
 rand.workspace = true
diff --git a/crates/common/src/config/constants.rs b/crates/common/src/config/constants.rs
index 39773cf6..8b07f732 100644
--- a/crates/common/src/config/constants.rs
+++ b/crates/common/src/config/constants.rs
@@ -72,6 +72,14 @@ pub const PROXY_DIR_KEYS_DEFAULT: &str = "/proxy_keys";
 pub const PROXY_DIR_SECRETS_ENV: &str = "CB_PROXY_SECRETS_DIR";
 pub const PROXY_DIR_SECRETS_DEFAULT: &str = "/proxy_secrets";
 
+////////////////////////// MUXER //////////////////////////
+
+/// Timeout for HTTP requests, in seconds
+pub const HTTP_TIMEOUT_SECONDS_DEFAULT: u64 = 10;
+
+/// Max content length for Muxer HTTP responses, in bytes
+pub const MUXER_HTTP_MAX_LENGTH: usize = 1024 * 1024 * 10; // 10 MiB
+
 ///////////////////////// MODULES /////////////////////////
 
 /// The unique ID of the module
diff --git a/crates/common/src/config/mux.rs b/crates/common/src/config/mux.rs
index 77810ea8..487a5909 100644
--- a/crates/common/src/config/mux.rs
+++ b/crates/common/src/config/mux.rs
@@ -2,21 +2,28 @@ use std::{
     collections::{HashMap, HashSet},
     path::{Path, PathBuf},
     sync::Arc,
+    time::Duration,
 };
 
 use alloy::{
     primitives::{address, Address, U256},
     providers::ProviderBuilder,
-    rpc::types::beacon::BlsPublicKey,
+    rpc::{client::RpcClient, types::beacon::BlsPublicKey},
     sol,
+    transports::http::Http,
 };
 use eyre::{bail, ensure, Context};
+use reqwest::Client;
 use serde::{Deserialize, Serialize};
-use tracing::{debug, info};
+use tracing::{debug, info, warn};
 use url::Url;
 
 use super::{load_optional_env_var, PbsConfig, RelayConfig, MUX_PATH_ENV};
-use crate::{config::remove_duplicate_keys, pbs::RelayClient, types::Chain};
+use crate::{
+    config::{remove_duplicate_keys, safe_read_http_response},
+    pbs::RelayClient,
+    types::Chain,
+};
 
 #[derive(Debug, Deserialize, Serialize)]
 pub struct PbsMuxes {
@@ -38,13 +45,16 @@ impl PbsMuxes {
         chain: Chain,
         default_pbs: &PbsConfig,
     ) -> eyre::Result<HashMap<String, RuntimeMuxConfig>> {
+        let http_timeout = Duration::from_secs(default_pbs.http_timeout_seconds);
+
         let mut muxes = self.muxes;
 
         for mux in muxes.iter_mut() {
             ensure!(!mux.relays.is_empty(), "mux config {} must have at least one relay", mux.id);
 
             if let Some(loader) = &mux.loader {
-                let extra_keys = loader.load(&mux.id, chain, default_pbs.rpc_url.clone()).await?;
+                let extra_keys =
+                    loader.load(&mux.id, chain, default_pbs.rpc_url.clone(), http_timeout).await?;
                 mux.validator_pubkeys.extend(extra_keys);
             }
@@ -164,6 +174,7 @@ impl MuxKeysLoader {
         mux_id: &str,
         chain: Chain,
         rpc_url: Option<Url>,
+        http_timeout: Duration,
     ) -> eyre::Result<Vec<BlsPublicKey>> {
         let keys = match self {
             Self::File(config_path) => {
@@ -176,11 +187,17 @@ impl MuxKeysLoader {
             }
 
             Self::HTTP { url } => {
-                let client = reqwest::Client::new();
+                let url = Url::parse(url).wrap_err("failed to parse mux keys URL")?;
+                if url.scheme() != "https" {
+                    warn!(
+                        "Mux keys URL {url} is insecure; consider using HTTPS instead if possible"
+                    );
+                }
+                let client = reqwest::ClientBuilder::new().timeout(http_timeout).build()?;
                 let response = client.get(url).send().await?;
-                let pubkeys = response.text().await?;
-                serde_json::from_str(&pubkeys)
-                    .wrap_err("failed to fetch mux keys from http endpoint")
+                let pubkey_bytes = safe_read_http_response(response).await?;
+                serde_json::from_slice(&pubkey_bytes)
+                    .wrap_err("failed to fetch mux keys from HTTP endpoint")
             }
 
             Self::Registry { registry, node_operator_id } => match registry {
@@ -189,9 +206,17 @@ impl MuxKeysLoader {
                     bail!("Lido registry requires RPC URL to be set in the PBS config");
                 };
 
-                fetch_lido_registry_keys(rpc_url, chain, U256::from(*node_operator_id)).await
+                    fetch_lido_registry_keys(
+                        rpc_url,
+                        chain,
+                        U256::from(*node_operator_id),
+                        http_timeout,
+                    )
+                    .await
+                }
+                NORegistry::SSV => {
+                    fetch_ssv_pubkeys(chain, U256::from(*node_operator_id), http_timeout).await
                 }
-                NORegistry::SSV => fetch_ssv_pubkeys(chain, U256::from(*node_operator_id)).await,
             },
         }?;
@@ -237,10 +262,17 @@ async fn fetch_lido_registry_keys(
     rpc_url: Url,
     chain: Chain,
     node_operator_id: U256,
+    http_timeout: Duration,
 ) -> eyre::Result<Vec<BlsPublicKey>> {
     debug!(?chain, %node_operator_id, "loading operator keys from Lido registry");
 
-    let provider = ProviderBuilder::new().on_http(rpc_url);
+    // Create an RPC provider with HTTP timeout support
+    let client = Client::builder().timeout(http_timeout).build()?;
+    let http = Http::with_client(client, rpc_url);
+    let is_local = http.guess_local();
+    let rpc_client = RpcClient::new(http, is_local);
+    let provider = ProviderBuilder::new().on_client(rpc_client);
+
     let registry_address = lido_registry_address(chain)?;
     let registry = LidoRegistry::new(registry_address, provider);
@@ -293,6 +325,7 @@ async fn fetch_lido_registry_keys(
 async fn fetch_ssv_pubkeys(
     chain: Chain,
     node_operator_id: U256,
+    http_timeout: Duration,
 ) -> eyre::Result<Vec<BlsPublicKey>> {
     const MAX_PER_PAGE: usize = 100;
 
@@ -303,22 +336,16 @@ async fn fetch_ssv_pubkeys(
         _ => bail!("SSV network is not supported for chain: {chain:?}"),
     };
 
-    let client = reqwest::Client::new();
     let mut pubkeys: Vec<BlsPublicKey> = vec![];
 
     let mut page = 1;
 
     loop {
-        let response = client
-            .get(format!(
-                "https://api.ssv.network/api/v4/{}/validators/in_operator/{}?perPage={}&page={}",
-                chain_name, node_operator_id, MAX_PER_PAGE, page
-            ))
-            .send()
-            .await
-            .map_err(|e| eyre::eyre!("Error sending request to SSV network API: {e}"))?
-            .json::<SSVResponse>()
-            .await?;
+        let url = format!(
+            "https://api.ssv.network/api/v4/{}/validators/in_operator/{}?perPage={}&page={}",
+            chain_name, node_operator_id, MAX_PER_PAGE, page
+        );
+        let response = fetch_ssv_pubkeys_from_url(&url, http_timeout).await?;
 
         pubkeys.extend(response.validators.iter().map(|v| v.pubkey).collect::<Vec<_>>());
         page += 1;
@@ -336,6 +363,24 @@ async fn fetch_ssv_pubkeys(
     Ok(pubkeys)
 }
 
+async fn fetch_ssv_pubkeys_from_url(
+    url: &str,
+    http_timeout: Duration,
+) -> eyre::Result<SSVResponse> {
+    let client = reqwest::ClientBuilder::new().timeout(http_timeout).build()?;
+    let response = client.get(url).send().await.map_err(|e| {
+        if e.is_timeout() {
+            eyre::eyre!("Request to SSV network API timed out: {e}")
+        } else {
+            eyre::eyre!("Error sending request to SSV network API: {e}")
+        }
+    })?;
+
+    // Parse the response as JSON
+    let body_bytes = safe_read_http_response(response).await?;
+    serde_json::from_slice::<SSVResponse>(&body_bytes).wrap_err("failed to parse SSV response")
+}
+
 #[derive(Deserialize)]
 struct SSVResponse {
     validators: Vec<SSVValidator>,
@@ -355,10 +400,20 @@ struct SSVPagination {
 
 #[cfg(test)]
 mod tests {
-    use alloy::{primitives::U256, providers::ProviderBuilder};
+    use std::net::SocketAddr;
+
+    use alloy::{hex::FromHex, primitives::U256, providers::ProviderBuilder};
+    use axum::{response::Response, routing::get};
+    use tokio::{net::TcpListener, task::JoinHandle};
     use url::Url;
 
     use super::*;
+    use crate::{
+        config::{HTTP_TIMEOUT_SECONDS_DEFAULT, MUXER_HTTP_MAX_LENGTH},
+        utils::{set_ignore_content_length, ResponseReadError},
+    };
+
+    const TEST_HTTP_TIMEOUT: u64 = 2;
 
     #[tokio::test]
     async fn test_lido_registry_address() -> eyre::Result<()> {
@@ -393,14 +448,183 @@ mod tests {
     }
 
     #[tokio::test]
+    /// Tests that a successful SSV network fetch is handled and parsed properly
     async fn test_ssv_network_fetch() -> eyre::Result<()> {
-        let chain = Chain::Holesky;
-        let node_operator_id = U256::from(200);
+        // Start the mock server
+        let port = 30100;
+        let _server_handle = create_mock_server(port).await?;
+        let url = format!("http://localhost:{port}/ssv");
+        let response =
+            fetch_ssv_pubkeys_from_url(&url, Duration::from_secs(HTTP_TIMEOUT_SECONDS_DEFAULT))
+                .await?;
+
+        // Make sure the response is correct
+        // NOTE: requires that ssv_valid.json doesn't change
+        assert_eq!(response.validators.len(), 3);
+        let expected_pubkeys = [
+            BlsPublicKey::from_hex(
+                "0x967ba17a3e7f82a25aa5350ec34d6923e28ad8237b5a41efe2c5e325240d74d87a015bf04634f21900963539c8229b2a",
+            )?,
+            BlsPublicKey::from_hex(
+                "0xac769e8cec802e8ffee34de3253be8f438a0c17ee84bdff0b6730280d24b5ecb77ebc9c985281b41ee3bda8663b6658c",
+            )?,
+            BlsPublicKey::from_hex(
+                "0x8c866a5a05f3d45c49b457e29365259021a509c5daa82e124f9701a960ee87b8902e87175315ab638a3d8b1115b23639",
+            )?,
+        ];
+        for (i, validator) in response.validators.iter().enumerate() {
+            assert_eq!(validator.pubkey, expected_pubkeys[i]);
+        }
+
+        // Clean up the server handle
+        _server_handle.abort();
+
+        Ok(())
+    }
 
-        let pubkeys = fetch_ssv_pubkeys(chain, node_operator_id).await?;
+    #[tokio::test]
+    /// Tests that the SSV network fetch is handled properly when the response's
+    /// body is too large
+    async fn test_ssv_network_fetch_big_data() -> eyre::Result<()> {
+        // Start the mock server
+        let port = 30101;
+        let _server_handle = create_mock_server(port).await?;
+        let url = format!("http://localhost:{port}/big_data");
+        let response = fetch_ssv_pubkeys_from_url(&url, Duration::from_secs(120)).await;
+
+        // The response should fail due to content length being too big
+        match response {
+            Ok(_) => {
+                panic!("Expected an error due to big content length, but got a successful response")
+            }
+            Err(e) => match e.downcast_ref::<ResponseReadError>() {
+                Some(ResponseReadError::PayloadTooLarge { max, content_length, raw }) => {
+                    assert_eq!(*max, MUXER_HTTP_MAX_LENGTH);
+                    assert!(*content_length > MUXER_HTTP_MAX_LENGTH);
+                    assert!(raw.is_empty());
+                }
+                _ => panic!("Expected PayloadTooLarge error, got: {}", e),
+            },
+        }
 
-        assert_eq!(pubkeys.len(), 3);
+        // Clean up the server handle
+        _server_handle.abort();
 
         Ok(())
     }
+
+    #[tokio::test]
+    /// Tests that the SSV network fetch is handled properly when the request
+    /// times out
+    async fn test_ssv_network_fetch_timeout() -> eyre::Result<()> {
+        // Start the mock server
+        let port = 30102;
+        let _server_handle = create_mock_server(port).await?;
+        let url = format!("http://localhost:{port}/timeout");
+        let response =
+            fetch_ssv_pubkeys_from_url(&url, Duration::from_secs(TEST_HTTP_TIMEOUT)).await;
+
+        // The response should fail due to timeout
+        assert!(response.is_err(), "Expected timeout error, but got success");
+        if let Err(e) = response {
+            assert!(e.to_string().contains("timed out"), "Expected timeout error, got: {}", e);
+        }
+
+        // Clean up the server handle
+        _server_handle.abort();
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    /// Tests that the SSV network fetch is handled properly when the response's
+    /// content-length header is missing
+    async fn test_ssv_network_fetch_big_data_without_content_length() -> eyre::Result<()> {
+        // Start the mock server
+        let port = 30103;
+        set_ignore_content_length(true);
+        let _server_handle = create_mock_server(port).await?;
+        let url = format!("http://localhost:{port}/big_data");
+        let response = fetch_ssv_pubkeys_from_url(&url, Duration::from_secs(120)).await;
+
+        // The response should fail due to the body being too big
+        match response {
+            Ok(_) => {
+                panic!("Expected an error due to excessive data, but got a successful response")
+            }
+            Err(e) => match e.downcast_ref::<ResponseReadError>() {
+                Some(ResponseReadError::PayloadTooLarge { max, content_length, raw }) => {
+                    assert_eq!(*max, MUXER_HTTP_MAX_LENGTH);
+                    assert_eq!(*content_length, 0);
+                    assert!(!raw.is_empty());
+                }
+                _ => panic!("Expected PayloadTooLarge error, got: {}", e),
+            },
+        }
+
+        // Clean up the server handle
+        _server_handle.abort();
+
+        Ok(())
+    }
+
+    /// Creates a simple mock server to simulate the SSV API endpoint under
+    /// various conditions for testing
+    async fn create_mock_server(port: u16) -> Result<JoinHandle<()>, axum::Error> {
+        let router = axum::Router::new()
+            .route("/ssv", get(handle_ssv))
+            .route("/big_data", get(handle_big_data))
+            .route("/timeout", get(handle_timeout))
+            .into_make_service();
+
+        let address = SocketAddr::from(([127, 0, 0, 1], port));
+        let listener = TcpListener::bind(address).await.map_err(axum::Error::new)?;
+        let server = axum::serve(listener, router).with_graceful_shutdown(async {
+            tokio::signal::ctrl_c().await.expect("Failed to listen for shutdown signal");
+        });
+        let result = Ok(tokio::spawn(async move {
+            if let Err(e) = server.await {
+                eprintln!("Server error: {}", e);
+            }
+        }));
+        info!("Mock server started on http://localhost:{port}/");
+        result
+    }
+
+    /// Sends the good SSV JSON data to the client
+    async fn handle_ssv() -> Response {
+        // Read the JSON data
+        let data = include_str!("../../../../tests/data/ssv_valid.json");
+
+        // Create a valid response
+        Response::builder()
+            .status(200)
+            .header("Content-Type", "application/json")
+            .body(data.into())
+            .unwrap()
+    }
+
+    /// Sends a response with a large body - larger than the maximum allowed.
+    /// Note that hyper overwrites the content-length header automatically, so
+    /// setting it here wouldn't actually change the value that ultimately
+    /// gets sent to the client.
+    async fn handle_big_data() -> Response {
+        let body = "f".repeat(2 * MUXER_HTTP_MAX_LENGTH);
+        Response::builder()
+            .status(200)
+            .header("Content-Type", "application/text")
+            .body(body.into())
+            .unwrap()
+    }
+
+    /// Simulates a timeout by sleeping for a long time
+    async fn handle_timeout() -> Response {
+        // Sleep for a long time to simulate a timeout
+        tokio::time::sleep(std::time::Duration::from_secs(2 * TEST_HTTP_TIMEOUT)).await;
+        Response::builder()
+            .status(200)
+            .header("Content-Type", "application/text")
+            .body("Timeout response".into())
+            .unwrap()
+    }
 }
diff --git a/crates/common/src/config/pbs.rs b/crates/common/src/config/pbs.rs
index f41eef12..53d8ccd9 100644
--- a/crates/common/src/config/pbs.rs
+++ b/crates/common/src/config/pbs.rs
@@ -17,7 +17,7 @@ use url::Url;
 
 use super::{
     constants::PBS_IMAGE_DEFAULT, load_optional_env_var, CommitBoostConfig, RuntimeMuxConfig,
-    PBS_ENDPOINT_ENV,
+    HTTP_TIMEOUT_SECONDS_DEFAULT, PBS_ENDPOINT_ENV,
 };
 use crate::{
     commit::client::SignerClient,
@@ -123,6 +123,9 @@ pub struct PbsConfig {
     pub extra_validation_enabled: bool,
     /// Execution Layer RPC url to use for extra validation
     pub rpc_url: Option<Url>,
+    /// Timeout for HTTP requests in seconds
+    #[serde(default = "default_u64::<HTTP_TIMEOUT_SECONDS_DEFAULT>")]
+    pub http_timeout_seconds: u64,
     /// Maximum number of retries for validator registration request per relay
     #[serde(default = "default_u32::<REGISTER_VALIDATOR_RETRY_LIMIT_DEFAULT>")]
     pub register_validator_retry_limit: u32,
diff --git a/crates/common/src/config/utils.rs b/crates/common/src/config/utils.rs
index d94fd826..13784316 100644
--- a/crates/common/src/config/utils.rs
+++ b/crates/common/src/config/utils.rs
@@ -5,7 +5,7 @@ use eyre::{bail, Context, Result};
 use serde::de::DeserializeOwned;
 
 use super::JWTS_ENV;
-use crate::types::ModuleId;
+use crate::{config::MUXER_HTTP_MAX_LENGTH, types::ModuleId, utils::read_chunked_body_with_max};
 
 pub fn load_env_var(env: &str) -> Result<String> {
     std::env::var(env).wrap_err(format!("{env} is not set"))
 }
@@ -31,6 +31,32 @@ pub fn load_jwt_secrets() -> Result<HashMap<ModuleId, String>> {
     decode_string_to_map(&jwt_secrets)
 }
 
+/// Reads an HTTP response safely, erroring out if it failed or if the body is
+/// too large.
+pub async fn safe_read_http_response(response: reqwest::Response) -> Result<Vec<u8>> {
+    // Read the response to a buffer in chunks
+    let status_code = response.status();
+    match read_chunked_body_with_max(response, MUXER_HTTP_MAX_LENGTH).await {
+        Ok(response_bytes) => {
+            if status_code.is_success() {
+                return Ok(response_bytes);
+            }
+            bail!(
+                "Request failed with status: {status_code}, body: {}",
+                String::from_utf8_lossy(&response_bytes)
+            )
+        }
+        Err(e) => {
+            if status_code.is_success() {
+                return Err(e).wrap_err("Failed to read response body");
+            }
+            Err(e).wrap_err(format!(
+                "Request failed with status {status_code}, but decoding the response body failed"
+            ))
+        }
+    }
+}
+
 /// Removes duplicate entries from a vector of BlsPublicKey
 pub fn remove_duplicate_keys(keys: Vec<BlsPublicKey>) -> Vec<BlsPublicKey> {
     let mut unique_keys = Vec::new();
diff --git a/crates/common/src/pbs/error.rs b/crates/common/src/pbs/error.rs
index 242cb90e..9b42a626 100644
--- a/crates/common/src/pbs/error.rs
+++ b/crates/common/src/pbs/error.rs
@@ -4,7 +4,7 @@ use alloy::{
 };
 use thiserror::Error;
 
-use crate::error::BlstErrorWrapper;
+use crate::{error::BlstErrorWrapper, utils::ResponseReadError};
 
 #[derive(Debug, Error)]
 pub enum PbsError {
@@ -17,12 +17,12 @@ pub enum PbsError {
     #[error("json decode error: {err:?}, raw: {raw}")]
     JsonDecode { err: serde_json::Error, raw: String },
 
+    #[error("{0}")]
+    ReadResponse(#[from] ResponseReadError),
+
     #[error("relay response error. Code: {code}, err: {error_msg:?}")]
     RelayResponse { error_msg: String, code: u16 },
 
-    #[error("response size exceeds max size: max: {max} raw: {raw}")]
-    PayloadTooLarge { max: usize, raw: String },
-
     #[error("failed validating relay response: {0}")]
     Validation(#[from] ValidationError),
diff --git a/crates/common/src/pbs/event.rs b/crates/common/src/pbs/event.rs
index 015de714..98276d14 100644
--- a/crates/common/src/pbs/event.rs
+++ b/crates/common/src/pbs/event.rs
@@ -1,4 +1,4 @@
-use std::net::SocketAddr;
+use std::{net::SocketAddr, time::Duration};
 
 use alloy::{primitives::B256, rpc::types::beacon::relay::ValidatorRegistration};
 use async_trait::async_trait;
@@ -8,18 +8,18 @@ use axum::{
     routing::post,
     Json,
 };
-use eyre::bail;
+use eyre::{bail, Result};
 use reqwest::StatusCode;
 use serde::{Deserialize, Serialize};
 use tokio::net::TcpListener;
-use tracing::{error, info, trace};
+use tracing::{error, info, trace, warn};
 use url::Url;
 
 use super::{
     GetHeaderParams, GetHeaderResponse, SignedBlindedBeaconBlock, SubmitBlindedBlockResponse,
 };
 use crate::{
-    config::{load_optional_env_var, BUILDER_URLS_ENV},
+    config::{load_optional_env_var, BUILDER_URLS_ENV, HTTP_TIMEOUT_SECONDS_DEFAULT},
     pbs::BUILDER_EVENTS_PATH,
 };
 
@@ -48,11 +48,18 @@ pub struct BuilderEventPublisher {
 }
 
 impl BuilderEventPublisher {
-    pub fn new(endpoints: Vec<Url>) -> Self {
-        Self { client: reqwest::Client::new(), endpoints }
+    pub fn new(endpoints: Vec<Url>, http_timeout: Duration) -> Result<Self> {
+        for endpoint in &endpoints {
+            if endpoint.scheme() != "https" {
+                warn!("BuilderEventPublisher endpoint {endpoint} is insecure; consider using HTTPS instead if possible");
+            }
+        }
+        Ok(Self { client: reqwest::ClientBuilder::new().timeout(http_timeout).build()?, endpoints })
     }
 
-    pub fn new_from_env() -> eyre::Result<Option<Self>> {
+    pub fn new_from_env() -> Result<Option<Self>> {
+        let http_timeout = Duration::from_secs(HTTP_TIMEOUT_SECONDS_DEFAULT);
+
         load_optional_env_var(BUILDER_URLS_ENV)
             .map(|joined| {
                 let endpoints = joined
@@ -62,9 +69,9 @@ impl BuilderEventPublisher {
                         let url = base.trim().parse::<Url>()?.join(BUILDER_EVENTS_PATH)?;
                         Ok(url)
                    })
-                    .collect::<eyre::Result<Vec<_>>>()?;
+                    .collect::<Result<Vec<_>>>()?;
 
-                Ok(Self::new(endpoints))
+                Self::new(endpoints, http_timeout)
             })
             .transpose()
     }
diff --git a/crates/common/src/utils.rs b/crates/common/src/utils.rs
index a1dcb7cb..ccaf8888 100644
--- a/crates/common/src/utils.rs
+++ b/crates/common/src/utils.rs
@@ -1,3 +1,5 @@
+#[cfg(test)]
+use std::cell::Cell;
 use std::{
     net::Ipv4Addr,
     time::{SystemTime, UNIX_EPOCH},
@@ -9,11 +11,13 @@ use alloy::{
 };
 use axum::http::HeaderValue;
 use blst::min_pk::{PublicKey, Signature};
+use futures::StreamExt;
 use rand::{distr::Alphanumeric, Rng};
-use reqwest::header::HeaderMap;
+use reqwest::{header::HeaderMap, Response};
 use serde::{de::DeserializeOwned, Serialize};
 use serde_json::Value;
 use ssz::{Decode, Encode};
+use thiserror::Error;
 use tracing::Level;
 use tracing_appender::{non_blocking::WorkerGuard, rolling::Rotation};
 use tracing_subscriber::{
@@ -31,6 +35,83 @@ use crate::{
 
 const MILLIS_PER_SECOND: u64 = 1_000;
 
+#[derive(Debug, Error)]
+pub enum ResponseReadError {
+    #[error(
+        "response size exceeds max size; max: {max}, content_length: {content_length}, raw: {raw}"
+    )]
+    PayloadTooLarge { max: usize, content_length: usize, raw: String },
+
+    #[error("error reading response stream: {0}")]
+    ReqwestError(#[from] reqwest::Error),
+}
+
+#[cfg(test)]
+thread_local! {
+    static IGNORE_CONTENT_LENGTH: Cell<bool> = const { Cell::new(false) };
+}
+
+#[cfg(test)]
+pub fn set_ignore_content_length(val: bool) {
+    IGNORE_CONTENT_LENGTH.with(|f| f.set(val));
+}
+
+#[cfg(test)]
+fn should_ignore_content_length() -> bool {
+    IGNORE_CONTENT_LENGTH.with(|f| f.get())
+}
+
+/// Reads the body of a response as a chunked stream, ensuring the size does not
+/// exceed `max_size`.
+pub async fn read_chunked_body_with_max(
+    res: Response,
+    max_size: usize,
+) -> Result<Vec<u8>, ResponseReadError> {
+    // Get the content length from the response headers
+    #[cfg(not(test))]
+    let content_length = res.content_length();
+
+    #[cfg(test)]
+    let mut content_length = res.content_length();
+
+    #[cfg(test)]
+    if should_ignore_content_length() {
+        // Used for testing purposes to ignore content length
+        content_length = None;
+    }
+
+    // Break if content length is provided but it's too big
+    if let Some(length) = content_length {
+        if length as usize > max_size {
+            return Err(ResponseReadError::PayloadTooLarge {
+                max: max_size,
+                content_length: length as usize,
+                raw: String::new(), // raw content is not available here
+            });
+        }
+    }
+
+    let mut stream = res.bytes_stream();
+    let mut response_bytes = Vec::new();
+
+    while let Some(chunk) = stream.next().await {
+        let chunk = chunk?;
+        if response_bytes.len() + chunk.len() > max_size {
+            // avoid spamming logs if the message is too large
+            response_bytes.truncate(1024);
+            return Err(ResponseReadError::PayloadTooLarge {
+                max: max_size,
+                content_length: content_length.unwrap_or(0) as usize,
+                raw: String::from_utf8_lossy(&response_bytes).into_owned(),
+            });
+        }
+
+        response_bytes.extend_from_slice(&chunk);
+    }
+
+    Ok(response_bytes)
+}
+
 pub fn timestamp_of_slot_start_sec(slot: u64, chain: Chain) -> u64 {
     chain.genesis_time_sec() + slot * chain.slot_time_sec()
 }
diff --git a/crates/pbs/src/mev_boost/get_header.rs b/crates/pbs/src/mev_boost/get_header.rs
index e4922245..85e3cf2c 100644
--- a/crates/pbs/src/mev_boost/get_header.rs
+++ b/crates/pbs/src/mev_boost/get_header.rs
@@ -19,7 +19,10 @@ use cb_common::{
     signature::verify_signed_message,
     signer::BlsSignature,
     types::Chain,
-    utils::{get_user_agent_with_version, ms_into_slot, timestamp_of_slot_start_sec, utcnow_ms},
+    utils::{
+        get_user_agent_with_version, ms_into_slot, read_chunked_body_with_max,
+        timestamp_of_slot_start_sec, utcnow_ms,
+    },
 };
 use futures::future::join_all;
 use parking_lot::RwLock;
@@ -36,7 +39,7 @@ use crate::{
     },
     metrics::{RELAY_HEADER_VALUE, RELAY_LAST_SLOT, RELAY_LATENCY, RELAY_STATUS_CODE},
     state::{BuilderApiState, PbsState},
-    utils::{check_gas_limit, read_chunked_body_with_max},
+    utils::check_gas_limit,
 };
 
 /// Implements https://ethereum.github.io/builder-specs/#/Builder/getHeader
diff --git a/crates/pbs/src/mev_boost/register_validator.rs b/crates/pbs/src/mev_boost/register_validator.rs
index c99f5d5f..5d2b5f1e 100644
--- a/crates/pbs/src/mev_boost/register_validator.rs
+++ b/crates/pbs/src/mev_boost/register_validator.rs
@@ -4,7 +4,7 @@ use alloy::rpc::types::beacon::relay::ValidatorRegistration;
 use axum::http::{HeaderMap, HeaderValue};
 use cb_common::{
     pbs::{error::PbsError, RelayClient, HEADER_START_TIME_UNIX_MS},
-    utils::{get_user_agent_with_version, utcnow_ms},
+    utils::{get_user_agent_with_version, read_chunked_body_with_max, utcnow_ms},
 };
 use eyre::bail;
 use futures::future::{join_all, select_ok};
@@ -16,7 +16,6 @@ use crate::{
     constants::{MAX_SIZE_DEFAULT, REGISTER_VALIDATOR_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR},
     metrics::{RELAY_LATENCY, RELAY_STATUS_CODE},
     state::{BuilderApiState, PbsState},
-    utils::read_chunked_body_with_max,
 };
 
 /// Implements https://ethereum.github.io/builder-specs/#/Builder/registerValidator
diff --git a/crates/pbs/src/mev_boost/status.rs b/crates/pbs/src/mev_boost/status.rs
index 591e7cd1..b1a82e57 100644
--- a/crates/pbs/src/mev_boost/status.rs
+++ b/crates/pbs/src/mev_boost/status.rs
@@ -3,7 +3,7 @@ use std::time::{Duration, Instant};
 use axum::http::HeaderMap;
 use cb_common::{
     pbs::{error::PbsError, RelayClient},
-    utils::get_user_agent_with_version,
+    utils::{get_user_agent_with_version, read_chunked_body_with_max},
 };
 use futures::future::select_ok;
 use reqwest::header::USER_AGENT;
@@ -13,7 +13,6 @@ use crate::{
     constants::{MAX_SIZE_DEFAULT, STATUS_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR},
     metrics::{RELAY_LATENCY, RELAY_STATUS_CODE},
     state::{BuilderApiState, PbsState},
-    utils::read_chunked_body_with_max,
 };
 
 /// Implements https://ethereum.github.io/builder-specs/#/Builder/status
diff --git a/crates/pbs/src/mev_boost/submit_block.rs b/crates/pbs/src/mev_boost/submit_block.rs
index abb9554f..5b781e01 100644
--- a/crates/pbs/src/mev_boost/submit_block.rs
+++ b/crates/pbs/src/mev_boost/submit_block.rs
@@ -8,7 +8,7 @@ use cb_common::{
         PayloadAndBlobsDeneb, PayloadAndBlobsElectra, RelayClient, SignedBlindedBeaconBlock,
         SubmitBlindedBlockResponse, VersionedResponse, HEADER_START_TIME_UNIX_MS,
     },
-    utils::{get_user_agent_with_version, utcnow_ms},
+    utils::{get_user_agent_with_version, read_chunked_body_with_max, utcnow_ms},
 };
 use futures::future::select_ok;
 use reqwest::header::USER_AGENT;
@@ -21,7 +21,6 @@ use crate::{
     },
     metrics::{RELAY_LATENCY, RELAY_STATUS_CODE},
     state::{BuilderApiState, PbsState},
-    utils::read_chunked_body_with_max,
 };
 
 /// Implements https://ethereum.github.io/builder-specs/#/Builder/submitBlindedBlock
diff --git a/crates/pbs/src/utils.rs b/crates/pbs/src/utils.rs
index f1673431..782ae79b 100644
--- a/crates/pbs/src/utils.rs
+++ b/crates/pbs/src/utils.rs
@@ -1,31 +1,3 @@
-use cb_common::pbs::error::PbsError;
-use futures::StreamExt;
-use reqwest::Response;
-
-pub async fn read_chunked_body_with_max(
-    res: Response,
-    max_size: usize,
-) -> Result<Vec<u8>, PbsError> {
-    let mut stream = res.bytes_stream();
-    let mut response_bytes = Vec::new();
-
-    while let Some(chunk) = stream.next().await {
-        let chunk = chunk?;
-        if response_bytes.len() + chunk.len() > max_size {
-            // avoid spamming logs if the message is too large
-            response_bytes.truncate(1024);
-            return Err(PbsError::PayloadTooLarge {
-                max: max_size,
-                raw: String::from_utf8_lossy(&response_bytes).into_owned(),
-            });
-        }
-
-        response_bytes.extend_from_slice(&chunk);
-    }
-
-    Ok(response_bytes)
-}
-
 const GAS_LIMIT_ADJUSTMENT_FACTOR: u64 = 1024;
 const GAS_LIMIT_MINIMUM: u64 = 5_000;
diff --git a/tests/data/ssv_valid.json b/tests/data/ssv_valid.json
new file mode 100644
index 00000000..e19b13e6
--- /dev/null
+++ b/tests/data/ssv_valid.json
@@ -0,0 +1,99 @@
+{
+  "validators": [
+    {
+      "id": 554991,
+      "public_key": "967ba17a3e7f82a25aa5350ec34d6923e28ad8237b5a41efe2c5e325240d74d87a015bf04634f21900963539c8229b2a",
+      "cluster": "0xf7c1283eb0c0f76b5fa84c7541d8d4d27751b4083a5e8dcb8ac9e72bb7f559b8",
+      "owner_address": "0xB2EE025B1d129c61E77223bAb42fc65b29B16243",
+      "status": "Inactive",
+      "is_valid": true,
+      "is_deleted": false,
+      "is_public_key_valid": true,
+      "is_shares_valid": true,
+      "is_operators_valid": true,
+      "operators": [
+        16,
+        27,
+        86,
+        90,
+        200,
+        204,
+        214
+      ],
+      "validator_info": {
+        "index": 1476217,
+        "status": "withdrawal_possible",
+        "activation_epoch": 4950,
+        "effective_balance": 32000000000
+      },
+      "version": "v4",
+      "network": "holesky"
+    },
+    {
+      "id": 554992,
+      "public_key": "ac769e8cec802e8ffee34de3253be8f438a0c17ee84bdff0b6730280d24b5ecb77ebc9c985281b41ee3bda8663b6658c",
+      "cluster": "0xf7c1283eb0c0f76b5fa84c7541d8d4d27751b4083a5e8dcb8ac9e72bb7f559b8",
+      "owner_address": "0xB2EE025B1d129c61E77223bAb42fc65b29B16243",
+      "status": "Inactive",
+      "is_valid": true,
+      "is_deleted": false,
+      "is_public_key_valid": true,
+      "is_shares_valid": true,
+      "is_operators_valid": true,
+      "operators": [
+        16,
+        27,
+        86,
+        90,
+        200,
+        204,
+        214
+      ],
+      "validator_info": {
+        "index": 1476218,
+        "status": "withdrawal_possible",
+        "activation_epoch": 4950,
+        "effective_balance": 32000000000
+      },
+      "version": "v4",
+      "network": "holesky"
+    },
+    {
+      "id": 554994,
+      "public_key": "8c866a5a05f3d45c49b457e29365259021a509c5daa82e124f9701a960ee87b8902e87175315ab638a3d8b1115b23639",
+      "cluster": "0xf7c1283eb0c0f76b5fa84c7541d8d4d27751b4083a5e8dcb8ac9e72bb7f559b8",
+      "owner_address": "0xB2EE025B1d129c61E77223bAb42fc65b29B16243",
+      "status": "Inactive",
+      "is_valid": true,
+      "is_deleted": false,
+      "is_public_key_valid": true,
+      "is_shares_valid": true,
+      "is_operators_valid": true,
+      "operators": [
+        16,
+        27,
+        86,
+        90,
+        200,
+        204,
+        214
+      ],
+      "validator_info": {
+        "index": 1476222,
+        "status": "withdrawal_possible",
+        "activation_epoch": 4950,
+        "effective_balance": 32000000000
+      },
+      "version": "v4",
+      "network": "holesky"
+    }
+  ],
+  "pagination": {
+    "total": 3,
+    "pages": 1,
+    "per_page": 10,
+    "page": 1,
+    "current_first": 554991,
+    "current_last": 554994
+  }
+}
\ No newline at end of file
diff --git a/tests/src/utils.rs b/tests/src/utils.rs
index d5409107..b677d800 100644
--- a/tests/src/utils.rs
+++ b/tests/src/utils.rs
@@ -81,6 +81,7 @@ pub fn get_pbs_static_config(port: u16) -> PbsConfig {
         late_in_slot_time_ms: u64::MAX,
         extra_validation_enabled: false,
         rpc_url: None,
+        http_timeout_seconds: 10,
         register_validator_retry_limit: u32::MAX,
     }
 }
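Usage sketch (not part of the patch): how a caller might combine the two protections introduced above, the client-side request timeout and the capped body read. The function name `fetch_capped` is illustrative, and the `cb_common::...` paths assume the exports shown in this diff.

    use std::time::Duration;

    use cb_common::{config::MUXER_HTTP_MAX_LENGTH, utils::read_chunked_body_with_max};

    // Hypothetical helper: bounded in time by the reqwest client timeout and
    // bounded in size by read_chunked_body_with_max, mirroring the pattern
    // used by safe_read_http_response in this diff.
    async fn fetch_capped(url: &str, timeout: Duration) -> eyre::Result<Vec<u8>> {
        let client = reqwest::ClientBuilder::new().timeout(timeout).build()?;
        let response = client.get(url).send().await?;
        // Errors with ResponseReadError::PayloadTooLarge if either the advertised
        // content-length or the streamed body exceeds the cap
        let body = read_chunked_body_with_max(response, MUXER_HTTP_MAX_LENGTH).await?;
        Ok(body)
    }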
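And a sketch of how the new `http_timeout_seconds` config field is meant to flow into HTTP clients, mirroring `PbsMuxes::validate_and_dedup` above; `client_with_timeout` is an illustrative name, not a function added by this patch.

    use std::time::Duration;

    // The TOML field http_timeout_seconds (u64 seconds, default 10) becomes a
    // per-client reqwest timeout; every request sent on the client inherits it.
    fn client_with_timeout(http_timeout_seconds: u64) -> reqwest::Result<reqwest::Client> {
        let http_timeout = Duration::from_secs(http_timeout_seconds);
        reqwest::ClientBuilder::new().timeout(http_timeout).build()
    }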