-
Notifications
You must be signed in to change notification settings - Fork 4
Add Scheduler and TokenBucket metrics #125
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feature/metrics
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,7 +13,7 @@ use std::{ | |
| }; | ||
|
|
||
| use crate::metrics::{ | ||
| instruments::{Gauge, Histogram, IncreasingCounter}, | ||
| instruments::{Gauge, Histogram, IncreasingCounter, UpDownCounter}, | ||
| unit, | ||
| }; | ||
|
|
||
|
|
@@ -157,6 +157,7 @@ impl TransferMetrics { | |
| self.parts_failed.increment(amount); | ||
| } | ||
|
|
||
| /// Record that a transfer has failed | ||
| pub(crate) fn mark_transfer_failed(&self) { | ||
| self.transfer_failed.store(true, Ordering::Relaxed); | ||
| } | ||
|
|
@@ -165,46 +166,135 @@ impl TransferMetrics { | |
| /// Scheduler-level metrics for concurrency tracking | ||
| #[derive(Debug, Clone, Default)] | ||
| pub(crate) struct SchedulerMetrics { | ||
| /// Number of active permits in use | ||
| active_permits: Gauge, | ||
| /// Number of permits waiting to be acquired | ||
| permits_waiting: Gauge, | ||
| /// Total permits acquired | ||
| permits_acquired: IncreasingCounter, | ||
| /// Permit acquisition wait time | ||
| // Total tasks in flight | ||
| inflight: UpDownCounter, | ||
| // High water mark for inflight tasks | ||
| max_inflight: IncreasingCounter, | ||
| // Count of failed permit acquisitions | ||
| permit_acquisition_failures: IncreasingCounter, | ||
| // Tracking the wait time of permits | ||
| permit_wait_time: Histogram, | ||
| /// Current number of in-flight requests (for backward compatibility) | ||
| inflight: Arc<AtomicU64>, | ||
|
|
||
| // percentage of max concurrency in use | ||
| scheduler_saturation: Gauge, | ||
| } | ||
|
|
||
| impl SchedulerMetrics { | ||
| /// Create new scheduler metrics | ||
| pub(crate) fn new() -> Self { | ||
| Self { | ||
| active_permits: Gauge::new(), | ||
| permits_waiting: Gauge::new(), | ||
| permits_acquired: IncreasingCounter::new(), | ||
| inflight: UpDownCounter::new(), | ||
| max_inflight: IncreasingCounter::new(), | ||
| permit_acquisition_failures: IncreasingCounter::new(), | ||
| permit_wait_time: Histogram::new(), | ||
| inflight: Arc::new(AtomicU64::new(0)), | ||
| scheduler_saturation: Gauge::new(), | ||
| } | ||
| } | ||
|
|
||
| /// Increment the number of in-flight requests and returns the number currently in-flight after | ||
| /// incrementing. | ||
| pub(crate) fn increment_inflight(&self) -> usize { | ||
| (self.inflight.fetch_add(1, Ordering::Relaxed) + 1) as usize | ||
| pub(crate) fn increment_inflight(&self) -> u64 { | ||
| let val = self.inflight.increment(1); | ||
| self.update_max_inflight(); | ||
| val | ||
| } | ||
|
|
||
| /// Decrement the number of in-flight requests and returns the number currently in-flight after | ||
| /// decrementing. | ||
| pub(crate) fn decrement_inflight(&self) -> usize { | ||
| (self.inflight.fetch_sub(1, Ordering::Relaxed) - 1) as usize | ||
| pub(crate) fn decrement_inflight(&self) -> u64 { | ||
| self.inflight.decrement(1) | ||
| } | ||
|
|
||
| /// Update max inflight metric | ||
| fn update_max_inflight(&self) { | ||
| let current_max = self.max_inflight.value(); | ||
| let current_inflight = self.inflight(); | ||
| if current_inflight > current_max { | ||
| self.max_inflight.increment(current_inflight - current_max); | ||
| } | ||
| } | ||
|
|
||
| /// Record permit acquisition failure | ||
| pub(crate) fn record_permit_acquisition_failure(&self) { | ||
| self.permit_acquisition_failures.increment(1); | ||
| } | ||
|
|
||
| /// Record permit wait time | ||
| pub(crate) fn record_permit_wait_time(&self, duration_secs: f64) { | ||
| self.permit_wait_time.record(duration_secs); | ||
| } | ||
|
|
||
| /// Set scheduler saturation percentage | ||
| pub(crate) fn set_scheduler_saturation(&self, inflight: u64, max_capacity: u64) { | ||
| let saturation = if max_capacity == 0 { | ||
| 0.0 | ||
| } else { | ||
| (inflight as f64 / max_capacity as f64) * 100.0 | ||
| }; | ||
| self.scheduler_saturation.set(saturation); | ||
| } | ||
|
|
||
| /// Get the current number of in-flight requests | ||
| #[cfg(test)] | ||
| pub(crate) fn inflight(&self) -> usize { | ||
| self.inflight.load(Ordering::Relaxed) as usize | ||
| pub(crate) fn inflight(&self) -> u64 { | ||
| self.inflight.value() | ||
| } | ||
|
|
||
| /// Get permit wait time histogram | ||
| pub(crate) fn permit_wait_time(&self) -> &Histogram { | ||
| &self.permit_wait_time | ||
| } | ||
|
|
||
| /// Get permit acquisition failures counter | ||
| pub(crate) fn permit_acquisition_failures(&self) -> &IncreasingCounter { | ||
| &self.permit_acquisition_failures | ||
| } | ||
|
|
||
| /// Get max inflight gauge | ||
| pub(crate) fn max_inflight(&self) -> &IncreasingCounter { | ||
| &self.max_inflight | ||
| } | ||
|
|
||
| /// Get scheduler saturation gauge | ||
| pub(crate) fn scheduler_saturation(&self) -> &Gauge { | ||
| &self.scheduler_saturation | ||
| } | ||
| } | ||
|
|
||
| #[derive(Debug, Default, Clone)] | ||
| pub(crate) struct TokenBucketMetrics { | ||
| max_tokens: u64, | ||
| available_tokens: UpDownCounter, | ||
| token_wait_time: Histogram, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm not sure whether this will be meaningful, given that permit requests come with different token amounts.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I wonder whether tracking the percentage of available tokens over time, or something similar, would be more useful? |
||
| } | ||
|
|
||
| impl TokenBucketMetrics { | ||
| // Create new TokenBucketMetrics with max_tokens | ||
| pub(crate) fn new(max_tokens: u64) -> Self { | ||
| Self { | ||
| max_tokens, | ||
| available_tokens: UpDownCounter::new_value(max_tokens), | ||
| token_wait_time: Histogram::new(), | ||
| } | ||
| } | ||
|
|
||
| /// Get available tokens gauge | ||
| pub(crate) fn available_tokens(&self) -> &UpDownCounter { | ||
| &self.available_tokens | ||
| } | ||
|
|
||
| /// Record token acquisition | ||
| pub(crate) fn record_token_acquisition(&self) { | ||
| self.available_tokens.decrement(1); | ||
| } | ||
|
|
||
| /// Record token release | ||
| pub(crate) fn record_token_release(&self) { | ||
| self.available_tokens.increment(1); | ||
| } | ||
|
|
||
| /// Record token wait time | ||
| pub(crate) fn record_token_wait_time(&self, wait_time_secs: f64) { | ||
| self.token_wait_time.record(wait_time_secs); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -214,7 +304,7 @@ pub(crate) struct SamplingConfig { | |
| /// Interval between samples. | ||
| pub interval: Duration, | ||
| /// Maximum number of samples to retain. | ||
| pub max_samples: usize, | ||
| pub max_samples: u64, | ||
| } | ||
|
|
||
| impl Default for SamplingConfig { | ||
|
|
@@ -247,7 +337,7 @@ struct ThroughputHistory { | |
| pub(crate) samples: VecDeque<ThroughputSample>, | ||
| last_sample_time: Option<std::time::Instant>, | ||
| sample_interval: Duration, | ||
| max_samples: usize, | ||
| max_samples: u64, | ||
| } | ||
|
|
||
| #[derive(Debug, Clone)] | ||
|
|
@@ -267,7 +357,7 @@ impl ThroughputMetrics { | |
| pub(crate) fn with_sampling(sampling_config: Option<SamplingConfig>) -> Self { | ||
| let history = sampling_config.map(|config| { | ||
| Arc::new(Mutex::new(ThroughputHistory { | ||
| samples: VecDeque::with_capacity(config.max_samples), | ||
| samples: VecDeque::with_capacity(config.max_samples as usize), | ||
| last_sample_time: None, | ||
| sample_interval: config.interval, | ||
| max_samples: config.max_samples, | ||
|
|
@@ -351,7 +441,7 @@ impl ThroughputMetrics { | |
| bytes_per_second: bps, | ||
| }); | ||
|
|
||
| if hist.samples.len() > hist.max_samples { | ||
| if hist.samples.len() > hist.max_samples as usize { | ||
| hist.samples.pop_front(); | ||
| } | ||
|
|
||
|
|
@@ -695,4 +785,32 @@ mod tests { | |
| 2 | ||
| ); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_scheduler_metrics() { | ||
| use crate::metrics::aggregators::SchedulerMetrics; | ||
| let metrics = SchedulerMetrics::new(); | ||
|
|
||
| assert_eq!(metrics.permit_acquisition_failures().value(), 0); | ||
| metrics.record_permit_acquisition_failure(); | ||
| assert_eq!(metrics.permit_acquisition_failures().value(), 1); | ||
|
|
||
| metrics.record_permit_wait_time(0.5); | ||
| assert!(metrics.permit_wait_time().count() > 0); | ||
|
|
||
| for i in 0..10 { | ||
| metrics.increment_inflight(); | ||
| } | ||
| assert_eq!(metrics.max_inflight().value(), 10); | ||
| // Should not update since it is lower | ||
| metrics.decrement_inflight(); | ||
| assert_eq!(metrics.max_inflight().value(), 10); | ||
| // Should update since it is higher | ||
| metrics.increment_inflight(); | ||
| metrics.increment_inflight(); | ||
| assert_eq!(metrics.max_inflight().value(), 11); | ||
|
|
||
| metrics.set_scheduler_saturation(10, 50); | ||
| assert_eq!(metrics.scheduler_saturation().value(), 20.0); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure we need this at this stage