Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions datafusion/ffi/src/physical_expr/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ pub enum FFI_MetricValue {
name: SString,
gauge: u64,
},
PeakMemoryUsage {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this variant needs to go at the end of the enum, not the middle, per

//! The variant *order* of [`FFI_MetricValue`] is part of the stable ABI and
//! must not be reordered. New variants must be appended at the end.

name: SString,
gauge: u64,
},
Time {
name: SString,
time_ns: u64,
Expand Down Expand Up @@ -425,6 +429,10 @@ impl From<&MetricValue> for FFI_MetricValue {
name: SString::from(name.as_ref()),
gauge: gauge.value() as u64,
},
MetricValue::PeakMemoryUsage { name, gauge } => Self::PeakMemoryUsage {
name: SString::from(name.as_ref()),
gauge: gauge.value() as u64,
},
MetricValue::Time { name, time } => Self::Time {
name: SString::from(name.as_ref()),
time_ns: time.value() as u64,
Expand Down Expand Up @@ -481,6 +489,10 @@ impl From<FFI_MetricValue> for MetricValue {
name: Cow::Owned(name.into()),
gauge: gauge_from_value(gauge),
},
FFI_MetricValue::PeakMemoryUsage { name, gauge } => Self::PeakMemoryUsage {
name: Cow::Owned(name.into()),
gauge: gauge_from_value(gauge),
},
FFI_MetricValue::Time { name, time_ns } => Self::Time {
name: Cow::Owned(name.into()),
time: time_from_nanos(time_ns),
Expand Down Expand Up @@ -624,6 +636,13 @@ mod tests {
gauge,
});

let peak_memory = Gauge::new();
peak_memory.add(44);
assert_value_roundtrip(MetricValue::PeakMemoryUsage {
name: Cow::Borrowed("peak_mem_used"),
gauge: peak_memory,
});

let time = Time::new();
time.add_duration(std::time::Duration::from_nanos(33));
assert_value_roundtrip(MetricValue::Time {
Expand Down
17 changes: 17 additions & 0 deletions datafusion/physical-expr-common/src/metrics/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,23 @@ impl<'a> MetricBuilder<'a> {
gauge
}

/// Consumes this builder and returns a fresh [`Gauge`] meant for tracking
/// an operator's peak memory usage, measured in bytes.
///
/// The builder is tagged with [`MetricCategory::Bytes`] and the supplied
/// `partition`, then a [`MetricValue::PeakMemoryUsage`] carrying
/// `gauge_name` and a clone of the new gauge is passed to `build`.
///
/// The returned value is the gauge that was cloned into the metric.
// NOTE(review): presumably the clone shares its counter with the returned
// handle, so updates made by the caller are visible through the built
// metric; confirm against the `Gauge` definition.
pub fn peak_memory_usage(
    self,
    gauge_name: impl Into<Cow<'static, str>>,
    partition: usize,
) -> Gauge {
    let peak = Gauge::new();

    // Configure the builder first, keeping the original evaluation order:
    // category, then partition, then construction of the metric value.
    let builder = self
        .with_category(MetricCategory::Bytes)
        .with_partition(partition);

    builder.build(MetricValue::PeakMemoryUsage {
        name: gauge_name.into(),
        gauge: peak.clone(),
    });

    peak
}

/// Consume self and create a new Timer for recording the elapsed
/// CPU time spent by an operator
pub fn elapsed_compute(self, partition: usize) -> Time {
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-expr-common/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ impl MetricsSet {
MetricValue::SpilledRows(_) => false,
MetricValue::CurrentMemoryUsage(_) => false,
MetricValue::Gauge { name, .. } => name == metric_name,
MetricValue::PeakMemoryUsage { name, .. } => name == metric_name,
MetricValue::StartTimestamp(_) => false,
MetricValue::EndTimestamp(_) => false,
MetricValue::PruningMetrics { name, .. } => name == metric_name,
Expand Down
49 changes: 47 additions & 2 deletions datafusion/physical-expr-common/src/metrics/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,13 @@ pub enum MetricValue {
/// The value of the metric
gauge: Gauge,
},
/// Operator defined peak memory usage in bytes.
PeakMemoryUsage {
/// The provided name of this metric
name: Cow<'static, str>,
/// The value of the metric
gauge: Gauge,
},
/// Operator defined time
Time {
/// The provided name of this metric
Expand Down Expand Up @@ -744,6 +751,13 @@ impl PartialEq for MetricValue {
name: other_name,
gauge: other_gauge,
},
)
| (
MetricValue::PeakMemoryUsage { name, gauge },
MetricValue::PeakMemoryUsage {
name: other_name,
gauge: other_gauge,
},
) => name == other_name && gauge == other_gauge,
(
MetricValue::Time { name, time },
Expand Down Expand Up @@ -810,7 +824,9 @@ impl MetricValue {
Self::CurrentMemoryUsage(_) => "mem_used",
Self::ElapsedCompute(_) => "elapsed_compute",
Self::Count { name, .. } => name.borrow(),
Self::Gauge { name, .. } => name.borrow(),
Self::Gauge { name, .. } | Self::PeakMemoryUsage { name, .. } => {
name.borrow()
}
Self::Time { name, .. } => name.borrow(),
Self::StartTimestamp(_) => "start_timestamp",
Self::EndTimestamp(_) => "end_timestamp",
Expand All @@ -833,7 +849,9 @@ impl MetricValue {
Self::CurrentMemoryUsage(used) => used.value(),
Self::ElapsedCompute(time) => time.value(),
Self::Count { count, .. } => count.value(),
Self::Gauge { gauge, .. } => gauge.value(),
Self::Gauge { gauge, .. } | Self::PeakMemoryUsage { gauge, .. } => {
gauge.value()
}
Self::Time { time, .. } => time.value(),
Self::StartTimestamp(timestamp) => timestamp
.value()
Expand Down Expand Up @@ -875,6 +893,10 @@ impl MetricValue {
name: name.clone(),
gauge: Gauge::new(),
},
Self::PeakMemoryUsage { name, .. } => Self::PeakMemoryUsage {
name: name.clone(),
gauge: Gauge::new(),
},
Self::Time { name, .. } => Self::Time {
name: name.clone(),
time: Time::new(),
Expand Down Expand Up @@ -933,6 +955,12 @@ impl MetricValue {
Self::Gauge {
gauge: other_gauge, ..
},
)
| (
Self::PeakMemoryUsage { gauge, .. },
Self::PeakMemoryUsage {
gauge: other_gauge, ..
},
) => gauge.add(other_gauge.value()),
(Self::ElapsedCompute(time), Self::ElapsedCompute(other_time))
| (
Expand Down Expand Up @@ -1029,6 +1057,7 @@ impl MetricValue {
"page_index_pages_skipped_by_fully_matched" => 8,
_ => 14,
},
Self::PeakMemoryUsage { .. } => 13,
Self::Gauge { .. } => 15,
Self::Time { .. } => 16,
Self::Ratio { .. } => 17,
Expand Down Expand Up @@ -1064,6 +1093,10 @@ impl Display for MetricValue {
let readable_size = human_readable_size(gauge.value());
write!(f, "{readable_size}")
}
Self::PeakMemoryUsage { gauge, .. } => {
let readable_size = human_readable_size(gauge.value());
write!(f, "{readable_size}")
}
Self::Gauge { gauge, .. } => {
// Generic gauge metrics - format with human-readable count
write!(f, "{}", human_readable_count(gauge.value()))
Expand Down Expand Up @@ -1525,6 +1558,18 @@ mod tests {
"100.0 MB"
);

// Test PeakMemoryUsage formatting (should use size, not count)
let peak_mem_gauge = Gauge::new();
peak_mem_gauge.add(100 * MB as usize);
assert_eq!(
MetricValue::PeakMemoryUsage {
name: "peak_mem_used".into(),
gauge: peak_mem_gauge.clone()
}
.to_string(),
"100.0 MB"
);

// Test custom Gauge formatting (should use count)
let custom_gauge = Gauge::new();
custom_gauge.add(50_000);
Expand Down
3 changes: 1 addition & 2 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,8 +527,7 @@ impl GroupedHashAggregateStream {
merging_aggregate_arguments,
merging_group_by: PhysicalGroupBy::new_single(merging_group_by_expr),
peak_mem_used: MetricBuilder::new(&agg.metrics)
.with_category(MetricCategory::Bytes)
.gauge("peak_mem_used", partition),
.peak_memory_usage("peak_mem_used", partition),
spill_manager,
};

Expand Down
3 changes: 1 addition & 2 deletions datafusion/physical-plan/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,7 @@ impl ExecutionPlan for BufferExec {
let curr_mem_out = Arc::clone(&curr_mem_in);
let mut max_mem_in = 0;
let max_mem = MetricBuilder::new(&self.metrics)
.with_category(MetricCategory::Bytes)
.gauge("max_mem_used", partition);
.peak_memory_usage("max_mem_used", partition);

let curr_queued_in = Arc::new(AtomicUsize::new(0));
let curr_queued_out = Arc::clone(&curr_queued_in);
Expand Down
3 changes: 3 additions & 0 deletions datafusion/physical-plan/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,9 @@ impl PgJsonExecutionPlanVisitor<'_> {
}
MetricValue::Count { count, .. } => serde_json::Value::from(count.value()),
MetricValue::Gauge { gauge, .. } => serde_json::Value::from(gauge.value()),
MetricValue::PeakMemoryUsage { gauge, .. } => {
serde_json::Value::from(gauge.value())
}
MetricValue::Time { time, .. } => {
let ms = (time.value() as f64) / 1_000_000.0;
serde_json::Value::from(ms)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,8 @@ impl BitwiseSortMergeJoinStream {
MetricBuilder::new(metrics).counter("input_batches", partition);
let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
let baseline_metrics = BaselineMetrics::new(metrics, partition);
let peak_mem_used = MetricBuilder::new(metrics).gauge("peak_mem_used", partition);
let peak_mem_used =
MetricBuilder::new(metrics).peak_memory_usage("peak_mem_used", partition);

Ok(Self {
join_type,
Expand Down
5 changes: 2 additions & 3 deletions datafusion/physical-plan/src/joins/sort_merge_join/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@ impl SortMergeJoinMetrics {
let input_rows = MetricBuilder::new(metrics)
.with_category(MetricCategory::Rows)
.counter("input_rows", partition);
let peak_mem_used = MetricBuilder::new(metrics)
.with_category(MetricCategory::Bytes)
.gauge("peak_mem_used", partition);
let peak_mem_used =
MetricBuilder::new(metrics).peak_memory_usage("peak_mem_used", partition);

let baseline_metrics = BaselineMetrics::new(metrics, partition);

Expand Down
5 changes: 2 additions & 3 deletions datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1798,9 +1798,8 @@ impl BuildProbeJoinMetrics {
.with_category(MetricCategory::Rows)
.counter("build_input_rows", partition);

let build_mem_used = MetricBuilder::new(metrics)
.with_category(MetricCategory::Bytes)
.gauge("build_mem_used", partition);
let build_mem_used =
MetricBuilder::new(metrics).peak_memory_usage("build_mem_used", partition);

let input_batches = MetricBuilder::new(metrics)
.with_category(MetricCategory::Rows)
Expand Down
Loading
Loading