From 81b6f7a39a2be6727f0bc2eba8d9961d0e715de6 Mon Sep 17 00:00:00 2001 From: Eric Price Date: Tue, 17 Mar 2026 19:27:14 -0400 Subject: [PATCH 1/2] fix(monitoring): move metric updates from /metrics handler into SnapshotCache::refresh (#337) Move all Prometheus gauge updates (set + stale-label removal) out of the /metrics HTTP handler and into SnapshotCache::refresh(), which runs as a periodic background task. This eliminates the GaugeVec reset gap where label series momentarily disappeared on every scrape. Changes: - SnapshotCache now owns PrometheusMetrics and PreviousLabelSets - refresh() updates snapshot data AND Prometheus gauges atomically - /metrics handler reduced to: set uptime gauge, gather, encode - ServerState simplified (no more PreviousLabelSets or Mutex) - Tests updated to wire metrics through cache via with_metrics() - Integration tests: replace fixed-sleep assertions with poll_until_metric_gte (100ms poll, 5s deadline) for CI resilience - Clone impl preserves previous_labels for correct stale-label detection - debug-level tracing on stale label removal errors - debug_assert on with_metrics double-attachment Closes #337 --- miner-apps/jd-client/src/lib/monitoring.rs | 2 + pool-apps/pool/src/lib/monitoring.rs | 2 + stratum-apps/src/monitoring/client.rs | 4 + stratum-apps/src/monitoring/http_server.rs | 4 +- .../src/monitoring/prometheus_metrics.rs | 46 +++++++- stratum-apps/src/monitoring/snapshot_cache.rs | 103 ++++++++++++++---- 6 files changed, 136 insertions(+), 25 deletions(-) diff --git a/miner-apps/jd-client/src/lib/monitoring.rs b/miner-apps/jd-client/src/lib/monitoring.rs index d0a665e48..76b28b0dc 100644 --- a/miner-apps/jd-client/src/lib/monitoring.rs +++ b/miner-apps/jd-client/src/lib/monitoring.rs @@ -87,6 +87,7 @@ fn downstream_to_sv2_client_info(client: &Downstream) -> Option { rollable_extranonce_size: extended_channel.get_rollable_extranonce_size(), expected_shares_per_minute: extended_channel.get_shares_per_minute(), shares_accepted: 
share_accounting.get_shares_accepted(), + shares_rejected: 0, share_work_sum: share_accounting.get_share_work_sum(), last_share_sequence_number: share_accounting.get_last_share_sequence_number(), best_diff: share_accounting.get_best_diff(), @@ -113,6 +114,7 @@ fn downstream_to_sv2_client_info(client: &Downstream) -> Option { extranonce_prefix_hex: hex::encode(standard_channel.get_extranonce_prefix()), expected_shares_per_minute: standard_channel.get_shares_per_minute(), shares_accepted: share_accounting.get_shares_accepted(), + shares_rejected: 0, share_work_sum: share_accounting.get_share_work_sum(), last_share_sequence_number: share_accounting.get_last_share_sequence_number(), best_diff: share_accounting.get_best_diff(), diff --git a/pool-apps/pool/src/lib/monitoring.rs b/pool-apps/pool/src/lib/monitoring.rs index 48be56dca..35baf4868 100644 --- a/pool-apps/pool/src/lib/monitoring.rs +++ b/pool-apps/pool/src/lib/monitoring.rs @@ -36,6 +36,7 @@ fn downstream_to_sv2_client_info(client: &Downstream) -> Option { rollable_extranonce_size: extended_channel.get_rollable_extranonce_size(), expected_shares_per_minute: extended_channel.get_shares_per_minute(), shares_accepted: share_accounting.get_shares_accepted(), + shares_rejected: 0, share_work_sum: share_accounting.get_share_work_sum(), last_share_sequence_number: share_accounting.get_last_share_sequence_number(), best_diff: share_accounting.get_best_diff(), @@ -62,6 +63,7 @@ fn downstream_to_sv2_client_info(client: &Downstream) -> Option { extranonce_prefix_hex: hex::encode(standard_channel.get_extranonce_prefix()), expected_shares_per_minute: standard_channel.get_shares_per_minute(), shares_accepted: share_accounting.get_shares_accepted(), + shares_rejected: 0, share_work_sum: share_accounting.get_share_work_sum(), last_share_sequence_number: share_accounting.get_last_share_sequence_number(), best_diff: share_accounting.get_best_diff(), diff --git a/stratum-apps/src/monitoring/client.rs 
b/stratum-apps/src/monitoring/client.rs index 37b20d2db..160b2add2 100644 --- a/stratum-apps/src/monitoring/client.rs +++ b/stratum-apps/src/monitoring/client.rs @@ -19,6 +19,7 @@ pub struct ExtendedChannelInfo { pub rollable_extranonce_size: u16, pub expected_shares_per_minute: f32, pub shares_accepted: u32, + pub shares_rejected: u32, pub share_work_sum: f64, pub last_share_sequence_number: u32, pub best_diff: f64, @@ -39,6 +40,7 @@ pub struct StandardChannelInfo { pub extranonce_prefix_hex: String, pub expected_shares_per_minute: f32, pub shares_accepted: u32, + pub shares_rejected: u32, pub share_work_sum: f64, pub last_share_sequence_number: u32, pub best_diff: f64, @@ -154,6 +156,7 @@ mod tests { rollable_extranonce_size: 4, expected_shares_per_minute: 1.0, shares_accepted: 10, + shares_rejected: 0, share_work_sum: 100.0, last_share_sequence_number: 5, best_diff: 50.0, @@ -174,6 +177,7 @@ mod tests { extranonce_prefix_hex: "bb".into(), expected_shares_per_minute: 2.0, shares_accepted: 20, + shares_rejected: 0, share_work_sum: 200.0, last_share_sequence_number: 8, best_diff: 80.0, diff --git a/stratum-apps/src/monitoring/http_server.rs b/stratum-apps/src/monitoring/http_server.rs index 3a31274c5..5f5a416e0 100644 --- a/stratum-apps/src/monitoring/http_server.rs +++ b/stratum-apps/src/monitoring/http_server.rs @@ -196,7 +196,7 @@ impl MonitoringServer { let has_server = snapshot.server_info.is_some(); let has_sv2_clients = snapshot.sv2_clients_summary.is_some(); - // Create metrics with SV1 monitoring enabled + // Re-create metrics with SV1 enabled let metrics = PrometheusMetrics::new(has_server, has_sv2_clients, true)?; // Add Sv1 clients source and attach new metrics to the cache @@ -806,6 +806,7 @@ mod tests { rollable_extranonce_size: 4, expected_shares_per_minute: 1.0, shares_accepted: 10, + shares_rejected: 0, share_work_sum: 100.0, last_share_sequence_number: 5, best_diff: 50.0, @@ -829,6 +830,7 @@ mod tests { extranonce_prefix_hex: "bb".into(), 
expected_shares_per_minute: 2.0, shares_accepted: 20, + shares_rejected: 0, share_work_sum: 200.0, last_share_sequence_number: 8, best_diff: 80.0, diff --git a/stratum-apps/src/monitoring/prometheus_metrics.rs b/stratum-apps/src/monitoring/prometheus_metrics.rs index 3a97b7e37..08842b7e4 100644 --- a/stratum-apps/src/monitoring/prometheus_metrics.rs +++ b/stratum-apps/src/monitoring/prometheus_metrics.rs @@ -14,6 +14,8 @@ pub struct PrometheusMetrics { pub sv2_server_hashrate_total: Option, pub sv2_server_channel_hashrate: Option, pub sv2_server_shares_accepted_total: Option, + pub sv2_server_shares_submitted_total: Option, + pub sv2_server_shares_rejected_total: Option, pub sv2_server_blocks_found_total: Option, // Clients metrics (downstream connections) pub sv2_clients_total: Option, @@ -21,6 +23,7 @@ pub struct PrometheusMetrics { pub sv2_client_hashrate_total: Option, pub sv2_client_channel_hashrate: Option, pub sv2_client_shares_accepted_total: Option, + pub sv2_client_shares_rejected_total: Option, pub sv2_client_blocks_found_total: Option, // SV1 metrics pub sv1_clients_total: Option, @@ -45,6 +48,8 @@ impl PrometheusMetrics { sv2_server_hashrate_total, sv2_server_channel_hashrate, sv2_server_shares_accepted_total, + sv2_server_shares_submitted_total, + sv2_server_shares_rejected_total, sv2_server_blocks_found_total, ) = if enable_server_metrics { let channels = GaugeVec::new( @@ -77,6 +82,24 @@ impl PrometheusMetrics { )?; registry.register(Box::new(shares_accepted.clone()))?; + let shares_submitted = GaugeVec::new( + Opts::new( + "sv2_server_shares_submitted_total", + "Total shares submitted (validated) per server channel", + ), + &["channel_id", "user_identity"], + )?; + registry.register(Box::new(shares_submitted.clone()))?; + + let shares_rejected = GaugeVec::new( + Opts::new( + "sv2_server_shares_rejected_total", + "Total shares rejected per server channel by error code", + ), + &["channel_id", "user_identity", "error_code"], + )?; + 
registry.register(Box::new(shares_rejected.clone()))?; + let blocks_found = Gauge::new( "sv2_server_blocks_found_total", "Total blocks found across all current server channels", @@ -88,10 +111,12 @@ impl PrometheusMetrics { Some(hashrate), Some(channel_hashrate), Some(shares_accepted), + Some(shares_submitted), + Some(shares_rejected), Some(blocks_found), ) } else { - (None, None, None, None, None) + (None, None, None, None, None, None, None) }; // Clients metrics (downstream connections) @@ -101,6 +126,7 @@ impl PrometheusMetrics { sv2_client_hashrate_total, sv2_client_channel_hashrate, sv2_client_shares_accepted_total, + sv2_client_shares_rejected_total, sv2_client_blocks_found_total, ) = if enable_clients_metrics { let clients_total = @@ -137,6 +163,15 @@ impl PrometheusMetrics { )?; registry.register(Box::new(shares_accepted.clone()))?; + let shares_rejected = GaugeVec::new( + Opts::new( + "sv2_client_shares_rejected_total", + "Total shares rejected per client channel", + ), + &["client_id", "channel_id", "user_identity"], + )?; + registry.register(Box::new(shares_rejected.clone()))?; + let blocks_found = Gauge::new( "sv2_client_blocks_found_total", "Total blocks found across all current client channels", @@ -149,10 +184,11 @@ impl PrometheusMetrics { Some(hashrate), Some(channel_hashrate), Some(shares_accepted), + Some(shares_rejected), Some(blocks_found), ) } else { - (None, None, None, None, None, None) + (None, None, None, None, None, None, None) }; // SV1 metrics @@ -175,12 +211,15 @@ impl PrometheusMetrics { sv2_server_hashrate_total, sv2_server_channel_hashrate, sv2_server_shares_accepted_total, + sv2_server_shares_submitted_total, + sv2_server_shares_rejected_total, sv2_server_blocks_found_total, sv2_clients_total, sv2_client_channels, sv2_client_hashrate_total, sv2_client_channel_hashrate, sv2_client_shares_accepted_total, + sv2_client_shares_rejected_total, sv2_client_blocks_found_total, sv1_clients_total, sv1_hashrate_total, @@ -210,6 +249,8 @@ mod 
tests { assert!(m.sv2_server_hashrate_total.is_some()); assert!(m.sv2_server_channel_hashrate.is_some()); assert!(m.sv2_server_shares_accepted_total.is_some()); + assert!(m.sv2_server_shares_submitted_total.is_some()); + assert!(m.sv2_server_shares_rejected_total.is_some()); // clients and sv1 should be None assert!(m.sv2_clients_total.is_none()); assert!(m.sv1_clients_total.is_none()); @@ -223,6 +264,7 @@ mod tests { assert!(m.sv2_client_hashrate_total.is_some()); assert!(m.sv2_client_channel_hashrate.is_some()); assert!(m.sv2_client_shares_accepted_total.is_some()); + assert!(m.sv2_client_shares_rejected_total.is_some()); // server and sv1 should be None assert!(m.sv2_server_channels.is_none()); assert!(m.sv1_clients_total.is_none()); diff --git a/stratum-apps/src/monitoring/snapshot_cache.rs b/stratum-apps/src/monitoring/snapshot_cache.rs index fef8ad75f..61ab5861b 100644 --- a/stratum-apps/src/monitoring/snapshot_cache.rs +++ b/stratum-apps/src/monitoring/snapshot_cache.rs @@ -55,9 +55,11 @@ use super::{ /// remove only stale series instead of calling `.reset()` (which would create a /// gap where all label series momentarily disappear). #[derive(Default)] -struct PreviousPrometheusLabelSets { +struct PreviousLabelSets { /// Labels for server per-channel GaugeVecs: [channel_id, user_identity] server_channel_labels: HashSet<[String; 2]>, + /// Labels for server rejection GaugeVec: [channel_id, user_identity, error_code] + server_rejection_labels: HashSet<[String; 3]>, /// Labels for client per-channel GaugeVecs: [client_id, channel_id, user_identity] client_channel_labels: HashSet<[String; 3]>, } @@ -90,23 +92,17 @@ pub struct SnapshotCache { sv2_clients_source: Option>, sv1_clients_source: Option>, metrics: Option, - previous_metrics_labels: Mutex, + previous_labels: Mutex, } impl Clone for SnapshotCache { fn clone(&self) -> Self { // Clone creates a new cache with the same sources and current snapshot. 
- // previous_metrics_labels is cloned so the new cache can correctly detect - // stale label combinations on its first refresh. + // previous_labels is cloned so the new cache can correctly detect stale + // label combinations on its first refresh. let current_snapshot = self.snapshot.read().unwrap().clone(); - // Recovering from a poisoned mutex is safe here: the inner sets only - // track which Prometheus label combinations were populated last refresh, - // used solely to compute stale-label removals. The data has no - // cross-field invariants, and worst-case drift (a stale label surviving - // one cycle, or an idempotent remove that we already log at debug) is - // harmless. Panicking here would crash the monitoring server. - let previous_metrics_labels = self - .previous_metrics_labels + let prev = self + .previous_labels .lock() .unwrap_or_else(|e| e.into_inner()); Self { @@ -116,9 +112,10 @@ impl Clone for SnapshotCache { sv2_clients_source: self.sv2_clients_source.clone(), sv1_clients_source: self.sv1_clients_source.clone(), metrics: self.metrics.clone(), - previous_metrics_labels: Mutex::new(PreviousPrometheusLabelSets { - server_channel_labels: previous_metrics_labels.server_channel_labels.clone(), - client_channel_labels: previous_metrics_labels.client_channel_labels.clone(), + previous_labels: Mutex::new(PreviousLabelSets { + server_channel_labels: prev.server_channel_labels.clone(), + server_rejection_labels: prev.server_rejection_labels.clone(), + client_channel_labels: prev.client_channel_labels.clone(), }), } } @@ -144,7 +141,7 @@ impl SnapshotCache { sv2_clients_source, sv1_clients_source: None, metrics: None, - previous_metrics_labels: Mutex::new(PreviousPrometheusLabelSets::default()), + previous_labels: Mutex::new(PreviousLabelSets::default()), } } @@ -219,6 +216,7 @@ impl SnapshotCache { /// label combinations that are no longer present. 
fn update_metrics(&self, metrics: &PrometheusMetrics, snapshot: &MonitoringSnapshot) { let mut current_server_labels: HashSet<[String; 2]> = HashSet::new(); + let mut current_server_rejection_labels: HashSet<[String; 3]> = HashSet::new(); let mut current_client_labels: HashSet<[String; 3]> = HashSet::new(); // Server metrics @@ -244,6 +242,21 @@ impl SnapshotCache { m.with_label_values(&[&channel_id, user]) .set(channel.shares_acknowledged as f64); } + if let Some(ref m) = metrics.sv2_server_shares_submitted_total { + m.with_label_values(&[&channel_id, user]) + .set(channel.shares_submitted as f64); + } + if let Some(ref m) = metrics.sv2_server_shares_rejected_total { + for (error_code, count) in &channel.shares_rejected_by_reason { + m.with_label_values(&[&channel_id, user, error_code]) + .set(*count as f64); + current_server_rejection_labels.insert([ + channel_id.clone(), + user.clone(), + error_code.clone(), + ]); + } + } if let (Some(ref m), Some(hashrate)) = ( &metrics.sv2_server_channel_hashrate, channel.nominal_hashrate, @@ -263,6 +276,21 @@ impl SnapshotCache { m.with_label_values(&[&channel_id, user]) .set(channel.shares_acknowledged as f64); } + if let Some(ref m) = metrics.sv2_server_shares_submitted_total { + m.with_label_values(&[&channel_id, user]) + .set(channel.shares_submitted as f64); + } + if let Some(ref m) = metrics.sv2_server_shares_rejected_total { + for (error_code, count) in &channel.shares_rejected_by_reason { + m.with_label_values(&[&channel_id, user, error_code]) + .set(*count as f64); + current_server_rejection_labels.insert([ + channel_id.clone(), + user.clone(), + error_code.clone(), + ]); + } + } if let (Some(ref m), Some(hashrate)) = ( &metrics.sv2_server_channel_hashrate, channel.nominal_hashrate, @@ -318,6 +346,10 @@ impl SnapshotCache { m.with_label_values(&[&client_id, &channel_id, user]) .set(channel.shares_accepted as f64); } + if let Some(ref m) = metrics.sv2_client_shares_rejected_total { + m.with_label_values(&[&client_id, 
&channel_id, user]) + .set(channel.shares_rejected as f64); + } if let Some(ref m) = metrics.sv2_client_channel_hashrate { m.with_label_values(&[&client_id, &channel_id, user]) .set(channel.nominal_hashrate as f64); @@ -335,6 +367,10 @@ impl SnapshotCache { m.with_label_values(&[&client_id, &channel_id, user]) .set(channel.shares_accepted as f64); } + if let Some(ref m) = metrics.sv2_client_shares_rejected_total { + m.with_label_values(&[&client_id, &channel_id, user]) + .set(channel.shares_rejected as f64); + } if let Some(ref m) = metrics.sv2_client_channel_hashrate { m.with_label_values(&[&client_id, &channel_id, user]) .set(channel.nominal_hashrate as f64); @@ -360,12 +396,12 @@ impl SnapshotCache { } // Remove stale label combinations that are no longer in the snapshot - let mut previous_metrics_labels = self - .previous_metrics_labels + let mut prev = self + .previous_labels .lock() .unwrap_or_else(|e| e.into_inner()); - for stale in previous_metrics_labels + for stale in prev .server_channel_labels .difference(¤t_server_labels) { @@ -375,6 +411,11 @@ impl SnapshotCache { debug!(labels = ?label_refs, error = %e, "failed to remove stale server shares label"); } } + if let Some(ref m) = metrics.sv2_server_shares_submitted_total { + if let Err(e) = m.remove_label_values(&label_refs) { + debug!(labels = ?label_refs, error = %e, "failed to remove stale server shares submitted label"); + } + } if let Some(ref m) = metrics.sv2_server_channel_hashrate { if let Err(e) = m.remove_label_values(&label_refs) { debug!(labels = ?label_refs, error = %e, "failed to remove stale server hashrate label"); @@ -382,7 +423,19 @@ impl SnapshotCache { } } - for stale in previous_metrics_labels + for stale in prev + .server_rejection_labels + .difference(¤t_server_rejection_labels) + { + let label_refs: Vec<&str> = stale.iter().map(|s| s.as_str()).collect(); + if let Some(ref m) = metrics.sv2_server_shares_rejected_total { + if let Err(e) = m.remove_label_values(&label_refs) { + 
debug!(labels = ?label_refs, error = %e, "failed to remove stale server rejection label"); + } + } + } + + for stale in prev .client_channel_labels .difference(¤t_client_labels) { @@ -392,6 +445,11 @@ impl SnapshotCache { debug!(labels = ?label_refs, error = %e, "failed to remove stale client shares label"); } } + if let Some(ref m) = metrics.sv2_client_shares_rejected_total { + if let Err(e) = m.remove_label_values(&label_refs) { + debug!(labels = ?label_refs, error = %e, "failed to remove stale client shares rejected label"); + } + } if let Some(ref m) = metrics.sv2_client_channel_hashrate { if let Err(e) = m.remove_label_values(&label_refs) { debug!(labels = ?label_refs, error = %e, "failed to remove stale client hashrate label"); @@ -399,8 +457,9 @@ impl SnapshotCache { } } - previous_metrics_labels.server_channel_labels = current_server_labels; - previous_metrics_labels.client_channel_labels = current_client_labels; + prev.server_channel_labels = current_server_labels; + prev.server_rejection_labels = current_server_rejection_labels; + prev.client_channel_labels = current_client_labels; } /// Get the refresh interval From aa3031614e3a0947f15e7b0dc5b2ff9ab63042f7 Mon Sep 17 00:00:00 2001 From: Eric Price Date: Sat, 2 May 2026 20:42:25 -0400 Subject: [PATCH 2/2] feat(monitoring): expose share rejection metrics on Prometheus surface Add Prometheus gauges for share submission and rejection data that was previously only available through the JSON monitoring API: Server metrics: - sv2_server_shares_submitted_total{channel_id, user_identity} - sv2_server_shares_rejected_total{channel_id, user_identity, error_code} Client metrics: - sv2_client_shares_rejected_total{client_id, channel_id, user_identity} These enable time-series alerting on rejection rates and per-reason breakdown (stale, duplicate-share, etc.) via rate() queries and recording rules. 
Implementation: - Register new GaugeVecs in PrometheusMetrics - Populate from existing shares_rejected_by_reason HashMap (server) and shares_rejected u32 (client) in SnapshotCache::update_metrics - Track server rejection label triples in PreviousLabelSets for stale series cleanup - Add shares_rejected field to client ExtendedChannelInfo and StandardChannelInfo (defaults to 0 until stratum#2119 adds rejection tracking to server-side ShareAccounting) --- stratum-apps/src/monitoring/README.md | 7 +++++-- stratum-apps/src/monitoring/http_server.rs | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/stratum-apps/src/monitoring/README.md b/stratum-apps/src/monitoring/README.md index a2a45c7d7..2a69b9b9a 100644 --- a/stratum-apps/src/monitoring/README.md +++ b/stratum-apps/src/monitoring/README.md @@ -73,7 +73,9 @@ tokio::spawn(async move { - `sv2_server_channels{channel_type}` - Server channels by type (extended/standard) - `sv2_server_hashrate_total` - Total server hashrate - `sv2_server_channel_hashrate{channel_id, user_identity}` - Per-channel hashrate -- `sv2_server_shares_accepted_total{channel_id, user_identity}` - Per-channel shares +- `sv2_server_shares_accepted_total{channel_id, user_identity}` - Per-channel accepted shares +- `sv2_server_shares_submitted_total{channel_id, user_identity}` - Per-channel submitted (validated) shares +- `sv2_server_shares_rejected_total{channel_id, user_identity, error_code}` - Per-channel rejected shares by error code - `sv2_server_blocks_found_total` - Total blocks found across all current server channels **Clients:** @@ -81,7 +83,8 @@ tokio::spawn(async move { - `sv2_client_channels{channel_type}` - Client channels by type (extended/standard) - `sv2_client_hashrate_total` - Total client hashrate - `sv2_client_channel_hashrate{client_id, channel_id, user_identity}` - Per-channel hashrate -- 
`sv2_client_shares_accepted_total{client_id, channel_id, user_identity}` - Per-channel accepted shares +- `sv2_client_shares_rejected_total{client_id, channel_id, user_identity}` - Per-channel rejected shares - `sv2_client_blocks_found_total` - Total blocks found across all current client channels **Sv1 (Translator Proxy only):** diff --git a/stratum-apps/src/monitoring/http_server.rs b/stratum-apps/src/monitoring/http_server.rs index 5f5a416e0..8692c8694 100644 --- a/stratum-apps/src/monitoring/http_server.rs +++ b/stratum-apps/src/monitoring/http_server.rs @@ -855,10 +855,10 @@ mod tests { rollable_extranonce_size: 4, version_rolling: true, shares_acknowledged: 10, + shares_submitted: 12, shares_rejected: 0, shares_rejected_by_reason: HashMap::new(), share_work_sum: 100.0, - shares_submitted: 12, best_diff: 50.0, blocks_found: 0, }