diff --git a/datafusion-cli/src/object_storage/instrumented.rs b/datafusion-cli/src/object_storage/instrumented.rs index b579257af313..9252e0688c35 100644 --- a/datafusion-cli/src/object_storage/instrumented.rs +++ b/datafusion-cli/src/object_storage/instrumented.rs @@ -16,7 +16,8 @@ // under the License. use std::{ - fmt, + cmp, fmt, + ops::AddAssign, str::FromStr, sync::{ atomic::{AtomicU8, Ordering}, @@ -28,7 +29,7 @@ use std::{ use async_trait::async_trait; use chrono::Utc; use datafusion::{ - common::instant::Instant, + common::{instant::Instant, HashMap}, error::DataFusionError, execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry}, }; @@ -201,8 +202,9 @@ impl ObjectStore for InstrumentedObjectStore { } } +/// Object store operation types tracked by [`InstrumentedObjectStore`] #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] -enum Operation { +pub enum Operation { _Copy, _Delete, Get, @@ -250,6 +252,103 @@ impl fmt::Display for RequestDetails { } } +/// Summary statistics for an [`InstrumentedObjectStore`]'s [`RequestDetails`] +#[derive(Default)] +pub struct RequestSummary { + count: usize, + duration_stats: Option>, + size_stats: Option>, +} + +impl RequestSummary { + /// Generates a set of [RequestSummaries](RequestSummary) from the input [`RequestDetails`] + /// grouped by the input's [`Operation`] + pub fn summarize_by_operation( + requests: &[RequestDetails], + ) -> HashMap { + let mut summaries: HashMap = HashMap::new(); + for rd in requests { + match summaries.get_mut(&rd.op) { + Some(rs) => rs.push(rd), + None => { + let mut rs = RequestSummary::default(); + rs.push(rd); + summaries.insert(rd.op, rs); + } + } + } + + summaries + } + + fn push(&mut self, request: &RequestDetails) { + self.count += 1; + if let Some(dur) = request.duration { + self.duration_stats.get_or_insert_default().push(dur) + } + if let Some(size) = request.size { + self.size_stats.get_or_insert_default().push(size) + } + } +} + +impl fmt::Display for RequestSummary { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!(f, "count: {}", self.count)?; + + if let Some(dur_stats) = &self.duration_stats { + writeln!(f, "duration min: {:.6}s", dur_stats.min.as_secs_f32())?; + writeln!(f, "duration max: {:.6}s", dur_stats.max.as_secs_f32())?; + let avg = dur_stats.sum.as_secs_f32() / (self.count as f32); + writeln!(f, "duration avg: {:.6}s", avg)?; + } + + if let Some(size_stats) = &self.size_stats { + writeln!(f, "size min: {} B", size_stats.min)?; + writeln!(f, "size max: {} B", size_stats.max)?; + let avg = size_stats.sum / self.count; + writeln!(f, "size avg: {} B", avg)?; + writeln!(f, "size sum: {} B", size_stats.sum)?; + } + + Ok(()) + } +} + +struct Stats> { + min: T, + max: T, + sum: T, +} + +impl> Stats { + fn push(&mut self, val: T) { + self.min = cmp::min(val, self.min); + self.max = cmp::max(val, self.max); + self.sum += val; + } +} + +impl Default for Stats { + fn default() -> Self { + Self { + min: Duration::MAX, + max: Duration::ZERO, + sum: Duration::ZERO, + } + } +} + +impl Default for Stats { + fn default() -> Self { + Self { + min: usize::MAX, + max: usize::MIN, + sum: 0, + } + } +} + /// Provides access to [`InstrumentedObjectStore`] instances that record requests for reporting #[derive(Debug)] pub struct InstrumentedObjectStoreRegistry { @@ -420,4 +519,158 @@ mod tests { "1970-01-01T00:00:00+00:00 operation=Get duration=5.000000s size=10 range: bytes=0-9 path=test extra info" ); } + + #[test] + fn request_summary() { + // Test empty request list + let mut requests = Vec::new(); + let summaries = RequestSummary::summarize_by_operation(&requests); + assert!(summaries.is_empty()); + + requests.push(RequestDetails { + op: Operation::Get, + path: Path::from("test1"), + timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(), + duration: Some(Duration::from_secs(5)), + size: Some(100), + range: None, + extra_display: None, + }); + + let summaries = RequestSummary::summarize_by_operation(&requests); + assert_eq!(summaries.len(), 1); + + let summary = summaries.get(&Operation::Get).unwrap(); + assert_eq!(summary.count, 1); + assert_eq!( + summary.duration_stats.as_ref().unwrap().min, + Duration::from_secs(5) + ); + assert_eq!( + summary.duration_stats.as_ref().unwrap().max, + Duration::from_secs(5) + ); + assert_eq!( + summary.duration_stats.as_ref().unwrap().sum, + Duration::from_secs(5) + ); + assert_eq!(summary.size_stats.as_ref().unwrap().min, 100); + assert_eq!(summary.size_stats.as_ref().unwrap().max, 100); + assert_eq!(summary.size_stats.as_ref().unwrap().sum, 100); + + // Add more Get requests to test aggregation + requests.push(RequestDetails { + op: Operation::Get, + path: Path::from("test2"), + timestamp: chrono::DateTime::from_timestamp(1, 0).unwrap(), + duration: Some(Duration::from_secs(8)), + size: Some(150), + range: None, + extra_display: None, + }); + requests.push(RequestDetails { + op: Operation::Get, + path: Path::from("test3"), + timestamp: chrono::DateTime::from_timestamp(2, 0).unwrap(), + duration: Some(Duration::from_secs(2)), + size: Some(50), + range: None, + extra_display: None, + }); + + let summaries = RequestSummary::summarize_by_operation(&requests); + assert_eq!(summaries.len(), 1); + + let summary = summaries.get(&Operation::Get).unwrap(); + assert_eq!(summary.count, 3); + assert_eq!( + summary.duration_stats.as_ref().unwrap().min, + Duration::from_secs(2) + ); + assert_eq!( + summary.duration_stats.as_ref().unwrap().max, + Duration::from_secs(8) + ); + assert_eq!( + summary.duration_stats.as_ref().unwrap().sum, + Duration::from_secs(15) + ); + assert_eq!(summary.size_stats.as_ref().unwrap().min, 50); + assert_eq!(summary.size_stats.as_ref().unwrap().max, 150); + assert_eq!(summary.size_stats.as_ref().unwrap().sum, 300); + + // Add Put requests to test grouping + requests.push(RequestDetails { + op: Operation::_Put, + path: Path::from("test4"), + timestamp: chrono::DateTime::from_timestamp(3, 0).unwrap(), + duration: Some(Duration::from_millis(200)), + size: Some(75), + range: None, + extra_display: None, + }); + + let summaries = RequestSummary::summarize_by_operation(&requests); + assert_eq!(summaries.len(), 2); + + let get_summary = summaries.get(&Operation::Get).unwrap(); + assert_eq!(get_summary.count, 3); + + let put_summary = summaries.get(&Operation::_Put).unwrap(); + assert_eq!(put_summary.count, 1); + assert_eq!( + put_summary.duration_stats.as_ref().unwrap().min, + Duration::from_millis(200) + ); + assert_eq!(put_summary.size_stats.as_ref().unwrap().sum, 75); + + // Test request with only duration (no size) + let only_duration = vec![RequestDetails { + op: Operation::Get, + path: Path::from("test1"), + timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(), + duration: Some(Duration::from_secs(3)), + size: None, + range: None, + extra_display: None, + }]; + let summaries = RequestSummary::summarize_by_operation(&only_duration); + let summary = summaries.get(&Operation::Get).unwrap(); + assert_eq!(summary.count, 1); + assert!(summary.duration_stats.is_some()); + assert!(summary.size_stats.is_none()); + + // Test request with only size (no duration) + let only_size = vec![RequestDetails { + op: Operation::Get, + path: Path::from("test1"), + timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(), + duration: None, + size: Some(200), + range: None, + extra_display: None, + }]; + let summaries = RequestSummary::summarize_by_operation(&only_size); + let summary = summaries.get(&Operation::Get).unwrap(); + assert_eq!(summary.count, 1); + assert!(summary.duration_stats.is_none()); + assert!(summary.size_stats.is_some()); + assert_eq!(summary.size_stats.as_ref().unwrap().sum, 200); + + // Test request with neither duration nor size + let no_stats = vec![RequestDetails { + op: Operation::Get, + path: Path::from("test1"), + timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(), + duration: None, + size: None, + range: None, + extra_display: None, + }]; + let summaries = RequestSummary::summarize_by_operation(&no_stats); + let summary = summaries.get(&Operation::Get).unwrap(); + assert_eq!(summary.count, 1); + assert!(summary.duration_stats.is_none()); + assert!(summary.size_stats.is_none()); + } } diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs index 0d5e1e3e6fa9..f54de189b4ef 100644 --- a/datafusion-cli/src/print_options.rs +++ b/datafusion-cli/src/print_options.rs @@ -22,7 +22,7 @@ use std::str::FromStr; use std::sync::Arc; use crate::object_storage::instrumented::{ - InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, + InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, RequestSummary, }; use crate::print_format::PrintFormat; @@ -202,6 +202,13 @@ impl PrintOptions { } // Add an extra blank line to help visually organize the output writeln!(writer)?; + + writeln!(writer, "Summaries:")?; + let summaries = RequestSummary::summarize_by_operation(&requests); + for (op, summary) in summaries { + writeln!(writer, "{op:?}")?; + writeln!(writer, "{summary}")?; + } } } } diff --git a/datafusion-cli/tests/cli_integration.rs b/datafusion-cli/tests/cli_integration.rs index cdb442c47f49..a67924fef253 100644 --- a/datafusion-cli/tests/cli_integration.rs +++ b/datafusion-cli/tests/cli_integration.rs @@ -416,6 +416,15 @@ async fn test_object_store_profiling() { " operation=$1 duration=[DURATION] size=$2 path=$3", ); + // We also need to filter out the durations reported in the summary output + // + // Example line(s) to filter: + // + // duration min: 0.000729s + // duration max: 0.000729s + // duration avg: 0.000729s + settings.add_filter(r"duration (min|max|avg): \d+\.\d{6}s", "[SUMMARY_DURATION]"); + let _bound = settings.bind_to_scope(); let input = r#" diff --git a/datafusion-cli/tests/snapshots/object_store_profiling@s3_url_fallback.snap b/datafusion-cli/tests/snapshots/object_store_profiling@s3_url_fallback.snap index 921efa7a1c9e..50c6cc8eab99 100644 --- a/datafusion-cli/tests/snapshots/object_store_profiling@s3_url_fallback.snap +++ b/datafusion-cli/tests/snapshots/object_store_profiling@s3_url_fallback.snap @@ -39,6 +39,17 @@ Object Store Profiling Instrumented Object Store: instrument_mode: Enabled, inner: AmazonS3(data) operation=Get duration=[DURATION] size=1006 path=cars.csv +Summaries: +Get +count: 1 +[SUMMARY_DURATION] +[SUMMARY_DURATION] +[SUMMARY_DURATION] +size min: 1006 B +size max: 1006 B +size avg: 1006 B +size sum: 1006 B + ObjectStore Profile mode set to Disabled +-----+-------+---------------------+ | car | speed | time |