diff --git a/aws-sdk-s3-transfer-manager/src/metrics/aggregators.rs b/aws-sdk-s3-transfer-manager/src/metrics/aggregators.rs
index 5046997c..79306527 100644
--- a/aws-sdk-s3-transfer-manager/src/metrics/aggregators.rs
+++ b/aws-sdk-s3-transfer-manager/src/metrics/aggregators.rs
@@ -13,7 +13,7 @@ use std::{
 };
 
 use crate::metrics::{
-    instruments::{Gauge, Histogram, IncreasingCounter},
+    instruments::{Gauge, Histogram, IncreasingCounter, UpDownCounter},
     unit,
 };
 
@@ -157,6 +157,7 @@ impl TransferMetrics {
         self.parts_failed.increment(amount);
     }
 
+    /// Record that a transfer has failed
    pub(crate) fn mark_transfer_failed(&self) {
         self.transfer_failed.store(true, Ordering::Relaxed);
     }
@@ -165,46 +166,135 @@ impl TransferMetrics {
 /// Scheduler-level metrics for concurrency tracking
 #[derive(Debug, Clone, Default)]
 pub(crate) struct SchedulerMetrics {
-    /// Number of active permits in use
-    active_permits: Gauge,
-    /// Number of permits waiting to be acquired
-    permits_waiting: Gauge,
-    /// Total permits acquired
-    permits_acquired: IncreasingCounter,
-    /// Permit acquisition wait time
+    /// Total tasks in flight
+    inflight: UpDownCounter,
+    /// High-water mark for in-flight tasks
+    max_inflight: IncreasingCounter,
+    /// Count of failed permit acquisitions
+    permit_acquisition_failures: IncreasingCounter,
+    /// Permit acquisition wait time
     permit_wait_time: Histogram,
-    /// Current number of in-flight requests (for backward compatibility)
-    inflight: Arc<AtomicU64>,
+
+    /// Percentage of max concurrency in use
+    scheduler_saturation: Gauge,
 }
 
 impl SchedulerMetrics {
     /// Create new scheduler metrics
     pub(crate) fn new() -> Self {
         Self {
-            active_permits: Gauge::new(),
-            permits_waiting: Gauge::new(),
-            permits_acquired: IncreasingCounter::new(),
+            inflight: UpDownCounter::new(),
+            max_inflight: IncreasingCounter::new(),
+            permit_acquisition_failures: IncreasingCounter::new(),
             permit_wait_time: Histogram::new(),
-            inflight: Arc::new(AtomicU64::new(0)),
+            scheduler_saturation: Gauge::new(),
         }
     }
 
     /// Increment the number of in-flight requests and return the number in flight after
     /// incrementing.
-    pub(crate) fn increment_inflight(&self) -> usize {
-        (self.inflight.fetch_add(1, Ordering::Relaxed) + 1) as usize
+    pub(crate) fn increment_inflight(&self) -> u64 {
+        let val = self.inflight.increment(1);
+        self.update_max_inflight();
+        val
     }
 
     /// Decrement the number of in-flight requests and return the number in flight after
     /// decrementing.
-    pub(crate) fn decrement_inflight(&self) -> usize {
-        (self.inflight.fetch_sub(1, Ordering::Relaxed) - 1) as usize
+    pub(crate) fn decrement_inflight(&self) -> u64 {
+        self.inflight.decrement(1)
+    }
+
+    /// Update the max in-flight high-water mark
+    fn update_max_inflight(&self) {
+        let current_max = self.max_inflight.value();
+        let current_inflight = self.inflight();
+        if current_inflight > current_max {
+            self.max_inflight.increment(current_inflight - current_max);
+        }
+    }
+
+    /// Record a permit acquisition failure
+    pub(crate) fn record_permit_acquisition_failure(&self) {
+        self.permit_acquisition_failures.increment(1);
+    }
+
+    /// Record permit wait time
+    pub(crate) fn record_permit_wait_time(&self, duration_secs: f64) {
+        self.permit_wait_time.record(duration_secs);
+    }
+
+    /// Set scheduler saturation as a percentage of max capacity
+    pub(crate) fn set_scheduler_saturation(&self, inflight: u64, max_capacity: u64) {
+        let saturation = if max_capacity == 0 {
+            0.0
+        } else {
+            (inflight as f64 / max_capacity as f64) * 100.0
+        };
+        self.scheduler_saturation.set(saturation);
     }
 
     /// Get the current number of in-flight requests
-    #[cfg(test)]
-    pub(crate) fn inflight(&self) -> usize {
-        self.inflight.load(Ordering::Relaxed) as usize
+    pub(crate) fn inflight(&self) -> u64 {
+        self.inflight.value()
+    }
+
+    /// Get the permit wait time histogram
+    pub(crate) fn permit_wait_time(&self) -> &Histogram {
+        &self.permit_wait_time
+    }
+
+    /// Get the permit acquisition failures counter
+    pub(crate) fn permit_acquisition_failures(&self) -> &IncreasingCounter {
+        &self.permit_acquisition_failures
+    }
+
+    /// Get the max in-flight counter
+    pub(crate) fn max_inflight(&self) -> &IncreasingCounter {
+        &self.max_inflight
+    }
+
+    /// Get the scheduler saturation gauge
+    pub(crate) fn scheduler_saturation(&self) -> &Gauge {
+        &self.scheduler_saturation
+    }
+}
+
+/// Token-bucket metrics for tracking token availability and wait times
+#[derive(Debug, Default, Clone)]
+pub(crate) struct TokenBucketMetrics {
+    max_tokens: u64,
+    available_tokens: UpDownCounter,
+    token_wait_time: Histogram,
+}
+
+impl TokenBucketMetrics {
+    /// Create new TokenBucketMetrics with the given max_tokens
+    pub(crate) fn new(max_tokens: u64) -> Self {
+        Self {
+            max_tokens,
+            available_tokens: UpDownCounter::new_value(max_tokens),
+            token_wait_time: Histogram::new(),
+        }
+    }
+
+    /// Get the available-tokens counter
+    pub(crate) fn available_tokens(&self) -> &UpDownCounter {
+        &self.available_tokens
+    }
+
+    /// Record a token acquisition
+    pub(crate) fn record_token_acquisition(&self) {
+        self.available_tokens.decrement(1);
+    }
+
+    /// Record a token release
+    pub(crate) fn record_token_release(&self) {
+        self.available_tokens.increment(1);
+    }
+
+    /// Record token wait time
+    pub(crate) fn record_token_wait_time(&self, wait_time_secs: f64) {
+        self.token_wait_time.record(wait_time_secs);
+    }
+}
 
@@ -214,7 +304,7 @@ pub(crate) struct SamplingConfig {
     /// Interval between samples.
     pub interval: Duration,
     /// Maximum number of samples to retain.
-    pub max_samples: usize,
+    pub max_samples: u64,
 }
 
 impl Default for SamplingConfig {
@@ -247,7 +337,7 @@ struct ThroughputHistory {
     pub(crate) samples: VecDeque<ThroughputSample>,
     last_sample_time: Option<Instant>,
     sample_interval: Duration,
-    max_samples: usize,
+    max_samples: u64,
 }
 
 #[derive(Debug, Clone)]
@@ -267,7 +357,7 @@ impl ThroughputMetrics {
     pub(crate) fn with_sampling(sampling_config: Option<SamplingConfig>) -> Self {
         let history = sampling_config.map(|config| {
             Arc::new(Mutex::new(ThroughputHistory {
-                samples: VecDeque::with_capacity(config.max_samples),
+                samples: VecDeque::with_capacity(config.max_samples as usize),
                 last_sample_time: None,
                 sample_interval: config.interval,
                 max_samples: config.max_samples,
             }))
@@ -351,7 +441,7 @@ impl ThroughputMetrics {
                 bytes_per_second: bps,
             });
 
-            if hist.samples.len() > hist.max_samples {
+            if hist.samples.len() > hist.max_samples as usize {
                 hist.samples.pop_front();
             }
 
@@ -695,4 +785,32 @@ mod tests {
             2
         );
     }
+
+    #[test]
+    fn test_scheduler_metrics() {
+        use crate::metrics::aggregators::SchedulerMetrics;
+        let metrics = SchedulerMetrics::new();
+
+        assert_eq!(metrics.permit_acquisition_failures().value(), 0);
+        metrics.record_permit_acquisition_failure();
+        assert_eq!(metrics.permit_acquisition_failures().value(), 1);
+
+        metrics.record_permit_wait_time(0.5);
+        assert!(metrics.permit_wait_time().count() > 0);
+
+        for _ in 0..10 {
+            metrics.increment_inflight();
+        }
+        assert_eq!(metrics.max_inflight().value(), 10);
+        // Should not update the high-water mark since in-flight is now lower
+        metrics.decrement_inflight();
+        assert_eq!(metrics.max_inflight().value(), 10);
+        // Should update since in-flight is now higher
+        metrics.increment_inflight();
+        metrics.increment_inflight();
+        assert_eq!(metrics.max_inflight().value(), 11);
+
+        metrics.set_scheduler_saturation(10, 50);
+        assert_eq!(metrics.scheduler_saturation().value(), 20.0);
+    }
 }
diff --git a/aws-sdk-s3-transfer-manager/src/metrics/instruments.rs b/aws-sdk-s3-transfer-manager/src/metrics/instruments.rs
index 38f3da23..1e58047f 100644
--- a/aws-sdk-s3-transfer-manager/src/metrics/instruments.rs
+++ b/aws-sdk-s3-transfer-manager/src/metrics/instruments.rs
@@ -28,6 +28,47 @@ impl IncreasingCounter {
     }
 }
 
+/// An integer value that can increase or decrease.
+#[derive(Debug, Clone, Default)]
+pub struct UpDownCounter {
+    value: Arc<AtomicU64>,
+}
+
+impl UpDownCounter {
+    /// Create a new counter starting at zero.
+    pub fn new() -> Self {
+        Self {
+            value: Arc::new(AtomicU64::new(0)),
+        }
+    }
+
+    /// Create a new counter starting at the given value.
+    pub fn new_value(value: u64) -> Self {
+        Self {
+            value: Arc::new(AtomicU64::new(value)),
+        }
+    }
+
+    /// Increment the counter by the given amount and return the new value.
+    pub fn increment(&self, amount: u64) -> u64 {
+        self.value.fetch_add(amount, Ordering::Relaxed) + amount
+    }
+
+    /// Decrement the counter by the given amount and return the new value. Clamped at 0.
+    pub fn decrement(&self, amount: u64) -> u64 {
+        // fetch_update returns the previous value, so re-apply the saturating
+        // subtraction to report the value after decrementing.
+        let prev = self
+            .value
+            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
+                Some(current.saturating_sub(amount))
+            })
+            .expect("closure always returns Some");
+        prev.saturating_sub(amount)
+    }
+
+    /// Get the current value of the counter.
+    pub fn value(&self) -> u64 {
+        self.value.load(Ordering::Relaxed)
+    }
+}
+
 // Implementation note: there is no AtomicF64 so the bytes are stored as an AtomicU64 and
 // reinterpreted as an f64 at user exposed endpoints. Just wrapping it in a mutex might
 // be more performant, but need to benchmark.
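The clamped decrement above is easy to get wrong because `AtomicU64::fetch_update` hands back the *previous* value, hence the re-applied saturating subtraction. A minimal, self-contained sketch of the intended semantics (illustrative only; the type is re-declared locally rather than imported from the crate):

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

// Local stand-in mirroring the UpDownCounter added above.
struct UpDownCounter {
    value: Arc<AtomicU64>,
}

impl UpDownCounter {
    fn new_value(value: u64) -> Self {
        Self {
            value: Arc::new(AtomicU64::new(value)),
        }
    }

    fn increment(&self, amount: u64) -> u64 {
        self.value.fetch_add(amount, Ordering::Relaxed) + amount
    }

    fn decrement(&self, amount: u64) -> u64 {
        // fetch_update returns the previous value; re-apply the
        // saturating subtraction to report the post-decrement value.
        let prev = self
            .value
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |c| {
                Some(c.saturating_sub(amount))
            })
            .expect("closure always returns Some");
        prev.saturating_sub(amount)
    }
}

fn main() {
    let c = UpDownCounter::new_value(2);
    assert_eq!(c.increment(3), 5); // 2 + 3
    assert_eq!(c.decrement(4), 1); // 5 - 4
    assert_eq!(c.decrement(4), 0); // clamped at zero, no wrap-around
}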
diff --git a/aws-sdk-s3-transfer-manager/src/middleware/limit/concurrency/future.rs b/aws-sdk-s3-transfer-manager/src/middleware/limit/concurrency/future.rs
index 677bf07a..a3f21aa3 100644
--- a/aws-sdk-s3-transfer-manager/src/middleware/limit/concurrency/future.rs
+++ b/aws-sdk-s3-transfer-manager/src/middleware/limit/concurrency/future.rs
@@ -83,6 +83,11 @@ where
             let req = this.request.take().expect("request set");
             let inflight = this.scheduler.metrics.increment_inflight();
             tracing::trace!("in-flight requests: {inflight}");
+
+            this.scheduler
+                .metrics
+                .set_scheduler_saturation(inflight, this.scheduler.max_tokens());
+
             let svc = this.svc.take().expect("service set");
             // NOTE: because the service was (1) never polled for readiness
             // originally and (2) also cloned, we need to ensure it's ready now before calling it.
diff --git a/aws-sdk-s3-transfer-manager/src/runtime/scheduler.rs b/aws-sdk-s3-transfer-manager/src/runtime/scheduler.rs
index 913cc942..d02c0a75 100644
--- a/aws-sdk-s3-transfer-manager/src/runtime/scheduler.rs
+++ b/aws-sdk-s3-transfer-manager/src/runtime/scheduler.rs
@@ -6,6 +6,8 @@ use futures_util::TryFutureExt;
 use pin_project_lite::pin_project;
 
 use std::future::Future;
+use std::sync::Arc;
+use std::time::Instant;
 
 use crate::error;
 use crate::metrics::aggregators::SchedulerMetrics;
@@ -21,30 +23,49 @@ use crate::types::ConcurrencyMode;
 #[derive(Debug, Clone)]
 pub(crate) struct Scheduler {
     token_bucket: TokenBucket,
-    pub(crate) metrics: SchedulerMetrics,
+    pub(crate) metrics: Arc<SchedulerMetrics>,
 }
 
 impl Scheduler {
     /// Create a new scheduler with the initial number of work permits.
     pub(crate) fn new(mode: ConcurrencyMode) -> Self {
+        let metrics = Arc::new(SchedulerMetrics::new());
+        let token_bucket = TokenBucket::new(mode, metrics.clone());
+
         Self {
-            token_bucket: TokenBucket::new(mode),
-            metrics: SchedulerMetrics::new(),
+            token_bucket,
+            metrics,
         }
     }
 
+    /// Get the maximum number of tokens this scheduler can hold
+    pub(crate) fn max_tokens(&self) -> u64 {
+        self.token_bucket.max_tokens()
+    }
+
     /// Acquire a permit to perform some unit of work
     pub(crate) fn acquire_permit(&self, ptype: PermitType) -> AcquirePermitFuture {
+        let start_time = Instant::now();
+
         match self.try_acquire_permit(ptype.clone()) {
-            Ok(Some(permit)) => AcquirePermitFuture::ready(Ok(permit)),
+            Ok(Some(permit)) => {
+                // Record immediate acquisition
+                self.metrics.record_permit_wait_time(0.0);
+                AcquirePermitFuture::ready(Ok(permit))
+            }
             Ok(None) => {
-                let inner = self
-                    .token_bucket
-                    .acquire(ptype)
-                    .map_ok(OwnedWorkPermit::from);
+                let metrics = self.metrics.clone();
+                let inner = self.token_bucket.acquire(ptype).map_ok(move |token| {
+                    // Record wait time when the permit is acquired
+                    metrics.record_permit_wait_time(start_time.elapsed().as_secs_f64());
+                    OwnedWorkPermit::from(token)
+                });
                 AcquirePermitFuture::new(inner)
             }
-            Err(err) => AcquirePermitFuture::ready(Err(err)),
+            Err(err) => {
+                self.metrics.record_permit_acquisition_failure();
+                AcquirePermitFuture::ready(Err(err))
+            }
         }
     }
@@ -170,4 +191,61 @@ mod tests {
         drop(p1);
         jh.await.unwrap();
     }
+
+    #[tokio::test]
+    async fn test_scheduler_metrics() {
+        let scheduler = Scheduler::new(ConcurrencyMode::Explicit(2));
+        let permit_context = NetworkPermitContext {
+            payload_size_estimate: 1024,
+            bucket_type: BucketType::Standard,
+            direction: TransferDirection::Download,
+        };
+
+        let _permit1 = scheduler
+            .acquire_permit(PermitType::Network(permit_context.clone()))
+            .await
+            .unwrap();
+
+        assert!(scheduler.metrics.permit_wait_time().count() > 0);
+
+        // We never actually fire off a request, so there are no failures or in-flight requests
+        assert_eq!(scheduler.metrics.permit_acquisition_failures().value(), 0);
+        assert_eq!(scheduler.metrics.max_inflight().value(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_token_bucket_metrics() {
+        let scheduler = Scheduler::new(ConcurrencyMode::Explicit(10));
+        let permit_context = NetworkPermitContext {
+            payload_size_estimate: 1024,
+            bucket_type: BucketType::Standard,
+            direction: TransferDirection::Download,
+        };
+
+        // Initial token metrics
+        assert_eq!(
+            scheduler.token_bucket.metrics().available_tokens().value(),
+            10
+        );
+        assert_eq!(scheduler.max_tokens(), 10);
+
+        let permit = scheduler
+            .acquire_permit(PermitType::Network(permit_context))
+            .await
+            .unwrap();
+
+        // Token acquisition should be recorded
+        assert_eq!(
+            scheduler.token_bucket.metrics().available_tokens().value(),
+            9
+        );
+
+        drop(permit);
+
+        // Token drop should be recorded as a release
+        assert_eq!(
+            scheduler.token_bucket.metrics().available_tokens().value(),
+            10
+        );
+    }
 }
diff --git a/aws-sdk-s3-transfer-manager/src/runtime/token_bucket.rs b/aws-sdk-s3-transfer-manager/src/runtime/token_bucket.rs
index c0450363..f82573a4 100644
--- a/aws-sdk-s3-transfer-manager/src/runtime/token_bucket.rs
+++ b/aws-sdk-s3-transfer-manager/src/runtime/token_bucket.rs
@@ -6,12 +6,13 @@ use pin_project_lite::pin_project;
 
 use std::future::Future;
 use std::task::Poll;
+use std::time::Instant;
 use std::{cmp, sync::Arc, time::Duration};
 use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
 use tokio_util::sync::PollSemaphore;
 
 use crate::error;
-use crate::metrics::aggregators::Throughput;
+use crate::metrics::aggregators::{SchedulerMetrics, Throughput, TokenBucketMetrics};
 use crate::metrics::unit::ByteUnit;
 use crate::runtime::scheduler::PermitType;
 use crate::types::BucketType;
@@ -149,32 +150,43 @@ pub(crate) struct TokenBucket {
     // NOTE: tokio semaphore is fair, permits are given out in the order requested
     semaphore: Arc<Semaphore>,
     mode: ConcurrencyMode,
+    tb_metrics: Arc<TokenBucketMetrics>,
+    scheduler_metrics: Arc<SchedulerMetrics>,
 }
 
 impl TokenBucket {
     /// Create a new token bucket using the given target throughput to set the maximum number of tokens
-    pub(crate) fn new(mode: ConcurrencyMode) -> Self {
+    pub(crate) fn new(mode: ConcurrencyMode, scheduler_metrics: Arc<SchedulerMetrics>) -> Self {
+        let max_tokens = Self::max_tokens_inner(&mode);
+        TokenBucket {
+            semaphore: Arc::new(Semaphore::new(max_tokens.try_into().unwrap())),
+            mode,
+            tb_metrics: Arc::new(TokenBucketMetrics::new(max_tokens)),
+            scheduler_metrics,
+        }
+    }
+
+    fn max_tokens_inner(mode: &ConcurrencyMode) -> u64 {
         // Permits/tokens are dependent on the concurrency mode:
         //
         // ConcurrencyMode::TargetThroughput -> 1 token = 1 Mbit of throughput
         // ConcurrencyMode::Explicit -> 1 token = 1 request
-        let max_tokens = match &mode {
+        match mode {
             ConcurrencyMode::Auto => token_bucket_size(AUTO_TARGET_THROUGHPUT),
             ConcurrencyMode::TargetThroughput(target_throughput) => {
                 // TODO - we don't (yet) publicly allow configuring upload/download independently so we
                 // just pick one for now as they must be the same at the moment.
-                let thrpt = target_throughput.download();
-                token_bucket_size(*thrpt)
+                token_bucket_size(*target_throughput.download())
             }
             ConcurrencyMode::Explicit(concurrency) => *concurrency as u64,
-        };
-
-        TokenBucket {
-            semaphore: Arc::new(Semaphore::new(max_tokens.try_into().unwrap())),
-            mode,
         }
     }
 
+    /// Get the maximum number of tokens this bucket can hold
+    pub(crate) fn max_tokens(&self) -> u64 {
+        Self::max_tokens_inner(&self.mode)
+    }
+
     /// Calculate the token cost for the given permit type (and current mode)
     fn cost(&self, ptype: PermitType) -> u32 {
         match self.mode {
@@ -187,7 +199,15 @@ impl TokenBucket {
     /// Acquire a token for the given permit type. Tokens are returned to the bucket when the
     /// [OwnedToken] is dropped.
     pub(crate) fn acquire(&self, ptype: PermitType) -> AcquireTokenFuture {
-        AcquireTokenFuture::new(PollSemaphore::new(self.semaphore.clone()), self.cost(ptype))
+        // NOTE: the acquisition is recorded when the future actually resolves (see
+        // poll below) so that a future dropped while waiting does not skew
+        // available_tokens, which is only restored on OwnedToken drop.
+        AcquireTokenFuture::new(
+            PollSemaphore::new(self.semaphore.clone()),
+            self.cost(ptype),
+            self.tb_metrics.clone(),
+            self.scheduler_metrics.clone(),
+            Instant::now(),
+        )
     }
 
     pub(crate) fn try_acquire(
         &self,
         ptype: PermitType,
     ) -> Result<Option<OwnedToken>, error::Error> {
         let cost = self.cost(ptype);
+
         match self.semaphore.clone().try_acquire_many_owned(cost) {
-            Ok(permit) => Ok(Some(OwnedToken::new(permit))),
+            Ok(permit) => {
+                self.tb_metrics.record_token_acquisition();
+                self.tb_metrics.record_token_wait_time(0.0);
+                Ok(Some(OwnedToken::new(permit, self.tb_metrics.clone())))
+            }
             Err(TryAcquireError::NoPermits) => Ok(None),
             Err(err @ TryAcquireError::Closed) => {
                 Err(error::Error::new(error::ErrorKind::RuntimeError, err))
             }
         }
     }
+
+    /// Get the metrics for this TokenBucket
+    #[allow(dead_code)]
+    pub(crate) fn metrics(&self) -> Arc<TokenBucketMetrics> {
+        self.tb_metrics.clone()
+    }
 }
 
 pin_project! {
     #[derive(Debug, Clone)]
     pub(crate) struct AcquireTokenFuture {
         sem: PollSemaphore,
-        tokens: u32
+        tokens: u32,
+        tb_metrics: Arc<TokenBucketMetrics>,
+        scheduler_metrics: Arc<SchedulerMetrics>,
+        start_time: Instant
     }
 }
 
 impl AcquireTokenFuture {
-    fn new(sem: PollSemaphore, tokens: u32) -> Self {
-        Self { sem, tokens }
+    fn new(
+        sem: PollSemaphore,
+        tokens: u32,
+        tb_metrics: Arc<TokenBucketMetrics>,
+        scheduler_metrics: Arc<SchedulerMetrics>,
+        start_time: Instant,
+    ) -> Self {
+        Self {
+            sem,
+            tokens,
+            tb_metrics,
+            scheduler_metrics,
+            start_time,
+        }
     }
 }
 
@@ -228,7 +274,11 @@ impl Future for AcquireTokenFuture {
     ) -> std::task::Poll<Self::Output> {
         let this = self.project();
         match this.sem.poll_acquire_many(cx, *this.tokens) {
-            Poll::Ready(Some(permit)) => Poll::Ready(Ok(OwnedToken::new(permit))),
+            Poll::Ready(Some(permit)) => {
+                // Record the acquisition and wait time now that the permit is held
+                this.tb_metrics.record_token_acquisition();
+                this.tb_metrics
+                    .record_token_wait_time(this.start_time.elapsed().as_secs_f64());
+                Poll::Ready(Ok(OwnedToken::new(permit, this.tb_metrics.clone())))
+            }
             Poll::Ready(None) => Poll::Ready(Err(error::Error::new(
                 error::ErrorKind::RuntimeError,
                 "semaphore closed",
@@ -244,11 +294,21 @@ impl Future for AcquireTokenFuture {
 #[derive(Debug)]
 pub(crate) struct OwnedToken {
     _inner: OwnedSemaphorePermit,
+    tb_metrics: Arc<TokenBucketMetrics>,
 }
 
 impl OwnedToken {
-    fn new(permit: OwnedSemaphorePermit) -> Self {
-        OwnedToken { _inner: permit }
+    fn new(permit: OwnedSemaphorePermit, tb_metrics: Arc<TokenBucketMetrics>) -> Self {
+        OwnedToken {
+            _inner: permit,
+            tb_metrics,
+        }
+    }
+}
+
+impl Drop for OwnedToken {
+    fn drop(&mut self) {
+        self.tb_metrics.record_token_release();
     }
 }
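The token accounting above relies on RAII: each recorded acquisition is paired with exactly one release in `OwnedToken::drop`, even on early return or panic. A compact sketch of the pattern with hypothetical stand-in types (`Metrics` and `Token` are local illustrations, not the diff's types):

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

// Minimal stand-ins for TokenBucketMetrics and OwnedToken.
struct Metrics {
    available: AtomicU64,
}

struct Token {
    metrics: Arc<Metrics>,
}

impl Drop for Token {
    fn drop(&mut self) {
        // Mirror of OwnedToken::drop: return the token on release.
        self.metrics.available.fetch_add(1, Ordering::Relaxed);
    }
}

fn acquire(metrics: &Arc<Metrics>) -> Token {
    // Mirror of record_token_acquisition at the point the permit is held.
    metrics.available.fetch_sub(1, Ordering::Relaxed);
    Token {
        metrics: Arc::clone(metrics),
    }
}

fn main() {
    let metrics = Arc::new(Metrics {
        available: AtomicU64::new(10),
    });
    let token = acquire(&metrics);
    assert_eq!(metrics.available.load(Ordering::Relaxed), 9);
    drop(token); // release happens automatically when the token goes out of scope
    assert_eq!(metrics.available.load(Ordering::Relaxed), 10);
}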