diff --git a/integration-tests/Cargo.lock b/integration-tests/Cargo.lock index 067572280..17281f41e 100644 --- a/integration-tests/Cargo.lock +++ b/integration-tests/Cargo.lock @@ -1878,6 +1878,8 @@ dependencies = [ "pool_sv2", "primitive-types", "rand", + "serde", + "serde_json", "sha2 0.10.9", "stratum-apps", "tokio", diff --git a/integration-tests/Cargo.toml b/integration-tests/Cargo.toml index 68c8979b0..254af0b7c 100644 --- a/integration-tests/Cargo.toml +++ b/integration-tests/Cargo.toml @@ -28,6 +28,8 @@ tracing = { version = "0.1.41", default-features = false } tracing-subscriber = { version = "0.3.19", default-features = false } hex = "0.4.3" clap = { version = "^4.5.4", features = ["derive"] } +serde_json = "1" +serde = { version = "1", features = ["derive"] } # Direct dependencies kept only for the embedded `mining_device` module. # Remove this block when removing: diff --git a/integration-tests/lib/prometheus_metrics_assertions.rs b/integration-tests/lib/prometheus_metrics_assertions.rs index cfd38098a..6a93f2163 100644 --- a/integration-tests/lib/prometheus_metrics_assertions.rs +++ b/integration-tests/lib/prometheus_metrics_assertions.rs @@ -1,7 +1,93 @@ //! Helpers for querying and asserting on Prometheus metrics and JSON API endpoints //! exposed by SV2 components during integration tests. -use std::{collections::HashMap, fmt, net::SocketAddr}; +use serde::Deserialize; +use std::{collections::HashMap, fmt, net::SocketAddr, time::Duration}; + +// ── Typed response structs for JSON API endpoints ───────────────────────────── +// +// These mirror just the fields the integration tests care about, and are +// deliberately named with an `Api*` prefix so they don't collide with the +// production response types defined inside `stratum_apps::monitoring`. +// Serde silently ignores extra fields, so adding fields on the production side +// does not break these structs. + +/// Response from `/api/v1/global`. 
+#[derive(Debug, Deserialize)] +pub struct ApiGlobalResponse { + pub uptime_secs: u64, + pub server: Option, + pub sv2_clients: Option, + pub sv1_clients: Option, +} + +/// Server (upstream) summary, as embedded in `ApiGlobalResponse`. +#[derive(Debug, Deserialize)] +pub struct ApiServerSummary { + pub extended_channels: u64, + pub standard_channels: u64, +} + +/// Sv2 clients summary, as embedded in `ApiGlobalResponse`. +#[derive(Debug, Deserialize)] +pub struct ApiSv2ClientsSummary { + pub total_clients: u64, +} + +/// Sv1 clients summary, as embedded in `ApiGlobalResponse`. +#[derive(Debug, Deserialize)] +pub struct ApiSv1ClientsSummary { + pub total_clients: u64, +} + +/// Response from `/api/v1/server`. +#[derive(Debug, Deserialize)] +pub struct ApiServerResponse { + pub extended_channels_count: usize, +} + +/// Generic paginated response (`/api/v1/clients`, `/api/v1/sv1/clients`). +#[derive(Debug, Deserialize)] +pub struct ApiPaginatedResponse { + pub total: u64, + pub items: Vec, +} + +/// Sv2 client metadata returned in the paginated `/api/v1/clients` listing +/// and from `/api/v1/clients/{id}`. +#[derive(Debug, Deserialize)] +pub struct ApiClientMetadata { + pub client_id: u64, +} + +/// Response from `/api/v1/clients/{id}/channels`. +#[derive(Debug, Deserialize)] +pub struct ApiClientChannelsResponse { + pub client_id: u64, + pub total_extended: u64, + pub total_standard: u64, +} + +/// Sv1 client info returned from `/api/v1/sv1/clients` and +/// `/api/v1/sv1/clients/{id}`. +#[derive(Debug, Deserialize)] +pub struct ApiSv1Client { + pub client_id: u64, + pub authorized_worker_name: String, +} + +/// JSON error body returned from any 4xx/5xx response. +#[derive(Debug, Deserialize)] +pub struct ApiErrorResponse { + pub error: String, +} + +/// Response from the `/` root endpoint listing the available APIs. 
+#[derive(Debug, Deserialize)] +pub struct ApiRootResponse { + pub service: String, + pub endpoints: serde_json::Value, +} /// Fetch the raw Prometheus text-format metrics from a component's `/metrics` endpoint. /// Uses `spawn_blocking` to avoid blocking the tokio runtime with synchronous HTTP calls. @@ -27,6 +113,229 @@ pub async fn fetch_api(monitoring_addr: SocketAddr, path: &str) -> String { .expect("spawn_blocking for fetch_api panicked") } +/// Fetch a JSON API endpoint and parse the response into a `serde_json::Value`. +pub async fn fetch_api_json(monitoring_addr: SocketAddr, path: &str) -> serde_json::Value { + let body = fetch_api(monitoring_addr, path).await; + serde_json::from_str(&body).unwrap_or_else(|e| { + panic!( + "Failed to parse JSON from {} response: {}\nBody: {}", + path, e, body + ) + }) +} + +/// Fetch a JSON API endpoint and parse the response into a typed struct. +pub async fn fetch_api_typed( + monitoring_addr: SocketAddr, + path: &str, +) -> T { + let body = fetch_api(monitoring_addr, path).await; + serde_json::from_str(&body).unwrap_or_else(|e| { + panic!( + "Failed to parse JSON from {} into {}: {}\nBody: {}", + path, + std::any::type_name::(), + e, + body + ) + }) +} + +/// Fetch a JSON API endpoint returning both the HTTP status code and parsed JSON body. +/// Unlike `fetch_api_json`, this does **not** panic on non-2xx responses, so it can be +/// used to test error endpoints (e.g. 404). 
+pub async fn fetch_api_with_status( + monitoring_addr: SocketAddr, + path: &str, +) -> (i32, serde_json::Value) { + let url = format!("http://{}{}", monitoring_addr, path); + tokio::task::spawn_blocking(move || { + let (status, bytes) = crate::utils::http::make_get_request_with_status(&url, 5); + let body = String::from_utf8(bytes).expect("api response should be valid UTF-8"); + let json: serde_json::Value = serde_json::from_str(&body).unwrap_or_else(|e| { + panic!( + "Failed to parse JSON from {} (status {}): {}\nBody: {}", + url, status, e, body + ) + }); + (status, json) + }) + .await + .expect("spawn_blocking for fetch_api_with_status panicked") +} + +/// Poll a JSON API endpoint until a numeric field at `json_pointer` (RFC 6901, e.g. +/// `"/sv2_clients/total_clients"`) reaches `>= min`. Returns the full JSON value once +/// satisfied. Panics if the condition is not met within `timeout`. +/// +/// This is the JSON equivalent of `poll_until_metric_gte` — use it for endpoints whose +/// data only appears after the monitoring snapshot cache has refreshed. +pub async fn poll_until_api_field_gte( + monitoring_addr: SocketAddr, + path: &str, + json_pointer: &str, + min: f64, + timeout: Duration, +) -> serde_json::Value { + let deadline = tokio::time::Instant::now() + timeout; + loop { + // Use fetch_api_with_status so that transient non-2xx responses (e.g. 404 + // before the snapshot cache has populated) are retried instead of panicking. + let (status, json) = fetch_api_with_status(monitoring_addr, path).await; + if (200..300).contains(&status) { + if let Some(val) = json.pointer(json_pointer) { + let num = val.as_f64().unwrap_or(0.0); + if num >= min { + return json; + } + } + } + if tokio::time::Instant::now() >= deadline { + panic!( + "JSON field '{}' at {} never reached >= {} within {:?}. Last status: {}. 
Last response:\n{}", + json_pointer, + path, + min, + timeout, + status, + serde_json::to_string_pretty(&json).unwrap_or_default() + ); + } + tokio::time::sleep(Duration::from_millis(500)).await; + } +} + +/// Internal: poll `path` until `condition` is met, returning the parsed `T`. +/// Retries on non-2xx responses (endpoint may not be ready yet). +async fn poll_until( + monitoring_addr: SocketAddr, + path: &'static str, + timeout: Duration, + condition: F, + timeout_msg: &'static str, +) -> T +where + T: serde::de::DeserializeOwned, + F: Fn(&T) -> bool, +{ + let deadline = tokio::time::Instant::now() + timeout; + loop { + let (status, body) = { + let url = format!("http://{}{}", monitoring_addr, path); + tokio::task::spawn_blocking(move || { + crate::utils::http::make_get_request_with_status(&url, 5) + }) + .await + .expect("spawn_blocking panicked") + }; + if (200..300).contains(&status) { + let body_str = String::from_utf8(body).expect("response should be valid UTF-8"); + if let Ok(resp) = serde_json::from_str::(&body_str) { + if condition(&resp) { + return resp; + } + } + } + if tokio::time::Instant::now() >= deadline { + panic!("{} within {:?}", timeout_msg, timeout); + } + tokio::time::sleep(Duration::from_millis(500)).await; + } +} + +/// Poll `/api/v1/global` until `sv2_clients.total_clients >= min`. +/// Returns the parsed `ApiGlobalResponse` once satisfied. +pub async fn poll_until_global_sv2_clients_gte( + monitoring_addr: SocketAddr, + min: u64, + timeout: Duration, +) -> ApiGlobalResponse { + poll_until( + monitoring_addr, + "/api/v1/global", + timeout, + move |r: &ApiGlobalResponse| { + r.sv2_clients + .as_ref() + .is_some_and(|c| c.total_clients >= min) + }, + "ApiGlobalResponse sv2_clients.total_clients never reached >= expected", + ) + .await +} + +/// Poll `/api/v1/global` until `sv1_clients.total_clients >= min`. +/// Returns the parsed `ApiGlobalResponse` once satisfied. 
+pub async fn poll_until_global_sv1_clients_gte( + monitoring_addr: SocketAddr, + min: u64, + timeout: Duration, +) -> ApiGlobalResponse { + poll_until( + monitoring_addr, + "/api/v1/global", + timeout, + move |r: &ApiGlobalResponse| { + r.sv1_clients + .as_ref() + .is_some_and(|c| c.total_clients >= min) + }, + "ApiGlobalResponse sv1_clients.total_clients never reached >= expected", + ) + .await +} + +/// Poll `/api/v1/clients` until `total >= min`. +/// Returns the parsed `ApiPaginatedResponse` once satisfied. +pub async fn poll_until_clients_total_gte( + monitoring_addr: SocketAddr, + min: u64, + timeout: Duration, +) -> ApiPaginatedResponse { + poll_until( + monitoring_addr, + "/api/v1/clients", + timeout, + move |r: &ApiPaginatedResponse| r.total >= min, + "ApiPaginatedResponse total never reached >= expected", + ) + .await +} + +/// Poll `/api/v1/sv1/clients` until `total >= min`. +/// Returns the parsed `ApiPaginatedResponse` once satisfied. +pub async fn poll_until_sv1_clients_total_gte( + monitoring_addr: SocketAddr, + min: u64, + timeout: Duration, +) -> ApiPaginatedResponse { + poll_until( + monitoring_addr, + "/api/v1/sv1/clients", + timeout, + move |r: &ApiPaginatedResponse| r.total >= min, + "ApiPaginatedResponse total never reached >= expected", + ) + .await +} + +/// Poll `/api/v1/server` until `extended_channels_count >= min`. +/// Returns the parsed `ApiServerResponse` once satisfied. +pub async fn poll_until_server_channels_gte( + monitoring_addr: SocketAddr, + min: usize, + timeout: Duration, +) -> ApiServerResponse { + poll_until( + monitoring_addr, + "/api/v1/server", + timeout, + move |r: &ApiServerResponse| r.extended_channels_count >= min, + "ApiServerResponse extended_channels_count never reached >= expected", + ) + .await +} + /// A Prometheus metric selector: a metric name plus an optional set of label matchers. 
/// /// Label matching is order-independent — the selector matches any exposition line @@ -158,6 +467,7 @@ pub(crate) fn parse_metric_value<'a, M: Into>>( } /// Assert that a metric is present and its value satisfies the given predicate. +#[track_caller] pub(crate) fn assert_metric<'a, M, F>( metrics_text: &str, metric: M, @@ -187,12 +497,13 @@ pub(crate) fn assert_metric<'a, M, F>( } } -/// Assert that a metric is present with a value >= the given minimum. +#[track_caller] pub fn assert_metric_gte<'a, M: Into>>(metrics_text: &str, metric: M, min: f64) { assert_metric(metrics_text, metric, |v| v >= min, &format!(">= {}", min)); } /// Assert that a metric is present with the exact given value. +#[track_caller] pub fn assert_metric_eq<'a, M: Into>>(metrics_text: &str, metric: M, expected: f64) { assert_metric( metrics_text, @@ -207,6 +518,7 @@ pub fn assert_metric_eq<'a, M: Into>>(metrics_text: &str, metric: M, /// For a bare-name selector (`Metric::new("name")` or `"name".into()`), this means /// the metric name does not appear at all. For a labeled selector, it means no line /// with matching labels exists — other series for the same metric name are allowed. +#[track_caller] pub fn assert_metric_not_present<'a, M: Into>>(metrics_text: &str, metric: M) { let metric = metric.into(); for line in metrics_text.lines() { @@ -223,6 +535,7 @@ pub fn assert_metric_not_present<'a, M: Into>>(metrics_text: &str, me } /// Assert that at least one exposition line matches the selector. 
+#[track_caller] pub fn assert_metric_present<'a, M: Into>>(metrics_text: &str, metric: M) { let metric = metric.into(); for line in metrics_text.lines() { @@ -248,7 +561,7 @@ pub async fn poll_until_metric_gte<'a, M: Into>>( monitoring_addr: SocketAddr, metric: M, min: f64, - timeout: std::time::Duration, + timeout: Duration, ) -> String { let metric = metric.into(); let deadline = tokio::time::Instant::now() + timeout; @@ -265,21 +578,22 @@ pub async fn poll_until_metric_gte<'a, M: Into>>( metric, min, timeout, metrics ); } - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(100)).await; } } -/// Assert that the `/api/v1/health` endpoint returns a response containing `"status":"ok"`. +/// Assert that the `/api/v1/health` endpoint returns `{"status":"ok"}`. pub async fn assert_api_health(monitoring_addr: SocketAddr) { - let body = fetch_api(monitoring_addr, "/api/v1/health").await; - assert!( - body.contains("\"status\":\"ok\""), + let health: serde_json::Value = fetch_api_typed(monitoring_addr, "/api/v1/health").await; + assert_eq!( + health["status"], "ok", "Health endpoint should return ok status, got: {}", - body + health ); } /// Assert that the uptime metric is present and positive. +#[track_caller] pub fn assert_uptime(metrics_text: &str) { assert_metric( metrics_text, diff --git a/integration-tests/lib/utils.rs b/integration-tests/lib/utils.rs index 6fc81d3e0..a2e95e26f 100644 --- a/integration-tests/lib/utils.rs +++ b/integration-tests/lib/utils.rs @@ -407,6 +407,43 @@ pub fn into_static(m: AnyMessage<'_>) -> AnyMessage<'static> { } pub mod http { + /// Make a GET request that returns both the HTTP status code and the response body. + /// Unlike `make_get_request`, this does NOT panic on non-2xx status codes (e.g. 404), + /// making it suitable for testing API error responses. + /// Only retries on 5xx errors or connection failures. 
+ pub fn make_get_request_with_status(url: &str, retries: usize) -> (i32, Vec) { + for attempt in 1..=retries { + let response = minreq::get(url).send(); + match response { + Ok(res) => { + let status_code = res.status_code; + if (500..600).contains(&status_code) { + eprintln!( + "Attempt {attempt}: URL {url} returned a server error code {status_code}" + ); + } else { + return (status_code, res.as_bytes().to_vec()); + } + } + Err(err) => { + eprintln!( + "Attempt {}: Failed to fetch URL {}: {:?}", + attempt + 1, + url, + err + ); + } + } + + if attempt < retries { + let delay = 1u64 << (attempt - 1); + eprintln!("Retrying in {delay} seconds (exponential backoff)..."); + std::thread::sleep(std::time::Duration::from_secs(delay)); + } + } + panic!("Cannot reach URL {url} after {retries} attempts"); + } + pub fn make_get_request(download_url: &str, retries: usize) -> Vec { for attempt in 1..=retries { let response = minreq::get(download_url).send(); diff --git a/integration-tests/tests/monitoring_integration.rs b/integration-tests/tests/monitoring_integration.rs index 9936b6b00..d1ef9a05f 100644 --- a/integration-tests/tests/monitoring_integration.rs +++ b/integration-tests/tests/monitoring_integration.rs @@ -84,11 +84,7 @@ async fn pool_monitoring_with_sv2_mining_device() { shutdown_all!(pool); } -// --------------------------------------------------------------------------- -// 2. Pool + tProxy + SV1 miner (non-aggregated) Pool: client metrics (1 SV2 client = tProxy, -// extended channel, shares) tProxy: server metrics (upstream channel to pool), SV1 metrics (1 -// SV1 client) tProxy has no SV2 downstreams so sv2_clients_total should be absent -// --------------------------------------------------------------------------- +// Pool + tProxy + SV1 miner: Pool sees 1 SV2 client, tProxy sees 1 SV1 client and 1 upstream channel. 
#[tokio::test] async fn pool_and_tproxy_monitoring_with_sv1_miner() { start_tracing(); @@ -168,10 +164,7 @@ async fn pool_and_tproxy_monitoring_with_sv1_miner() { shutdown_all!(pool, tproxy); } -// --------------------------------------------------------------------------- -// 3. Pool + JDC + tProxy + 2 SV1 miners (aggregated) tProxy aggregated: 2 SV1 clients, 1 upstream -// extended channel Pool: 1 SV2 client (JDC), shares accepted -// --------------------------------------------------------------------------- +// Pool + JDC + tProxy + 2 SV1 miners: aggregated topology with multiple SV1 clients. #[tokio::test] async fn jd_aggregated_topology_monitoring() { start_tracing(); @@ -267,10 +260,7 @@ async fn jd_aggregated_topology_monitoring() { shutdown_all!(pool, jdc, tproxy); } -// --------------------------------------------------------------------------- -// 4. Block found detection via metrics Uses JDC topology (which finds regtest blocks). After a -// block is found, the pool's sv2_client_blocks_found_total metric should be >= 1. -// --------------------------------------------------------------------------- +// Block found detection: JDC topology finds regtest blocks, pool metrics reflect it. #[tokio::test] async fn block_found_detected_in_pool_metrics() { use stratum_apps::stratum_core::template_distribution_sv2::*; @@ -316,3 +306,417 @@ async fn block_found_detected_in_pool_metrics() { shutdown_all!(pool, jdc, tproxy); } + +// --------------------------------------------------------------------------- +// 5. Pool JSON API endpoints — static (no miner / no activity). +// Covers: root, /api/v1/server (404), /api/v1/server/channels (404), +// /api/v1/sv1/clients (404). 
+// --------------------------------------------------------------------------- +#[tokio::test] +async fn pool_api_endpoints_static() { + start_tracing(); + let (_tp, tp_addr) = start_template_provider(None, DifficultyLevel::Low); + let (pool, _pool_addr, pool_monitoring) = + start_pool(sv2_tp_config(tp_addr), vec![], vec![], true).await; + let pool_mon = pool_monitoring.expect("pool monitoring should be enabled"); + + // Health is always available. + assert_api_health(pool_mon).await; + + // Root endpoint lists APIs. + let root: ApiRootResponse = fetch_api_typed(pool_mon, "/").await; + assert_eq!(root.service, "SRI Monitoring API"); + assert!( + root.endpoints.is_object(), + "endpoints should be a JSON object" + ); + + // Pool has no upstream and no SV1 — these endpoints should return 404 with a JSON error body. + for path in [ + "/api/v1/server", + "/api/v1/server/channels", + "/api/v1/sv1/clients", + ] { + let (status, json) = fetch_api_with_status(pool_mon, path).await; + assert_eq!( + status, 404, + "{} should return 404, got {} with body {}", + path, status, json + ); + assert!( + json["error"].is_string(), + "{} should return a JSON error body, got {}", + path, + json + ); + } + + pool.shutdown().await; +} + +// --------------------------------------------------------------------------- +// 6. Pool JSON API endpoints — with active SV2 miner. +// Covers: /api/v1/global, /api/v1/clients (paginated list), /api/v1/clients/{id}, +// /api/v1/clients/{id}/channels, plus 404s for unknown ids. +// Also cross-validates that the JSON API and Prometheus surface report +// consistent share data for the same (client, channel, user). 
+// --------------------------------------------------------------------------- +#[tokio::test] +async fn pool_api_endpoints_with_miner() { + start_tracing(); + let (_tp, tp_addr) = start_template_provider(None, DifficultyLevel::Low); + let (pool, pool_addr, pool_monitoring) = + start_pool(sv2_tp_config(tp_addr), vec![], vec![], true).await; + let (sniffer, sniffer_addr) = start_sniffer("A", pool_addr, false, vec![], None); + // Explicit user_id so the per-channel Prometheus user_identity label is meaningful. + start_mining_device_sv2( + sniffer_addr, + None, + None, + Some("test-miner".to_string()), + 1, + None, + true, + ); + + sniffer + .wait_for_message_type( + MessageDirection::ToUpstream, + MESSAGE_TYPE_SUBMIT_SHARES_STANDARD, + ) + .await; + sniffer + .wait_for_message_type( + MessageDirection::ToDownstream, + MESSAGE_TYPE_SUBMIT_SHARES_SUCCESS, + ) + .await; + + let pool_mon = pool_monitoring.expect("pool monitoring should be enabled"); + + // /api/v1/global — pool sees SV2 clients, no upstream server, no SV1. + let global = poll_until_global_sv2_clients_gte(pool_mon, 1, METRIC_POLL_TIMEOUT).await; + assert!( + global.server.is_none(), + "Pool /api/v1/global should have null server" + ); + assert_eq!(global.sv2_clients.as_ref().unwrap().total_clients, 1); + assert!( + global.sv1_clients.is_none(), + "Pool /api/v1/global should have null sv1_clients" + ); + assert!(global.uptime_secs > 0); + + // /api/v1/clients — paginated list. + let clients = poll_until_clients_total_gte(pool_mon, 1, METRIC_POLL_TIMEOUT).await; + assert_eq!(clients.total, 1); + assert_eq!(clients.items.len(), 1); + let client_id = clients.items[0].client_id; + assert!(client_id > 0); + + // /api/v1/clients/{id} — single-client lookup. + let client: ApiClientMetadata = + fetch_api_typed(pool_mon, &format!("/api/v1/clients/{}", client_id)).await; + assert_eq!(client.client_id, client_id); + + // /api/v1/clients/{id}/channels — at least one channel. 
+ let channels: ApiClientChannelsResponse = + fetch_api_typed(pool_mon, &format!("/api/v1/clients/{}/channels", client_id)).await; + assert_eq!(channels.client_id, client_id); + assert!( + channels.total_standard + channels.total_extended >= 1, + "client should have ≥1 channel, got std={} ext={}", + channels.total_standard, + channels.total_extended, + ); + + // 404 paths for unknown client id. + for path in [ + format!("/api/v1/clients/{}", 99999), + format!("/api/v1/clients/{}/channels", 99999), + ] { + let (status, _) = fetch_api_with_status(pool_mon, &path).await; + assert_eq!(status, 404, "unknown {} should return 404", path); + } + + // Cross-surface: Prometheus must report at least the same accepted shares the JSON API saw. + // Pool reserves channel_id=1 internally and assigns 2 to the first downstream-opened channel. + let metrics = poll_until_metric_gte( + pool_mon, + Metric::with_labels( + "sv2_client_shares_accepted_total", + &[ + ("client_id", "1"), + ("channel_id", "2"), + ("user_identity", "test-miner"), + ], + ), + 1.0, + METRIC_POLL_TIMEOUT, + ) + .await; + assert_metric_eq(&metrics, "sv2_clients_total", 1.0); + + pool.shutdown().await; +} + +// --------------------------------------------------------------------------- +// 7. tProxy JSON API endpoints — static (no miner / no activity). +// Covers: root, /api/v1/clients (404 — tProxy has no SV2 downstreams). 
+// --------------------------------------------------------------------------- +#[tokio::test] +async fn tproxy_api_endpoints_static() { + start_tracing(); + let (_tp, tp_addr) = start_template_provider(None, DifficultyLevel::Low); + let (pool, pool_addr, _pool_monitoring) = + start_pool(sv2_tp_config(tp_addr), vec![], vec![], false).await; + let (tproxy, _tproxy_addr, tproxy_monitoring) = + start_sv2_translator(&[pool_addr], false, vec![], vec![], None, true).await; + let tproxy_mon = tproxy_monitoring.expect("tproxy monitoring should be enabled"); + + assert_api_health(tproxy_mon).await; + + let root: ApiRootResponse = fetch_api_typed(tproxy_mon, "/").await; + assert_eq!(root.service, "SRI Monitoring API"); + assert!(root.endpoints.is_object()); + + let (status, json) = fetch_api_with_status(tproxy_mon, "/api/v1/clients").await; + assert_eq!(status, 404, "/api/v1/clients should return 404 for tProxy"); + assert!(json["error"].is_string()); + + shutdown_all!(tproxy, pool); +} + +// --------------------------------------------------------------------------- +// 8. tProxy JSON API endpoints — with active SV1 miner. +// Covers: /api/v1/global, /api/v1/server, /api/v1/server/channels, +// /api/v1/sv1/clients (paginated), /api/v1/sv1/clients/{id} (+ 404). +// Also cross-validates that JSON API and Prometheus expose consistent +// upstream-channel share data. 
+// --------------------------------------------------------------------------- +#[tokio::test] +async fn tproxy_api_endpoints_with_miner() { + start_tracing(); + let (_tp, tp_addr) = start_template_provider(None, DifficultyLevel::Low); + let (pool, pool_addr, _pool_monitoring) = + start_pool(sv2_tp_config(tp_addr), vec![], vec![], false).await; + let (sniffer, sniffer_addr) = start_sniffer("0", pool_addr, false, vec![], None); + let (tproxy, tproxy_addr, tproxy_monitoring) = + start_sv2_translator(&[sniffer_addr], false, vec![], vec![], None, true).await; + let (_minerd_process, _minerd_addr) = start_minerd(tproxy_addr, None, None, false).await; + + sniffer + .wait_for_message_type( + MessageDirection::ToUpstream, + MESSAGE_TYPE_SUBMIT_SHARES_EXTENDED, + ) + .await; + + let tproxy_mon = tproxy_monitoring.expect("tproxy monitoring should be enabled"); + + // /api/v1/global — tProxy has upstream server + SV1 clients, no SV2 downstreams. + let global = poll_until_global_sv1_clients_gte(tproxy_mon, 1, METRIC_POLL_TIMEOUT).await; + let server_summary = global + .server + .as_ref() + .expect("tProxy /api/v1/global should have server"); + assert!(server_summary.extended_channels >= 1); + assert!(global.sv1_clients.as_ref().unwrap().total_clients >= 1); + assert!( + global.sv2_clients.is_none(), + "tProxy should have null sv2_clients" + ); + + // /api/v1/server — extended channel count visible. + let server = poll_until_server_channels_gte(tproxy_mon, 1, METRIC_POLL_TIMEOUT).await; + assert!(server.extended_channels_count >= 1); + + // /api/v1/server/channels — channel details with shares_acknowledged populated. 
+ let server_channels = poll_until_api_field_gte( + tproxy_mon, + "/api/v1/server/channels", + "/total_extended", + 1.0, + METRIC_POLL_TIMEOUT, + ) + .await; + let ext = server_channels["extended_channels"] + .as_array() + .expect("extended_channels should be an array"); + assert!(!ext.is_empty(), "should have channel details"); + assert!( + ext[0]["shares_acknowledged"].as_u64().is_some(), + "channel should expose shares_acknowledged, got: {}", + ext[0] + ); + // Regression guard: shares_rejected fields must be present with the + // expected types on the upstream channel detail. + assert!( + ext[0]["shares_rejected"].as_u64().is_some(), + "channel should expose shares_rejected as a number, got: {}", + ext[0] + ); + assert!( + ext[0]["shares_rejected_by_reason"].is_object(), + "channel should expose shares_rejected_by_reason as an object, got: {}", + ext[0] + ); + + // /api/v1/sv1/clients — at least one SV1 client. + let sv1_clients = poll_until_sv1_clients_total_gte(tproxy_mon, 1, METRIC_POLL_TIMEOUT).await; + assert!(sv1_clients.total >= 1); + assert!(!sv1_clients.items.is_empty()); + let sv1_client_id = sv1_clients.items[0].client_id; + assert!(sv1_client_id > 0); + + // /api/v1/sv1/clients/{id} — single-client lookup. + let client: ApiSv1Client = fetch_api_typed( + tproxy_mon, + &format!("/api/v1/sv1/clients/{}", sv1_client_id), + ) + .await; + assert_eq!(client.client_id, sv1_client_id); + + // 404 for unknown SV1 client id. + let (status, _) = fetch_api_with_status(tproxy_mon, "/api/v1/sv1/clients/99999").await; + assert_eq!(status, 404); + + // Cross-surface: Prometheus must report the same upstream-channel accepted shares. 
+ let metrics = poll_until_metric_gte( + tproxy_mon, + Metric::with_labels( + "sv2_server_shares_accepted_total", + &[ + ("channel_id", "2"), + ("user_identity", "user_identity.miner1"), + ], + ), + 1.0, + METRIC_POLL_TIMEOUT, + ) + .await; + assert_metric_eq(&metrics, "sv1_clients_total", 1.0); + + shutdown_all!(tproxy, pool); +} + +// --------------------------------------------------------------------------- +// 9. JDC JSON API endpoints — with active SV1 miner. +// JDC sits between pool and tProxy: it is an SV2 client to the pool (so +// `server` is populated) and an SV2 server to tProxy (so `sv2_clients` is +// populated). JDC has no SV1 surface. +// Covers: root, /api/v1/global, /api/v1/server, /api/v1/server/channels, +// /api/v1/clients, /api/v1/clients/{id}, /api/v1/clients/{id}/channels, +// /api/v1/sv1/clients (404). +// --------------------------------------------------------------------------- +#[tokio::test] +async fn jdc_api_endpoints_with_miner() { + start_tracing(); + let (tp, tp_addr) = start_template_provider(None, DifficultyLevel::Low); + let (pool, pool_addr, jds_addr, _pool_monitoring) = + start_pool_with_jds(tp.bitcoin_core(), vec![], vec![], false).await; + let (jdc_pool_sniffer, jdc_pool_sniffer_addr) = + start_sniffer("0", pool_addr, false, vec![], None); + let (jdc, jdc_addr, jdc_monitoring) = start_jdc( + &[(jdc_pool_sniffer_addr, jds_addr)], + sv2_tp_config(tp_addr), + vec![], + vec![], + true, + None, + ); + let (tproxy, tproxy_addr, _) = + start_sv2_translator(&[jdc_addr], true, vec![], vec![], None, false).await; + let (_minerd, _minerd_addr) = start_minerd(tproxy_addr, None, None, false).await; + + // Wait until at least one share has flowed all the way to the pool. 
+ jdc_pool_sniffer + .wait_for_message_type( + MessageDirection::ToUpstream, + MESSAGE_TYPE_SUBMIT_SHARES_EXTENDED, + ) + .await; + + let jdc_mon = jdc_monitoring.expect("jdc monitoring should be enabled"); + assert_api_health(jdc_mon).await; + + // Root endpoint lists APIs. + let root: ApiRootResponse = fetch_api_typed(jdc_mon, "/").await; + assert_eq!(root.service, "SRI Monitoring API"); + assert!(root.endpoints.is_object()); + + // /api/v1/global — JDC has both server (pool) and sv2_clients (tproxy), no sv1. + let global = poll_until_global_sv2_clients_gte(jdc_mon, 1, METRIC_POLL_TIMEOUT).await; + let server_summary = global + .server + .as_ref() + .expect("JDC /api/v1/global should have server"); + assert!(server_summary.extended_channels >= 1); + assert_eq!(global.sv2_clients.as_ref().unwrap().total_clients, 1); + assert!( + global.sv1_clients.is_none(), + "JDC should have null sv1_clients" + ); + assert!(global.uptime_secs > 0); + + // /api/v1/server — extended channel from upstream pool. + let server = poll_until_server_channels_gte(jdc_mon, 1, METRIC_POLL_TIMEOUT).await; + assert!(server.extended_channels_count >= 1); + + // /api/v1/server/channels — channel details with all share-counter fields. 
+ let server_channels = poll_until_api_field_gte( + jdc_mon, + "/api/v1/server/channels", + "/total_extended", + 1.0, + METRIC_POLL_TIMEOUT, + ) + .await; + let ext = server_channels["extended_channels"] + .as_array() + .expect("extended_channels should be an array"); + assert!(!ext.is_empty()); + assert!( + ext[0]["shares_acknowledged"].as_u64().is_some(), + "channel should expose shares_acknowledged, got: {}", + ext[0] + ); + assert!( + ext[0]["shares_rejected"].as_u64().is_some(), + "channel should expose shares_rejected as a number, got: {}", + ext[0] + ); + assert!( + ext[0]["shares_rejected_by_reason"].is_object(), + "channel should expose shares_rejected_by_reason as an object, got: {}", + ext[0] + ); + + // /api/v1/clients — JDC's downstream is the tProxy (single SV2 client). + let clients = poll_until_clients_total_gte(jdc_mon, 1, METRIC_POLL_TIMEOUT).await; + assert_eq!(clients.total, 1); + assert_eq!(clients.items.len(), 1); + let client_id = clients.items[0].client_id; + + // /api/v1/clients/{id} — single client lookup. + let client: ApiClientMetadata = + fetch_api_typed(jdc_mon, &format!("/api/v1/clients/{}", client_id)).await; + assert_eq!(client.client_id, client_id); + + // /api/v1/clients/{id}/channels — tProxy opens an extended channel via JDC. + let channels: ApiClientChannelsResponse = + fetch_api_typed(jdc_mon, &format!("/api/v1/clients/{}/channels", client_id)).await; + assert_eq!(channels.client_id, client_id); + assert!( + channels.total_extended >= 1, + "tproxy should open an extended channel via JDC, got total_extended={}", + channels.total_extended, + ); + + // /api/v1/sv1/clients — JDC has no SV1 surface. 
+    let (status, json) = fetch_api_with_status(jdc_mon, "/api/v1/sv1/clients").await;
+    assert_eq!(status, 404, "JDC /api/v1/sv1/clients should return 404");
+    assert!(json["error"].is_string());
+
+    shutdown_all!(tproxy, jdc, pool);
+}
diff --git a/stratum-apps/src/monitoring/http_server.rs b/stratum-apps/src/monitoring/http_server.rs
index 67192e675..20eb6b518 100644
--- a/stratum-apps/src/monitoring/http_server.rs
+++ b/stratum-apps/src/monitoring/http_server.rs
@@ -784,18 +784,117 @@ async fn handle_prometheus_metrics(State(state): State) -> Response
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::monitoring::server::ServerInfo;
     use axum::body::Body;
     use http_body_util::BodyExt;
     use std::{collections::HashMap, sync::Mutex};
     use tower::ServiceExt;
 
+    // ── Route constants ─────────────────────────────────────────────
+    mod routes {
+        pub const ROOT: &str = "/";
+        pub const HEALTH: &str = "/api/v1/health";
+        pub const GLOBAL: &str = "/api/v1/global";
+        pub const SERVER: &str = "/api/v1/server";
+        pub const SERVER_CHANNELS: &str = "/api/v1/server/channels";
+        pub const CLIENTS: &str = "/api/v1/clients";
+        pub const SV1_CLIENTS: &str = "/api/v1/sv1/clients";
+        pub const METRICS: &str = "/metrics";
+
+        pub fn client_by_id(id: u64) -> String {
+            format!("/api/v1/clients/{}", id)
+        }
+
+        pub fn client_channels(id: u64) -> String {
+            format!("/api/v1/clients/{}/channels", id)
+        }
+
+        pub fn sv1_client_by_id(id: u64) -> String {
+            format!("/api/v1/sv1/clients/{}", id)
+        }
+    }
+
+    // ── Response types for JSON parsing ─────────────────────────────
+    #[derive(Debug, serde::Deserialize)]
+    struct ErrorResponseBody {
+        error: String,
+    }
+
+    #[derive(Debug, serde::Deserialize)]
+    struct PaginatedResponse<T> {
+        total: usize,
+        offset: usize,
+        limit: usize,
+        items: Vec<T>,
+    }
+
+    #[derive(Debug, serde::Deserialize)]
+    struct HealthResponseBody {
+        status: String,
+    }
+
+    #[derive(Debug, serde::Deserialize)]
+    struct RootResponseBody {
+        service: String,
+        endpoints: 
serde_json::Value,
+    }
+
+    #[derive(Debug, serde::Deserialize)]
+    struct GlobalResponseBody {
+        uptime_secs: u64,
+        server: Option<ServerSummaryBody>,
+        sv2_clients: Option<Sv2ClientsSummaryBody>,
+        sv1_clients: Option<Sv1ClientsSummaryBody>,
+    }
+
+    #[derive(Debug, serde::Deserialize)]
+    struct ServerSummaryBody {
+        extended_channels: u64,
+    }
+
+    #[derive(Debug, serde::Deserialize)]
+    struct Sv2ClientsSummaryBody {
+        total_clients: u64,
+    }
+
+    #[derive(Debug, serde::Deserialize)]
+    struct Sv1ClientsSummaryBody {
+        total_clients: u64,
+    }
+
+    #[derive(Debug, serde::Deserialize)]
+    struct ServerResponseBody {
+        extended_channels_count: usize,
+        standard_channels_count: usize,
+    }
+
+    #[derive(Debug, serde::Deserialize)]
+    struct ServerChannelsResponseBody {
+        offset: usize,
+        limit: usize,
+        total_extended: usize,
+        extended_channels: Vec<serde_json::Value>,
+    }
+
+    #[derive(Debug, serde::Deserialize)]
+    struct ClientMetadataBody {
+        client_id: u64,
+    }
+
+    #[derive(Debug, serde::Deserialize)]
+    struct ClientChannelsResponseBody {
+        client_id: u64,
+    }
+
+    #[derive(Debug, serde::Deserialize)]
+    struct Sv1ClientBody {
+        client_id: u64,
+    }
+
     // ── helpers ──────────────────────────────────────────────────────
-    fn create_extended_channel_info(
-        channel_id: u32,
-        hashrate: f32,
-    ) -> super::super::client::ExtendedChannelInfo {
-        super::super::client::ExtendedChannelInfo {
+    fn create_extended_channel_info(channel_id: u32, hashrate: f32) -> ExtendedChannelInfo {
+        ExtendedChannelInfo {
             channel_id,
             user_identity: format!("user-ext-{}", channel_id),
             nominal_hashrate: hashrate,
@@ -816,11 +915,8 @@ mod tests {
         }
     }
 
-    fn create_standard_channel_info(
-        channel_id: u32,
-        hashrate: f32,
-    ) -> super::super::client::StandardChannelInfo {
-        super::super::client::StandardChannelInfo {
+    fn create_standard_channel_info(channel_id: u32, hashrate: f32) -> StandardChannelInfo {
+        StandardChannelInfo {
             channel_id,
             user_identity: format!("user-std-{}", channel_id),
             nominal_hashrate: hashrate,
@@ -897,22 +993,22 @@ mod tests {
         }
     }
 
-    struct 
MockServer(super::super::server::ServerInfo); + struct MockServer(ServerInfo); impl ServerMonitoring for MockServer { - fn get_server(&self) -> super::super::server::ServerInfo { + fn get_server(&self) -> ServerInfo { self.0.clone() } } struct MockClients(Vec); - impl super::super::client::Sv2ClientsMonitoring for MockClients { + impl Sv2ClientsMonitoring for MockClients { fn get_sv2_clients(&self) -> Vec { self.0.clone() } } struct MockSv1Clients(Vec); - impl super::super::sv1::Sv1ClientsMonitoring for MockSv1Clients { + impl Sv1ClientsMonitoring for MockSv1Clients { fn get_sv1_clients(&self) -> Vec { self.0.clone() } @@ -921,8 +1017,8 @@ mod tests { /// Build a full Router with mock data for integration testing. fn build_test_app( server: Option>, - clients: Option>, - sv1: Option>, + clients: Option>, + sv1: Option>, ) -> Router { let has_server = server.is_some(); let has_clients = clients.is_some(); @@ -1071,43 +1167,43 @@ mod tests { #[tokio::test] async fn health_endpoint_returns_ok() { let app = build_test_app(None, None, None); - let response = app.oneshot(make_request("/api/v1/health")).await.unwrap(); + let response = app.oneshot(make_request(routes::HEALTH)).await.unwrap(); assert_eq!(response.status(), StatusCode::OK); let body = get_body(response).await; - let json: serde_json::Value = serde_json::from_str(&body).unwrap(); - assert_eq!(json["status"], "ok"); - assert!(json["timestamp"].as_u64().is_some()); + let resp: HealthResponseBody = serde_json::from_str(&body).unwrap(); + assert_eq!(resp.status, "ok"); } #[tokio::test] async fn root_endpoint_lists_endpoints() { let app = build_test_app(None, None, None); - let response = app.oneshot(make_request("/")).await.unwrap(); + let response = app.oneshot(make_request(routes::ROOT)).await.unwrap(); assert_eq!(response.status(), StatusCode::OK); let body = get_body(response).await; - let json: serde_json::Value = serde_json::from_str(&body).unwrap(); - assert_eq!(json["service"], "SRI Monitoring API"); - 
assert!(json["endpoints"].is_object()); + let resp: RootResponseBody = serde_json::from_str(&body).unwrap(); + assert_eq!(resp.service, "SRI Monitoring API"); + assert!(resp.endpoints.is_object()); } #[tokio::test] async fn global_endpoint_with_no_sources() { let app = build_test_app(None, None, None); - let response = app.oneshot(make_request("/api/v1/global")).await.unwrap(); + let response = app.oneshot(make_request(routes::GLOBAL)).await.unwrap(); assert_eq!(response.status(), StatusCode::OK); let body = get_body(response).await; - let json: serde_json::Value = serde_json::from_str(&body).unwrap(); - assert!(json["server"].is_null()); - assert!(json["sv2_clients"].is_null()); - assert!(json["uptime_secs"].as_u64().is_some()); + let resp: GlobalResponseBody = serde_json::from_str(&body).unwrap(); + assert!(resp.server.is_none()); + assert!(resp.sv2_clients.is_none()); + // uptime_secs can be 0 if test runs fast enough, just verify it parses + let _ = resp.uptime_secs; } #[tokio::test] async fn global_endpoint_with_data() { - let server = Arc::new(MockServer(super::super::server::ServerInfo { + let server = Arc::new(MockServer(ServerInfo { extended_channels: vec![create_server_extended_channel_info(1, Some(100.0))], standard_channels: vec![], })); @@ -1119,28 +1215,28 @@ mod tests { let app = build_test_app( Some(server as Arc), - Some(clients as Arc), + Some(clients as Arc), None, ); - let response = app.oneshot(make_request("/api/v1/global")).await.unwrap(); + let response = app.oneshot(make_request(routes::GLOBAL)).await.unwrap(); assert_eq!(response.status(), StatusCode::OK); let body = get_body(response).await; - let json: serde_json::Value = serde_json::from_str(&body).unwrap(); - assert_eq!(json["server"]["extended_channels"], 1); - assert_eq!(json["sv2_clients"]["total_clients"], 1); + let resp: GlobalResponseBody = serde_json::from_str(&body).unwrap(); + assert_eq!(resp.server.as_ref().unwrap().extended_channels, 1); + 
assert_eq!(resp.sv2_clients.as_ref().unwrap().total_clients, 1); } #[tokio::test] async fn server_endpoint_not_available() { let app = build_test_app(None, None, None); - let response = app.oneshot(make_request("/api/v1/server")).await.unwrap(); + let response = app.oneshot(make_request(routes::SERVER)).await.unwrap(); assert_eq!(response.status(), StatusCode::NOT_FOUND); } #[tokio::test] async fn server_endpoint_with_data() { - let server = Arc::new(MockServer(super::super::server::ServerInfo { + let server = Arc::new(MockServer(ServerInfo { extended_channels: vec![create_server_extended_channel_info(1, Some(100.0))], standard_channels: vec![create_server_standard_channel_info(2, Some(50.0))], })); @@ -1150,18 +1246,18 @@ mod tests { None, None, ); - let response = app.oneshot(make_request("/api/v1/server")).await.unwrap(); + let response = app.oneshot(make_request(routes::SERVER)).await.unwrap(); assert_eq!(response.status(), StatusCode::OK); let body = get_body(response).await; - let json: serde_json::Value = serde_json::from_str(&body).unwrap(); - assert_eq!(json["extended_channels_count"], 1); - assert_eq!(json["standard_channels_count"], 1); + let resp: ServerResponseBody = serde_json::from_str(&body).unwrap(); + assert_eq!(resp.extended_channels_count, 1); + assert_eq!(resp.standard_channels_count, 1); } #[tokio::test] async fn server_channels_endpoint_with_pagination() { - let server = Arc::new(MockServer(super::super::server::ServerInfo { + let server = Arc::new(MockServer(ServerInfo { extended_channels: vec![ create_server_extended_channel_info(1, Some(100.0)), create_server_extended_channel_info(2, Some(200.0)), @@ -1176,17 +1272,20 @@ mod tests { None, ); let response = app - .oneshot(make_request("/api/v1/server/channels?offset=1&limit=1")) + .oneshot(make_request(&format!( + "{}?offset=1&limit=1", + routes::SERVER_CHANNELS + ))) .await .unwrap(); assert_eq!(response.status(), StatusCode::OK); let body = get_body(response).await; - let json: 
serde_json::Value = serde_json::from_str(&body).unwrap(); - assert_eq!(json["total_extended"], 3); - assert_eq!(json["offset"], 1); - assert_eq!(json["limit"], 1); - assert_eq!(json["extended_channels"].as_array().unwrap().len(), 1); + let resp: ServerChannelsResponseBody = serde_json::from_str(&body).unwrap(); + assert_eq!(resp.total_extended, 3); + assert_eq!(resp.offset, 1); + assert_eq!(resp.limit, 1); + assert_eq!(resp.extended_channels.len(), 1); } #[tokio::test] @@ -1218,7 +1317,7 @@ mod tests { #[tokio::test] async fn clients_endpoint_not_available() { let app = build_test_app(None, None, None); - let response = app.oneshot(make_request("/api/v1/clients")).await.unwrap(); + let response = app.oneshot(make_request(routes::CLIENTS)).await.unwrap(); assert_eq!(response.status(), StatusCode::NOT_FOUND); } @@ -1239,17 +1338,17 @@ mod tests { let app = build_test_app( None, - Some(clients as Arc), + Some(clients as Arc), None, ); - let response = app.oneshot(make_request("/api/v1/clients")).await.unwrap(); + let response = app.oneshot(make_request(routes::CLIENTS)).await.unwrap(); assert_eq!(response.status(), StatusCode::OK); let body = get_body(response).await; - let json: serde_json::Value = serde_json::from_str(&body).unwrap(); - assert_eq!(json["total"], 2); - assert_eq!(json["items"].as_array().unwrap().len(), 2); - assert_eq!(json["items"][0]["client_id"], 1); + let resp: PaginatedResponse = serde_json::from_str(&body).unwrap(); + assert_eq!(resp.total, 2); + assert_eq!(resp.items.len(), 2); + assert_eq!(resp.items[0].client_id, 1); } #[tokio::test] @@ -1262,20 +1361,18 @@ mod tests { let app = build_test_app( None, - Some(clients as Arc), + Some(clients as Arc), None, ); let response = app - .oneshot(make_request("/api/v1/clients/42")) + .oneshot(make_request(&routes::client_by_id(42))) .await .unwrap(); assert_eq!(response.status(), StatusCode::OK); let body = get_body(response).await; - let json: serde_json::Value = serde_json::from_str(&body).unwrap(); 
- assert_eq!(json["client_id"], 42); - assert_eq!(json["extended_channels_count"], 1); - assert_eq!(json["standard_channels_count"], 1); + let resp: ClientMetadataBody = serde_json::from_str(&body).unwrap(); + assert_eq!(resp.client_id, 42); } #[tokio::test] @@ -1288,11 +1385,11 @@ mod tests { let app = build_test_app( None, - Some(clients as Arc), + Some(clients as Arc), None, ); let response = app - .oneshot(make_request("/api/v1/clients/999")) + .oneshot(make_request(&routes::client_by_id(999))) .await .unwrap(); assert_eq!(response.status(), StatusCode::NOT_FOUND); @@ -1312,27 +1409,28 @@ mod tests { let app = build_test_app( None, - Some(clients as Arc), + Some(clients as Arc), None, ); let response = app - .oneshot(make_request("/api/v1/clients/1/channels?offset=1&limit=2")) + .oneshot(make_request(&format!( + "{}?offset=1&limit=2", + routes::client_channels(1) + ))) .await .unwrap(); assert_eq!(response.status(), StatusCode::OK); let body = get_body(response).await; - let json: serde_json::Value = serde_json::from_str(&body).unwrap(); - assert_eq!(json["client_id"], 1); - assert_eq!(json["total_extended"], 3); - assert_eq!(json["extended_channels"].as_array().unwrap().len(), 2); + let resp: ClientChannelsResponseBody = serde_json::from_str(&body).unwrap(); + assert_eq!(resp.client_id, 1); } #[tokio::test] async fn sv1_clients_not_available() { let app = build_test_app(None, None, None); let response = app - .oneshot(make_request("/api/v1/sv1/clients")) + .oneshot(make_request(routes::SV1_CLIENTS)) .await .unwrap(); assert_eq!(response.status(), StatusCode::NOT_FOUND); @@ -1348,18 +1446,18 @@ mod tests { let app = build_test_app( None, None, - Some(sv1 as Arc), + Some(sv1 as Arc), ); let response = app - .oneshot(make_request("/api/v1/sv1/clients")) + .oneshot(make_request(routes::SV1_CLIENTS)) .await .unwrap(); assert_eq!(response.status(), StatusCode::OK); let body = get_body(response).await; - let json: serde_json::Value = 
serde_json::from_str(&body).unwrap(); - assert_eq!(json["total"], 2); - assert_eq!(json["items"].as_array().unwrap().len(), 2); + let resp: PaginatedResponse = serde_json::from_str(&body).unwrap(); + assert_eq!(resp.total, 2); + assert_eq!(resp.items.len(), 2); } #[tokio::test] @@ -1369,17 +1467,17 @@ mod tests { let app = build_test_app( None, None, - Some(sv1 as Arc), + Some(sv1 as Arc), ); let response = app - .oneshot(make_request("/api/v1/sv1/clients/7")) + .oneshot(make_request(&routes::sv1_client_by_id(7))) .await .unwrap(); assert_eq!(response.status(), StatusCode::OK); let body = get_body(response).await; - let json: serde_json::Value = serde_json::from_str(&body).unwrap(); - assert_eq!(json["client_id"], 7); + let resp: Sv1ClientBody = serde_json::from_str(&body).unwrap(); + assert_eq!(resp.client_id, 7); } #[tokio::test] @@ -1389,10 +1487,10 @@ mod tests { let app = build_test_app( None, None, - Some(sv1 as Arc), + Some(sv1 as Arc), ); let response = app - .oneshot(make_request("/api/v1/sv1/clients/999")) + .oneshot(make_request(&routes::sv1_client_by_id(999))) .await .unwrap(); assert_eq!(response.status(), StatusCode::NOT_FOUND); @@ -1400,7 +1498,7 @@ mod tests { #[tokio::test] async fn metrics_endpoint_returns_prometheus_format() { - let server = Arc::new(MockServer(super::super::server::ServerInfo { + let server = Arc::new(MockServer(ServerInfo { extended_channels: vec![create_server_extended_channel_info(1, Some(100.0))], standard_channels: vec![], })); @@ -1412,10 +1510,10 @@ mod tests { let app = build_test_app( Some(server as Arc), - Some(clients as Arc), + Some(clients as Arc), None, ); - let response = app.oneshot(make_request("/metrics")).await.unwrap(); + let response = app.oneshot(make_request(routes::METRICS)).await.unwrap(); assert_eq!(response.status(), StatusCode::OK); let body = get_body(response).await; @@ -1427,7 +1525,7 @@ mod tests { #[tokio::test] async fn metrics_endpoint_with_no_sources() { let app = build_test_app(None, None, 
None); - let response = app.oneshot(make_request("/metrics")).await.unwrap(); + let response = app.oneshot(make_request(routes::METRICS)).await.unwrap(); assert_eq!(response.status(), StatusCode::OK); let body = get_body(response).await; @@ -1523,4 +1621,193 @@ mod tests { "Channel 2 should be removed as stale" ); } + + // ── Edge-case unit tests (pagination, missing data, invalid params) ── + + #[test] + fn paginate_with_limit_zero() { + // effective_limit(Some(0)) = 0.min(MAX_LIMIT) = 0, so take(0) returns nothing + let items: Vec = (0..50).collect(); + let params = Pagination { + offset: 0, + limit: Some(0), + }; + let (total, result) = paginate(&items, ¶ms); + assert_eq!(total, 50); + assert!(result.is_empty(), "limit=0 should return no items"); + } + + #[tokio::test] + async fn server_channels_not_available() { + let app = build_test_app(None, None, None); + let response = app + .oneshot(make_request(routes::SERVER_CHANNELS)) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::NOT_FOUND); + + let body = get_body(response).await; + let resp: ErrorResponseBody = serde_json::from_str(&body).unwrap(); + assert!(!resp.error.is_empty()); + } + + #[tokio::test] + async fn client_by_id_no_monitoring() { + // When client monitoring is not available at all, any client_id returns 404 + let app = build_test_app(None, None, None); + let response = app + .oneshot(make_request(&routes::client_by_id(1))) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::NOT_FOUND); + + let body = get_body(response).await; + let resp: ErrorResponseBody = serde_json::from_str(&body).unwrap(); + assert!(!resp.error.is_empty()); + } + + #[tokio::test] + async fn client_channels_client_not_found() { + // Client monitoring is available but the specific client_id does not exist + let clients = Arc::new(MockClients(vec![Sv2ClientInfo { + client_id: 1, + extended_channels: vec![], + standard_channels: vec![], + }])); + + let app = build_test_app( + None, + Some(clients as 
Arc), + None, + ); + let response = app + .oneshot(make_request(&routes::client_channels(999))) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::NOT_FOUND); + + let body = get_body(response).await; + let resp: ErrorResponseBody = serde_json::from_str(&body).unwrap(); + assert!(resp.error.contains("999")); + } + + #[tokio::test] + async fn client_channels_no_monitoring() { + // When client monitoring is not available at all + let app = build_test_app(None, None, None); + let response = app + .oneshot(make_request(&routes::client_channels(1))) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::NOT_FOUND); + } + + #[tokio::test] + async fn sv1_client_by_id_no_monitoring() { + // When SV1 monitoring is not available at all + let app = build_test_app(None, None, None); + let response = app + .oneshot(make_request(&routes::sv1_client_by_id(1))) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::NOT_FOUND); + + let body = get_body(response).await; + let resp: ErrorResponseBody = serde_json::from_str(&body).unwrap(); + assert!(!resp.error.is_empty()); + } + + #[tokio::test] + async fn clients_pagination_offset_and_limit() { + let clients = Arc::new(MockClients(vec![ + Sv2ClientInfo { + client_id: 1, + extended_channels: vec![create_extended_channel_info(1, 100.0)], + standard_channels: vec![], + }, + Sv2ClientInfo { + client_id: 2, + extended_channels: vec![], + standard_channels: vec![create_standard_channel_info(1, 50.0)], + }, + Sv2ClientInfo { + client_id: 3, + extended_channels: vec![create_extended_channel_info(2, 200.0)], + standard_channels: vec![], + }, + ])); + + let app = build_test_app( + None, + Some(clients as Arc), + None, + ); + let response = app + .oneshot(make_request(&format!( + "{}?offset=1&limit=1", + routes::CLIENTS + ))) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + let body = get_body(response).await; + let resp: PaginatedResponse = serde_json::from_str(&body).unwrap(); + 
assert_eq!(resp.total, 3); + assert_eq!(resp.offset, 1); + assert_eq!(resp.limit, 1); + assert_eq!(resp.items.len(), 1); + assert_eq!(resp.items[0].client_id, 2); + } + + #[tokio::test] + async fn sv1_clients_pagination() { + let sv1 = Arc::new(MockSv1Clients(vec![ + create_sv1_client_info(1, Some(100.0)), + create_sv1_client_info(2, Some(200.0)), + create_sv1_client_info(3, Some(300.0)), + ])); + + let app = build_test_app( + None, + None, + Some(sv1 as Arc), + ); + let response = app + .oneshot(make_request(&format!( + "{}?offset=2&limit=10", + routes::SV1_CLIENTS + ))) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + let body = get_body(response).await; + let resp: PaginatedResponse = serde_json::from_str(&body).unwrap(); + assert_eq!(resp.total, 3); + assert_eq!(resp.offset, 2); + assert_eq!(resp.items.len(), 1); + assert_eq!(resp.items[0].client_id, 3); + } + + #[tokio::test] + async fn global_endpoint_with_sv1_data() { + let sv1 = Arc::new(MockSv1Clients(vec![create_sv1_client_info(1, Some(100.0))])); + + let app = build_test_app( + None, + None, + Some(sv1 as Arc), + ); + let response = app.oneshot(make_request(routes::GLOBAL)).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + let body = get_body(response).await; + let resp: GlobalResponseBody = serde_json::from_str(&body).unwrap(); + // Server and SV2 clients should be None + assert!(resp.server.is_none()); + assert!(resp.sv2_clients.is_none()); + // SV1 clients should be present + assert_eq!(resp.sv1_clients.as_ref().unwrap().total_clients, 1); + } }