Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 143 additions & 25 deletions aws-sdk-s3-transfer-manager/src/metrics/aggregators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::{
};

use crate::metrics::{
instruments::{Gauge, Histogram, IncreasingCounter},
instruments::{Gauge, Histogram, IncreasingCounter, UpDownCounter},
unit,
};

Expand Down Expand Up @@ -157,6 +157,7 @@ impl TransferMetrics {
self.parts_failed.increment(amount);
}

/// Record that a transfer has failed
pub(crate) fn mark_transfer_failed(&self) {
self.transfer_failed.store(true, Ordering::Relaxed);
}
Expand All @@ -165,46 +166,135 @@ impl TransferMetrics {
/// Scheduler-level metrics for concurrency tracking
#[derive(Debug, Clone, Default)]
pub(crate) struct SchedulerMetrics {
/// Number of active permits in use
active_permits: Gauge,
/// Number of permits waiting to be acquired
permits_waiting: Gauge,
/// Total permits acquired
permits_acquired: IncreasingCounter,
/// Permit acquisition wait time
// Total tasks in flight
inflight: UpDownCounter,
// High water mark for inflight tasks
max_inflight: IncreasingCounter,
// Count of failed permit acquisitions
permit_acquisition_failures: IncreasingCounter,
// Tracking the wait time of permits
permit_wait_time: Histogram,
/// Current number of in-flight requests (for backward compatibility)
inflight: Arc<AtomicU64>,

// percentage of max concurrency in use
scheduler_saturation: Gauge,
}

impl SchedulerMetrics {
/// Create new scheduler metrics
pub(crate) fn new() -> Self {
Self {
active_permits: Gauge::new(),
permits_waiting: Gauge::new(),
permits_acquired: IncreasingCounter::new(),
inflight: UpDownCounter::new(),
max_inflight: IncreasingCounter::new(),
permit_acquisition_failures: IncreasingCounter::new(),
permit_wait_time: Histogram::new(),
inflight: Arc::new(AtomicU64::new(0)),
scheduler_saturation: Gauge::new(),
}
}

/// Increment the number of in-flight requests and returns the number currently in-flight after
/// incrementing.
pub(crate) fn increment_inflight(&self) -> usize {
(self.inflight.fetch_add(1, Ordering::Relaxed) + 1) as usize
pub(crate) fn increment_inflight(&self) -> u64 {
let val = self.inflight.increment(1);
self.update_max_inflight();
val
}

/// Decrement the number of in-flight requests and returns the number currently in-flight after
/// decrementing.
pub(crate) fn decrement_inflight(&self) -> usize {
(self.inflight.fetch_sub(1, Ordering::Relaxed) - 1) as usize
pub(crate) fn decrement_inflight(&self) -> u64 {
self.inflight.decrement(1)
}

/// Update max inflight metric
fn update_max_inflight(&self) {
let current_max = self.max_inflight.value();
let current_inflight = self.inflight();
if current_inflight > current_max {
self.max_inflight.increment(current_inflight - current_max);
}
}

/// Record permit acquisition failure
pub(crate) fn record_permit_acquisition_failure(&self) {
self.permit_acquisition_failures.increment(1);
}

/// Record permit wait time
pub(crate) fn record_permit_wait_time(&self, duration_secs: f64) {
self.permit_wait_time.record(duration_secs);
}

/// Set scheduler saturation percentage
pub(crate) fn set_scheduler_saturation(&self, inflight: u64, max_capacity: u64) {
let saturation = if max_capacity == 0 {
0.0
} else {
(inflight as f64 / max_capacity as f64) * 100.0
};
self.scheduler_saturation.set(saturation);
}

/// Get the current number of in-flight requests
#[cfg(test)]
pub(crate) fn inflight(&self) -> usize {
self.inflight.load(Ordering::Relaxed) as usize
pub(crate) fn inflight(&self) -> u64 {
self.inflight.value()
}

/// Get permit wait time histogram
pub(crate) fn permit_wait_time(&self) -> &Histogram {
&self.permit_wait_time
}

/// Get permit acquisition failures counter
pub(crate) fn permit_acquisition_failures(&self) -> &IncreasingCounter {
&self.permit_acquisition_failures
}

/// Get max inflight gauge
pub(crate) fn max_inflight(&self) -> &IncreasingCounter {
&self.max_inflight
}

/// Get scheduler saturation gauge
pub(crate) fn scheduler_saturation(&self) -> &Gauge {
&self.scheduler_saturation
}
}

/// Metrics for token-bucket based throughput/permit limiting.
#[derive(Debug, Default, Clone)]
pub(crate) struct TokenBucketMetrics {
    /// Configured maximum token capacity of the bucket.
    max_tokens: u64,
    /// Tokens currently available (starts at `max_tokens`).
    available_tokens: UpDownCounter,
    /// Time spent waiting for a token, in seconds.
    token_wait_time: Histogram,
}

impl TokenBucketMetrics {
    /// Create new `TokenBucketMetrics` with the given maximum token capacity.
    /// The available-token counter starts full, at `max_tokens`.
    pub(crate) fn new(max_tokens: u64) -> Self {
        let available_tokens = UpDownCounter::new_value(max_tokens);
        let token_wait_time = Histogram::new();
        Self {
            max_tokens,
            available_tokens,
            token_wait_time,
        }
    }

    /// Record that one token was acquired from the bucket.
    pub(crate) fn record_token_acquisition(&self) {
        self.available_tokens.decrement(1);
    }

    /// Record that one token was returned to the bucket.
    pub(crate) fn record_token_release(&self) {
        self.available_tokens.increment(1);
    }

    /// Record how long a caller waited for a token, in seconds.
    pub(crate) fn record_token_wait_time(&self, wait_time_secs: f64) {
        self.token_wait_time.record(wait_time_secs);
    }

    /// Get the available tokens counter.
    pub(crate) fn available_tokens(&self) -> &UpDownCounter {
        &self.available_tokens
    }
}

Expand All @@ -214,7 +304,7 @@ pub(crate) struct SamplingConfig {
/// Interval between samples.
pub interval: Duration,
/// Maximum number of samples to retain.
pub max_samples: usize,
pub max_samples: u64,
}

impl Default for SamplingConfig {
Expand Down Expand Up @@ -247,7 +337,7 @@ struct ThroughputHistory {
pub(crate) samples: VecDeque<ThroughputSample>,
last_sample_time: Option<std::time::Instant>,
sample_interval: Duration,
max_samples: usize,
max_samples: u64,
}

#[derive(Debug, Clone)]
Expand All @@ -267,7 +357,7 @@ impl ThroughputMetrics {
pub(crate) fn with_sampling(sampling_config: Option<SamplingConfig>) -> Self {
let history = sampling_config.map(|config| {
Arc::new(Mutex::new(ThroughputHistory {
samples: VecDeque::with_capacity(config.max_samples),
samples: VecDeque::with_capacity(config.max_samples as usize),
last_sample_time: None,
sample_interval: config.interval,
max_samples: config.max_samples,
Expand Down Expand Up @@ -351,7 +441,7 @@ impl ThroughputMetrics {
bytes_per_second: bps,
});

if hist.samples.len() > hist.max_samples {
if hist.samples.len() > hist.max_samples as usize {
hist.samples.pop_front();
}

Expand Down Expand Up @@ -695,4 +785,32 @@ mod tests {
2
);
}

#[test]
fn test_scheduler_metrics() {
    use crate::metrics::aggregators::SchedulerMetrics;
    let metrics = SchedulerMetrics::new();

    // Failure counter starts at zero and counts each recorded failure.
    assert_eq!(metrics.permit_acquisition_failures().value(), 0);
    metrics.record_permit_acquisition_failure();
    assert_eq!(metrics.permit_acquisition_failures().value(), 1);

    // Wait-time histogram registers recorded samples.
    metrics.record_permit_wait_time(0.5);
    assert!(metrics.permit_wait_time().count() > 0);

    // Drive inflight up to 10; the high-water mark should follow.
    // (`_` instead of `i`: the index was unused and triggered a warning.)
    for _ in 0..10 {
        metrics.increment_inflight();
    }
    assert_eq!(metrics.max_inflight().value(), 10);
    // Dropping below the peak must not lower the high-water mark.
    metrics.decrement_inflight();
    assert_eq!(metrics.max_inflight().value(), 10);
    // Exceeding the previous peak (9 -> 11) raises it.
    metrics.increment_inflight();
    metrics.increment_inflight();
    assert_eq!(metrics.max_inflight().value(), 11);

    // 10 of 50 permits in use => 20% saturation.
    metrics.set_scheduler_saturation(10, 50);
    assert_eq!(metrics.scheduler_saturation().value(), 20.0);
}
}
41 changes: 41 additions & 0 deletions aws-sdk-s3-transfer-manager/src/metrics/instruments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,47 @@ impl IncreasingCounter {
}
}

/// An integer value that can increase or decrease.
///
/// Cloning shares the underlying atomic: all clones observe and mutate the
/// same value.
#[derive(Debug, Clone, Default)]
pub struct UpDownCounter {
    value: Arc<AtomicU64>,
}

impl UpDownCounter {
    /// Create a new counter starting at zero.
    pub fn new() -> Self {
        Self {
            value: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Create a new counter starting at `value`.
    pub fn new_value(value: u64) -> Self {
        Self {
            value: Arc::new(AtomicU64::new(value)),
        }
    }

    /// Increment the counter by the given amount and return the new value.
    pub fn increment(&self, amount: u64) -> u64 {
        // `fetch_add` returns the previous value; add `amount` to report the
        // post-increment value.
        self.value.fetch_add(amount, Ordering::Relaxed) + amount
    }

    /// Decrement the counter by the given amount and return the new value. Clamped at 0.
    pub fn decrement(&self, amount: u64) -> u64 {
        // BUG FIX: `fetch_update` returns the PREVIOUS value on `Ok`, so the
        // original returned the pre-decrement value despite documenting "the
        // new value". Recompute the clamped result from the previous value.
        let previous = self
            .value
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                Some(current.saturating_sub(amount))
            })
            .expect("closure always returns Some, so fetch_update cannot fail");
        previous.saturating_sub(amount)
    }

    /// Get the current value of the counter.
    pub fn value(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }
}

// Implementation note: there is no AtomicF64 so the bytes are stored as an AtomicU64 and
// reinterpreted as an f64 at user exposed endpoints. Just wrapping it in a mutex might
// be more performant, but need to benchmark.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ where
let req = this.request.take().expect("request set");
let inflight = this.scheduler.metrics.increment_inflight();
tracing::trace!("in-flight requests: {inflight}");

this.scheduler
.metrics
.set_scheduler_saturation(inflight, this.scheduler.max_tokens());

let svc = this.svc.take().expect("service set");
// NOTE: because the service was (1) never polled for readiness
// originally and (2) also cloned, we need to ensure it's ready now before calling it.
Expand Down
Loading
Loading