Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 256 additions & 3 deletions datafusion-cli/src/object_storage/instrumented.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
// under the License.

use std::{
fmt,
cmp, fmt,
ops::AddAssign,
str::FromStr,
sync::{
atomic::{AtomicU8, Ordering},
Expand All @@ -28,7 +29,7 @@ use std::{
use async_trait::async_trait;
use chrono::Utc;
use datafusion::{
common::instant::Instant,
common::{instant::Instant, HashMap},
error::DataFusionError,
execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
};
Expand Down Expand Up @@ -201,8 +202,9 @@ impl ObjectStore for InstrumentedObjectStore {
}
}

/// Object store operation types tracked by [`InstrumentedObjectStore`]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
enum Operation {
pub enum Operation {
_Copy,
_Delete,
Get,
Expand Down Expand Up @@ -250,6 +252,103 @@ impl fmt::Display for RequestDetails {
}
}

/// Summary statistics for an [`InstrumentedObjectStore`]'s [`RequestDetails`]
#[derive(Default)]
pub struct RequestSummary {
    /// Number of requests folded into this summary. Every request is counted,
    /// including those that reported neither a duration nor a size.
    count: usize,
    /// Aggregate statistics over the durations of the summarized requests.
    /// `None` until a request that reported a duration has been recorded.
    duration_stats: Option<Stats<Duration>>,
    /// Aggregate statistics over the sizes of the summarized requests.
    /// `None` until a request that reported a size has been recorded.
    size_stats: Option<Stats<usize>>,
}

impl RequestSummary {
/// Generates a set of [RequestSummaries](RequestSummary) from the input [`RequestDetails`]
/// grouped by the input's [`Operation`]
pub fn summarize_by_operation(
requests: &[RequestDetails],
) -> HashMap<Operation, Self> {
let mut summaries: HashMap<Operation, Self> = HashMap::new();
for rd in requests {
match summaries.get_mut(&rd.op) {
Some(rs) => rs.push(rd),
None => {
let mut rs = RequestSummary::default();
rs.push(rd);
summaries.insert(rd.op, rs);
}
}
}

summaries
}

fn push(&mut self, request: &RequestDetails) {
self.count += 1;
if let Some(dur) = request.duration {
self.duration_stats.get_or_insert_default().push(dur)
}
if let Some(size) = request.size {
self.size_stats.get_or_insert_default().push(size)
}
}
}

impl fmt::Display for RequestSummary {
    /// Writes the summary as one `name: value` line per statistic.
    ///
    /// The request count is always written. The duration and size lines are
    /// only written when at least one summarized request reported that value.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "count: {}", self.count)?;

        if let Some(dur_stats) = &self.duration_stats {
            writeln!(f, "duration min: {:.6}s", dur_stats.min.as_secs_f32())?;
            writeln!(f, "duration max: {:.6}s", dur_stats.max.as_secs_f32())?;
            // Average over the requests that actually reported a duration:
            // `self.count` also includes requests without one, which would
            // drag the average down. `max(1)` guards an (unexpected) empty
            // `Stats` against producing NaN.
            let samples = dur_stats.count.max(1);
            let avg = dur_stats.sum.as_secs_f32() / (samples as f32);
            writeln!(f, "duration avg: {:.6}s", avg)?;
        }

        if let Some(size_stats) = &self.size_stats {
            writeln!(f, "size min: {} B", size_stats.min)?;
            writeln!(f, "size max: {} B", size_stats.max)?;
            // Same reasoning as for durations; `max(1)` also rules out a
            // division-by-zero panic.
            let avg = size_stats.sum / size_stats.count.max(1);
            writeln!(f, "size avg: {} B", avg)?;
            writeln!(f, "size sum: {} B", size_stats.sum)?;
        }

        Ok(())
    }
}

/// Running minimum, maximum and sum of a series of values, together with the
/// number of values seen so far.
struct Stats<T> {
    /// Smallest value pushed so far
    min: T,
    /// Largest value pushed so far
    max: T,
    /// Sum of all values pushed so far
    sum: T,
    /// Number of values pushed so far; the denominator for averages
    count: usize,
}

impl<T: Copy + Ord + AddAssign<T>> Stats<T> {
    /// Folds `val` into the running statistics
    fn push(&mut self, val: T) {
        self.min = cmp::min(val, self.min);
        self.max = cmp::max(val, self.max);
        self.sum += val;
        self.count += 1;
    }
}

impl Default for Stats<Duration> {
    /// Empty statistics: `min` starts at the largest and `max` at the smallest
    /// representable duration so that the first pushed value replaces both.
    fn default() -> Self {
        Self {
            min: Duration::MAX,
            max: Duration::ZERO,
            sum: Duration::ZERO,
            count: 0,
        }
    }
}

impl Default for Stats<usize> {
    /// Empty statistics: `min` starts at `usize::MAX` and `max` at
    /// `usize::MIN` so that the first pushed value replaces both.
    fn default() -> Self {
        Self {
            min: usize::MAX,
            max: usize::MIN,
            sum: 0,
            count: 0,
        }
    }
}

/// Provides access to [`InstrumentedObjectStore`] instances that record requests for reporting
#[derive(Debug)]
pub struct InstrumentedObjectStoreRegistry {
Expand Down Expand Up @@ -420,4 +519,158 @@ mod tests {
"1970-01-01T00:00:00+00:00 operation=Get duration=5.000000s size=10 range: bytes=0-9 path=test extra info"
);
}

#[test]
fn request_summary() {
    // Builds a request against `path`, timestamped `ts` seconds after the
    // Unix epoch, with no range and no extra display information.
    fn request(
        op: Operation,
        path: &str,
        ts: i64,
        duration: Option<Duration>,
        size: Option<usize>,
    ) -> RequestDetails {
        RequestDetails {
            op,
            path: Path::from(path),
            timestamp: chrono::DateTime::from_timestamp(ts, 0).unwrap(),
            duration,
            size,
            range: None,
            extra_display: None,
        }
    }

    let secs = Duration::from_secs;

    // An empty request list produces no summaries
    let mut requests: Vec<RequestDetails> = Vec::new();
    assert!(RequestSummary::summarize_by_operation(&requests).is_empty());

    // A single Get request: min, max and sum all equal that request's values
    requests.push(request(
        Operation::Get,
        "test1",
        0,
        Some(secs(5)),
        Some(100),
    ));

    let by_op = RequestSummary::summarize_by_operation(&requests);
    assert_eq!(by_op.len(), 1);

    let get = by_op.get(&Operation::Get).unwrap();
    assert_eq!(get.count, 1);
    let durations = get.duration_stats.as_ref().unwrap();
    assert_eq!(durations.min, secs(5));
    assert_eq!(durations.max, secs(5));
    assert_eq!(durations.sum, secs(5));
    let sizes = get.size_stats.as_ref().unwrap();
    assert_eq!(sizes.min, 100);
    assert_eq!(sizes.max, 100);
    assert_eq!(sizes.sum, 100);

    // Further Get requests are aggregated into the same summary
    requests.push(request(
        Operation::Get,
        "test2",
        1,
        Some(secs(8)),
        Some(150),
    ));
    requests.push(request(
        Operation::Get,
        "test3",
        2,
        Some(secs(2)),
        Some(50),
    ));

    let by_op = RequestSummary::summarize_by_operation(&requests);
    assert_eq!(by_op.len(), 1);

    let get = by_op.get(&Operation::Get).unwrap();
    assert_eq!(get.count, 3);
    let durations = get.duration_stats.as_ref().unwrap();
    assert_eq!(durations.min, secs(2));
    assert_eq!(durations.max, secs(8));
    assert_eq!(durations.sum, secs(15));
    let sizes = get.size_stats.as_ref().unwrap();
    assert_eq!(sizes.min, 50);
    assert_eq!(sizes.max, 150);
    assert_eq!(sizes.sum, 300);

    // A Put request is grouped separately from the Get requests
    requests.push(request(
        Operation::_Put,
        "test4",
        3,
        Some(Duration::from_millis(200)),
        Some(75),
    ));

    let by_op = RequestSummary::summarize_by_operation(&requests);
    assert_eq!(by_op.len(), 2);
    assert_eq!(by_op.get(&Operation::Get).unwrap().count, 3);

    let put = by_op.get(&Operation::_Put).unwrap();
    assert_eq!(put.count, 1);
    assert_eq!(
        put.duration_stats.as_ref().unwrap().min,
        Duration::from_millis(200)
    );
    assert_eq!(put.size_stats.as_ref().unwrap().sum, 75);

    // Only a duration, no size
    let only_duration = vec![request(
        Operation::Get,
        "test1",
        0,
        Some(secs(3)),
        None,
    )];
    let by_op = RequestSummary::summarize_by_operation(&only_duration);
    let get = by_op.get(&Operation::Get).unwrap();
    assert_eq!(get.count, 1);
    assert!(get.duration_stats.is_some());
    assert!(get.size_stats.is_none());

    // Only a size, no duration
    let only_size = vec![request(Operation::Get, "test1", 0, None, Some(200))];
    let by_op = RequestSummary::summarize_by_operation(&only_size);
    let get = by_op.get(&Operation::Get).unwrap();
    assert_eq!(get.count, 1);
    assert!(get.duration_stats.is_none());
    assert!(get.size_stats.is_some());
    assert_eq!(get.size_stats.as_ref().unwrap().sum, 200);

    // Neither a duration nor a size: the request is still counted
    let no_stats = vec![request(Operation::Get, "test1", 0, None, None)];
    let by_op = RequestSummary::summarize_by_operation(&no_stats);
    let get = by_op.get(&Operation::Get).unwrap();
    assert_eq!(get.count, 1);
    assert!(get.duration_stats.is_none());
    assert!(get.size_stats.is_none());
}
}
9 changes: 8 additions & 1 deletion datafusion-cli/src/print_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::str::FromStr;
use std::sync::Arc;

use crate::object_storage::instrumented::{
InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, RequestSummary,
};
use crate::print_format::PrintFormat;

Expand Down Expand Up @@ -202,6 +202,13 @@ impl PrintOptions {
}
// Add an extra blank line to help visually organize the output
writeln!(writer)?;

writeln!(writer, "Summaries:")?;
let summaries = RequestSummary::summarize_by_operation(&requests);
for (op, summary) in summaries {
writeln!(writer, "{op:?}")?;
writeln!(writer, "{summary}")?;
}
}
}
}
Expand Down
9 changes: 9 additions & 0 deletions datafusion-cli/tests/cli_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,15 @@ async fn test_object_store_profiling() {
"<TIMESTAMP> operation=$1 duration=[DURATION] size=$2 path=$3",
);

// We also need to filter out the durations reported in the summary output
//
// Example line(s) to filter:
//
// duration min: 0.000729s
// duration max: 0.000729s
// duration avg: 0.000729s
settings.add_filter(r"duration (min|max|avg): \d+\.\d{6}s", "[SUMMARY_DURATION]");

let _bound = settings.bind_to_scope();

let input = r#"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ Object Store Profiling
Instrumented Object Store: instrument_mode: Enabled, inner: AmazonS3(data)
<TIMESTAMP> operation=Get duration=[DURATION] size=1006 path=cars.csv

Summaries:
Get
count: 1
[SUMMARY_DURATION]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a nice-to-have future feature, it would be great to have these values be more human readable -- e.g. instead of 17446363 B have it be 17.4 MB

[SUMMARY_DURATION]
[SUMMARY_DURATION]
size min: 1006 B
size max: 1006 B
size avg: 1006 B
size sum: 1006 B

ObjectStore Profile mode set to Disabled
+-----+-------+---------------------+
| car | speed | time |
Expand Down