From 833a91ad247e94d3af0e8a9ebf78ecf1d15ae5c8 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Wed, 24 Jun 2026 10:45:51 +0800 Subject: [PATCH 1/2] fix: Fix memory display in `EXPLAIN ANALYZE` for multiple operators --- datafusion/ffi/src/physical_expr/metrics.rs | 19 ++ .../src/metrics/builder.rs | 17 ++ .../physical-expr-common/src/metrics/mod.rs | 1 + .../physical-expr-common/src/metrics/value.rs | 49 +++- .../physical-plan/src/aggregates/row_hash.rs | 3 +- datafusion/physical-plan/src/buffer.rs | 3 +- datafusion/physical-plan/src/display.rs | 3 + .../joins/sort_merge_join/bitwise_stream.rs | 3 +- .../src/joins/sort_merge_join/metrics.rs | 5 +- datafusion/physical-plan/src/joins/utils.rs | 5 +- .../test_files/explain_analyze.slt | 231 ++++++++++++++++++ datafusion/sqllogictest/test_files/joins.slt | 2 +- 12 files changed, 327 insertions(+), 14 deletions(-) diff --git a/datafusion/ffi/src/physical_expr/metrics.rs b/datafusion/ffi/src/physical_expr/metrics.rs index ebef728e0520d..f6323a0ff98aa 100644 --- a/datafusion/ffi/src/physical_expr/metrics.rs +++ b/datafusion/ffi/src/physical_expr/metrics.rs @@ -147,6 +147,10 @@ pub enum FFI_MetricValue { name: SString, gauge: u64, }, + PeakMemoryUsage { + name: SString, + gauge: u64, + }, Time { name: SString, time_ns: u64, @@ -425,6 +429,10 @@ impl From<&MetricValue> for FFI_MetricValue { name: SString::from(name.as_ref()), gauge: gauge.value() as u64, }, + MetricValue::PeakMemoryUsage { name, gauge } => Self::PeakMemoryUsage { + name: SString::from(name.as_ref()), + gauge: gauge.value() as u64, + }, MetricValue::Time { name, time } => Self::Time { name: SString::from(name.as_ref()), time_ns: time.value() as u64, @@ -481,6 +489,10 @@ impl From for MetricValue { name: Cow::Owned(name.into()), gauge: gauge_from_value(gauge), }, + FFI_MetricValue::PeakMemoryUsage { name, gauge } => Self::PeakMemoryUsage { + name: Cow::Owned(name.into()), + gauge: gauge_from_value(gauge), + }, FFI_MetricValue::Time { 
name, time_ns } => Self::Time { name: Cow::Owned(name.into()), time: time_from_nanos(time_ns), @@ -624,6 +636,13 @@ mod tests { gauge, }); + let peak_memory = Gauge::new(); + peak_memory.add(44); + assert_value_roundtrip(MetricValue::PeakMemoryUsage { + name: Cow::Borrowed("peak_mem_used"), + gauge: peak_memory, + }); + let time = Time::new(); time.add_duration(std::time::Duration::from_nanos(33)); assert_value_roundtrip(MetricValue::Time { diff --git a/datafusion/physical-expr-common/src/metrics/builder.rs b/datafusion/physical-expr-common/src/metrics/builder.rs index de9d1e03d88df..7d5a18f535369 100644 --- a/datafusion/physical-expr-common/src/metrics/builder.rs +++ b/datafusion/physical-expr-common/src/metrics/builder.rs @@ -249,6 +249,23 @@ impl<'a> MetricBuilder<'a> { gauge } + /// Consumes self and creates a new [`Gauge`] for recording peak memory + /// usage in bytes. + pub fn peak_memory_usage( + self, + gauge_name: impl Into<Cow<'static, str>>, + partition: usize, + ) -> Gauge { + let gauge = Gauge::new(); + self.with_category(MetricCategory::Bytes) + .with_partition(partition) + .build(MetricValue::PeakMemoryUsage { + name: gauge_name.into(), + gauge: gauge.clone(), + }); + gauge + } + /// Consume self and create a new Timer for recording the elapsed /// CPU time spent by an operator pub fn elapsed_compute(self, partition: usize) -> Time { diff --git a/datafusion/physical-expr-common/src/metrics/mod.rs b/datafusion/physical-expr-common/src/metrics/mod.rs index 0a03075b91094..d6048a0fcd338 100644 --- a/datafusion/physical-expr-common/src/metrics/mod.rs +++ b/datafusion/physical-expr-common/src/metrics/mod.rs @@ -309,6 +309,7 @@ impl MetricsSet { MetricValue::SpilledRows(_) => false, MetricValue::CurrentMemoryUsage(_) => false, MetricValue::Gauge { name, .. } => name == metric_name, + MetricValue::PeakMemoryUsage { name, .. } => name == metric_name, MetricValue::StartTimestamp(_) => false, MetricValue::EndTimestamp(_) => false, MetricValue::PruningMetrics { name, .. 
} => name == metric_name, diff --git a/datafusion/physical-expr-common/src/metrics/value.rs b/datafusion/physical-expr-common/src/metrics/value.rs index ef0087c20d91f..232fefcc5f47e 100644 --- a/datafusion/physical-expr-common/src/metrics/value.rs +++ b/datafusion/physical-expr-common/src/metrics/value.rs @@ -672,6 +672,13 @@ pub enum MetricValue { /// The value of the metric gauge: Gauge, }, + /// Operator defined peak memory usage in bytes. + PeakMemoryUsage { + /// The provided name of this metric + name: Cow<'static, str>, + /// The value of the metric + gauge: Gauge, + }, /// Operator defined time Time { /// The provided name of this metric @@ -744,6 +751,13 @@ impl PartialEq for MetricValue { name: other_name, gauge: other_gauge, }, + ) + | ( + MetricValue::PeakMemoryUsage { name, gauge }, + MetricValue::PeakMemoryUsage { + name: other_name, + gauge: other_gauge, + }, ) => name == other_name && gauge == other_gauge, ( MetricValue::Time { name, time }, @@ -810,7 +824,9 @@ impl MetricValue { Self::CurrentMemoryUsage(_) => "mem_used", Self::ElapsedCompute(_) => "elapsed_compute", Self::Count { name, .. } => name.borrow(), - Self::Gauge { name, .. } => name.borrow(), + Self::Gauge { name, .. } | Self::PeakMemoryUsage { name, .. } => { + name.borrow() + } Self::Time { name, .. } => name.borrow(), Self::StartTimestamp(_) => "start_timestamp", Self::EndTimestamp(_) => "end_timestamp", @@ -833,7 +849,9 @@ impl MetricValue { Self::CurrentMemoryUsage(used) => used.value(), Self::ElapsedCompute(time) => time.value(), Self::Count { count, .. } => count.value(), - Self::Gauge { gauge, .. } => gauge.value(), + Self::Gauge { gauge, .. } | Self::PeakMemoryUsage { gauge, .. } => { + gauge.value() + } Self::Time { time, .. } => time.value(), Self::StartTimestamp(timestamp) => timestamp .value() @@ -875,6 +893,10 @@ impl MetricValue { name: name.clone(), gauge: Gauge::new(), }, + Self::PeakMemoryUsage { name, .. 
} => Self::PeakMemoryUsage { + name: name.clone(), + gauge: Gauge::new(), + }, Self::Time { name, .. } => Self::Time { name: name.clone(), time: Time::new(), @@ -933,6 +955,12 @@ impl MetricValue { Self::Gauge { gauge: other_gauge, .. }, + ) + | ( + Self::PeakMemoryUsage { gauge, .. }, + Self::PeakMemoryUsage { + gauge: other_gauge, .. + }, ) => gauge.add(other_gauge.value()), (Self::ElapsedCompute(time), Self::ElapsedCompute(other_time)) | ( @@ -1029,6 +1057,7 @@ impl MetricValue { "page_index_pages_skipped_by_fully_matched" => 8, _ => 14, }, + Self::PeakMemoryUsage { .. } => 13, Self::Gauge { .. } => 15, Self::Time { .. } => 16, Self::Ratio { .. } => 17, @@ -1064,6 +1093,10 @@ impl Display for MetricValue { let readable_size = human_readable_size(gauge.value()); write!(f, "{readable_size}") } + Self::PeakMemoryUsage { gauge, .. } => { + let readable_size = human_readable_size(gauge.value()); + write!(f, "{readable_size}") + } Self::Gauge { gauge, .. } => { // Generic gauge metrics - format with human-readable count write!(f, "{}", human_readable_count(gauge.value())) @@ -1525,6 +1558,18 @@ mod tests { "100.0 MB" ); + // Test PeakMemoryUsage formatting (should use size, not count) + let peak_mem_gauge = Gauge::new(); + peak_mem_gauge.add(100 * MB as usize); + assert_eq!( + MetricValue::PeakMemoryUsage { + name: "peak_mem_used".into(), + gauge: peak_mem_gauge.clone() + } + .to_string(), + "100.0 MB" + ); + // Test custom Gauge formatting (should use count) let custom_gauge = Gauge::new(); custom_gauge.add(50_000); diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index d46faf9acc14a..5cd7f508af4af 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -527,8 +527,7 @@ impl GroupedHashAggregateStream { merging_aggregate_arguments, merging_group_by: PhysicalGroupBy::new_single(merging_group_by_expr), peak_mem_used: 
MetricBuilder::new(&agg.metrics) - .with_category(MetricCategory::Bytes) - .gauge("peak_mem_used", partition), + .peak_memory_usage("peak_mem_used", partition), spill_manager, }; diff --git a/datafusion/physical-plan/src/buffer.rs b/datafusion/physical-plan/src/buffer.rs index 871d3c4d3fc8a..6d1fb69635fb1 100644 --- a/datafusion/physical-plan/src/buffer.rs +++ b/datafusion/physical-plan/src/buffer.rs @@ -194,8 +194,7 @@ impl ExecutionPlan for BufferExec { let curr_mem_out = Arc::clone(&curr_mem_in); let mut max_mem_in = 0; let max_mem = MetricBuilder::new(&self.metrics) - .with_category(MetricCategory::Bytes) - .gauge("max_mem_used", partition); + .peak_memory_usage("max_mem_used", partition); let curr_queued_in = Arc::new(AtomicUsize::new(0)); let curr_queued_out = Arc::clone(&curr_queued_in); diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index 164637f760286..56b209d921622 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -775,6 +775,9 @@ impl PgJsonExecutionPlanVisitor<'_> { } MetricValue::Count { count, .. } => serde_json::Value::from(count.value()), MetricValue::Gauge { gauge, .. } => serde_json::Value::from(gauge.value()), + MetricValue::PeakMemoryUsage { gauge, .. } => { + serde_json::Value::from(gauge.value()) + } MetricValue::Time { time, .. 
} => { let ms = (time.value() as f64) / 1_000_000.0; serde_json::Value::from(ms) diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/bitwise_stream.rs b/datafusion/physical-plan/src/joins/sort_merge_join/bitwise_stream.rs index 99aef6ed82a36..d98b9d1431762 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/bitwise_stream.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/bitwise_stream.rs @@ -360,7 +360,8 @@ impl BitwiseSortMergeJoinStream { MetricBuilder::new(metrics).counter("input_batches", partition); let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition); let baseline_metrics = BaselineMetrics::new(metrics, partition); - let peak_mem_used = MetricBuilder::new(metrics).gauge("peak_mem_used", partition); + let peak_mem_used = + MetricBuilder::new(metrics).peak_memory_usage("peak_mem_used", partition); Ok(Self { join_type, diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/metrics.rs b/datafusion/physical-plan/src/joins/sort_merge_join/metrics.rs index 62efb77f877ab..6f52a2234b3dc 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/metrics.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/metrics.rs @@ -46,9 +46,8 @@ impl SortMergeJoinMetrics { let input_rows = MetricBuilder::new(metrics) .with_category(MetricCategory::Rows) .counter("input_rows", partition); - let peak_mem_used = MetricBuilder::new(metrics) - .with_category(MetricCategory::Bytes) - .gauge("peak_mem_used", partition); + let peak_mem_used = + MetricBuilder::new(metrics).peak_memory_usage("peak_mem_used", partition); let baseline_metrics = BaselineMetrics::new(metrics, partition); diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 7ecace6b0e530..39a4c178ca4b6 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1798,9 +1798,8 @@ impl BuildProbeJoinMetrics { .with_category(MetricCategory::Rows) 
.counter("build_input_rows", partition); - let build_mem_used = MetricBuilder::new(metrics) - .with_category(MetricCategory::Bytes) - .gauge("build_mem_used", partition); + let build_mem_used = + MetricBuilder::new(metrics).peak_memory_usage("build_mem_used", partition); let input_batches = MetricBuilder::new(metrics) .with_category(MetricCategory::Rows) diff --git a/datafusion/sqllogictest/test_files/explain_analyze.slt b/datafusion/sqllogictest/test_files/explain_analyze.slt index 623580fce94e3..665c8d7f440fb 100644 --- a/datafusion/sqllogictest/test_files/explain_analyze.slt +++ b/datafusion/sqllogictest/test_files/explain_analyze.slt @@ -300,6 +300,237 @@ reset datafusion.explain.analyze_categories; statement ok reset datafusion.explain.analyze_level; +# ------------------------------------------------ +# Test memory metrics display. +# ------------------------------------------------ + +statement ok +set datafusion.explain.analyze_level = dev; + +statement ok +set datafusion.explain.analyze_categories = 'bytes'; + +statement ok +set datafusion.execution.target_partitions = 1; + +statement ok +set datafusion.optimizer.repartition_joins = false; + +statement ok +set datafusion.optimizer.prefer_hash_join = true; + +statement ok +set datafusion.optimizer.enable_piecewise_merge_join = false; + +statement ok +set datafusion.optimizer.hash_join_inlist_pushdown_max_size = 0; + +statement ok +set datafusion.optimizer.hash_join_inlist_pushdown_max_distinct_values = 0; + +query TT +EXPLAIN ANALYZE +WITH t1 (k) AS ( + VALUES (1), (2), (3) +), t2 (k) AS ( + VALUES (1), (2), (3) +) +SELECT * +FROM t1 +JOIN t2 ON t1.k = t2.k; +---- +Plan with Metrics +01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)], metrics=[output_bytes=128.0 KB, build_mem_used=44.0 B] +02)--ProjectionExec: expr=[column1@0 as k], metrics=[output_bytes=32.0 B] +03)----DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] +04)--ProjectionExec: expr=[column1@0 as k], 
metrics=[output_bytes=32.0 B] +05)----DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] + +statement ok +set datafusion.execution.hash_join_buffering_capacity = 1024; + +query TT +EXPLAIN ANALYZE +WITH t1 (k) AS ( + VALUES (1), (2), (3) +), t2 (k) AS ( + VALUES (1), (2), (3) +) +SELECT * +FROM t1 +JOIN t2 ON t1.k = t2.k; +---- +Plan with Metrics +01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)], metrics=[output_bytes=128.0 KB, build_mem_used=44.0 B] +02)--ProjectionExec: expr=[column1@0 as k], metrics=[output_bytes=32.0 B] +03)----DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] +04)--BufferExec: capacity=1024, metrics=[max_mem_used=128.0 B] +05)----ProjectionExec: expr=[column1@0 as k], metrics=[output_bytes=32.0 B] +06)------DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] + +statement ok +reset datafusion.execution.hash_join_buffering_capacity; + +query TT +EXPLAIN ANALYZE +WITH t1 (k) AS ( + VALUES (1), (2) +), t2 (k) AS ( + VALUES (10), (20) +) +SELECT * +FROM t1 +CROSS JOIN t2; +---- +Plan with Metrics +01)CrossJoinExec, metrics=[output_bytes=96.0 B, build_mem_used=128.0 B] +02)--ProjectionExec: expr=[column1@0 as k], metrics=[output_bytes=32.0 B] +03)----DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] +04)--ProjectionExec: expr=[column1@0 as k], metrics=[output_bytes=32.0 B] +05)----DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] + +query TT +EXPLAIN ANALYZE +WITH t1 (k) AS ( + VALUES (1), (2) +), t2 (k) AS ( + VALUES (2), (3) +) +SELECT * +FROM t1 +JOIN t2 ON t1.k < t2.k; +---- +Plan with Metrics +01)NestedLoopJoinExec: join_type=Inner, filter=k@0 < k@1, metrics=[output_bytes=128.0 KB, spilled_bytes=0.0 B, build_mem_used=128.0 B] +02)--ProjectionExec: expr=[column1@0 as k], metrics=[output_bytes=32.0 B] +03)----DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] +04)--ProjectionExec: expr=[column1@0 as k], metrics=[output_bytes=32.0 B] +05)----DataSourceExec: 
partitions=1, partition_sizes=[1], metrics=[] + +statement ok +set datafusion.optimizer.enable_piecewise_merge_join = true; + +query TT +EXPLAIN ANALYZE +WITH t1 (k) AS ( + VALUES (3), (4) +), t2 (k) AS ( + VALUES (1), (2) +) +SELECT * +FROM t1 +JOIN t2 ON t1.k > t2.k; +---- +Plan with Metrics +01)PiecewiseMergeJoin: operator=Gt, join_type=Inner, on=(k > k), metrics=[output_bytes=0.0 B, build_mem_used=144.0 B] +02)--SortExec: expr=[k@0 ASC], preserve_partitioning=[false], metrics=[output_bytes=0.0 B, spilled_bytes=0.0 B] +03)----ProjectionExec: expr=[column1@0 as k], metrics=[output_bytes=32.0 B] +04)------DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] +05)--ProjectionExec: expr=[column1@0 as k], metrics=[output_bytes=32.0 B] +06)----DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] + +statement ok +set datafusion.optimizer.enable_piecewise_merge_join = false; + +statement ok +set datafusion.execution.target_partitions = 2; + +statement ok +set datafusion.optimizer.repartition_joins = true; + +statement ok +set datafusion.optimizer.prefer_hash_join = false; + +statement ok +CREATE TABLE ea_smj_t1(a text, b int) AS VALUES ('Alice', 50), ('Alice', 100), ('Bob', 1); + +statement ok +CREATE TABLE ea_smj_t2(a text, b int) AS VALUES ('Alice', 2), ('Alice', 1); + +query TT +EXPLAIN ANALYZE +SELECT ea_smj_t1.a, ea_smj_t1.b, ea_smj_t2.a, ea_smj_t2.b +FROM ea_smj_t1 +JOIN ea_smj_t2 ON ea_smj_t1.a = ea_smj_t2.a + AND ea_smj_t2.b * 50 <= ea_smj_t1.b; +---- +Plan with Metrics +01)SortMergeJoinExec: join_type=Inner, on=[(a@0, a@0)], filter=CAST(b@1 AS Int64) * 50 <= CAST(b@0 AS Int64), metrics=[output_bytes=320.0 KB, spilled_bytes=0.0 B, peak_mem_used=432.0 B] +02)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false], metrics=[output_bytes=0.0 B, spilled_bytes=0.0 B] +03)----DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] +04)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false], metrics=[output_bytes=0.0 B, spilled_bytes=0.0 B] 
+05)----DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] + +query TT +EXPLAIN ANALYZE +SELECT ea_smj_t1.a, ea_smj_t1.b +FROM ea_smj_t1 +LEFT SEMI JOIN ea_smj_t2 ON ea_smj_t1.a = ea_smj_t2.a; +---- +Plan with Metrics +01)SortMergeJoinExec: join_type=LeftSemi, on=[(a@0, a@0)], metrics=[output_bytes=160.0 KB, spilled_bytes=0.0 B, peak_mem_used=0.0 B] +02)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false], metrics=[output_bytes=0.0 B, spilled_bytes=0.0 B] +03)----DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] +04)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false], metrics=[output_bytes=0.0 B, spilled_bytes=0.0 B] +05)----DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] + +statement ok +DROP TABLE ea_smj_t1; + +statement ok +DROP TABLE ea_smj_t2; + +statement ok +set datafusion.execution.target_partitions = 1; + +statement ok +set datafusion.optimizer.repartition_joins = false; + +statement ok +set datafusion.execution.enable_migration_aggregate = false; + +query TT +EXPLAIN ANALYZE +WITH t (k) AS ( + VALUES (1), (2), (1), (3) +) +SELECT k, count(*) +FROM t +GROUP BY k; +---- +Plan with Metrics +01)ProjectionExec: expr=[k@0 as k, count(Int64(1))@1 as count(*)], metrics=[output_bytes=1056.0 B] +02)--AggregateExec: mode=Single, gby=[k@0 as k], aggr=[count(Int64(1))], metrics=[output_bytes=1056.0 B, spilled_bytes=0.0 B, peak_mem_used=9.2 KB] +03)----ProjectionExec: expr=[column1@0 as k], metrics=[output_bytes=32.0 B] +04)------DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] + +statement ok +reset datafusion.execution.enable_migration_aggregate; + +statement ok +reset datafusion.optimizer.prefer_hash_join; + +statement ok +reset datafusion.optimizer.hash_join_inlist_pushdown_max_size; + +statement ok +reset datafusion.optimizer.hash_join_inlist_pushdown_max_distinct_values; + +statement ok +reset datafusion.optimizer.repartition_joins; + +statement ok +reset 
datafusion.optimizer.enable_piecewise_merge_join; + +statement ok +set datafusion.execution.target_partitions = 4; + +statement ok +reset datafusion.explain.analyze_categories; + +statement ok +reset datafusion.explain.analyze_level; + # ------------------------------------------------------------------ # Same category/level filtering, but via the Postgres-style # `EXPLAIN (ANALYZE, METRICS ..., LEVEL ...)` statement option list. diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 082b10167274c..1101ad6d2b14d 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -5185,7 +5185,7 @@ LEFT ANTI JOIN ( ) t2 ON t1.k = t2.k; ---- Plan with Metrics -01)HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k@0, k@0)], metrics=[output_rows=2, elapsed_compute=, output_bytes=, output_batches=1, array_map_created_count=0, build_input_batches=0, build_input_rows=0, input_batches=1, input_rows=2, build_mem_used=, build_time=, join_time=, avg_fanout=N/A (0/0), probe_hit_rate=0% (0/2)] +01)HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k@0, k@0)], metrics=[output_rows=2, elapsed_compute=, output_bytes=, output_batches=1, build_mem_used=, array_map_created_count=0, build_input_batches=0, build_input_rows=0, input_batches=1, input_rows=2, build_time=, join_time=, avg_fanout=N/A (0/0), probe_hit_rate=0% (0/2)] 02)--ProjectionExec: expr=[column1@0 as k], metrics=[output_rows=0, elapsed_compute=, output_bytes=, output_batches=0, expr_0_eval_time=] 03)----FilterExec: column1@0 != 1, metrics=[output_rows=0, elapsed_compute=, output_bytes=, output_batches=0, selectivity=0% (0/1)] 04)------DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] From d883e36f7767adcc7fa6c864d6c41ceb3ef2b7b6 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Thu, 25 Jun 2026 09:08:16 +0800 Subject: [PATCH 2/2] review: fix FFI_MetricValue order --- 
datafusion/ffi/src/physical_expr/metrics.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/datafusion/ffi/src/physical_expr/metrics.rs b/datafusion/ffi/src/physical_expr/metrics.rs index f6323a0ff98aa..763cc3f079a01 100644 --- a/datafusion/ffi/src/physical_expr/metrics.rs +++ b/datafusion/ffi/src/physical_expr/metrics.rs @@ -128,6 +128,9 @@ pub struct FFI_RatioMetrics { } /// FFI-stable mirror of [`MetricValue`]. +/// +/// This is part of the stable ABI and must not be reordered. New variants must be +/// appended at the end. #[repr(C, u8)] #[derive(Debug, Clone)] pub enum FFI_MetricValue { @@ -147,10 +150,6 @@ pub enum FFI_MetricValue { name: SString, gauge: u64, }, - PeakMemoryUsage { - name: SString, - gauge: u64, - }, Time { name: SString, time_ns: u64, @@ -174,6 +173,10 @@ pub enum FFI_MetricValue { display: SString, as_usize_value: u64, }, + PeakMemoryUsage { + name: SString, + gauge: u64, + }, } // -----------------------------------------------------------------------------