diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 323bc28057d4..0c26d7bdb1ee 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -217,8 +217,11 @@ mod tests { assert_eq!(tt_batches, 50 /* 100/2 */); // test metadata - assert_eq!(exec.statistics()?.num_rows, Precision::Absent); - assert_eq!(exec.statistics()?.total_byte_size, Precision::Absent); + assert_eq!(exec.partition_statistics(None)?.num_rows, Precision::Absent); + assert_eq!( + exec.partition_statistics(None)?.total_byte_size, + Precision::Absent + ); Ok(()) } diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index a70a0f51d330..48c604efac5c 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -75,8 +75,11 @@ mod tests { assert_eq!(tt_batches, 6 /* 12/2 */); // test metadata - assert_eq!(exec.statistics()?.num_rows, Precision::Absent); - assert_eq!(exec.statistics()?.total_byte_size, Precision::Absent); + assert_eq!(exec.partition_statistics(None)?.num_rows, Precision::Absent); + assert_eq!( + exec.partition_statistics(None)?.total_byte_size, + Precision::Absent + ); Ok(()) } diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 7b8b99273f4e..0801ae6d8eb3 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -616,9 +616,15 @@ mod tests { assert_eq!(tt_batches, 4 /* 8/2 */); // test metadata - assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8)); + assert_eq!( + exec.partition_statistics(None)?.num_rows, + Precision::Exact(8) + ); // TODO correct byte size: https://github.com/apache/datafusion/issues/14936 - assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671)); + assert_eq!( + exec.partition_statistics(None)?.total_byte_size, + Precision::Exact(671) + ); Ok(()) } @@ -659,9 +665,15 @@ mod tests { get_exec(&state, "alltypes_plain.parquet", projection, Some(1)).await?; // note: even if the limit is set, the executor rounds up to the batch size - assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8)); + assert_eq!( + exec.partition_statistics(None)?.num_rows, + Precision::Exact(8) + ); // TODO correct byte size: https://github.com/apache/datafusion/issues/14936 - assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671)); + assert_eq!( + exec.partition_statistics(None)?.total_byte_size, + Precision::Exact(671) + ); let batches = collect(exec, task_ctx).await?; assert_eq!(1, batches.len()); assert_eq!(11, batches[0].num_columns()); diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 94668dee1613..84a63faffbbd 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -739,7 +739,7 @@ impl ListingOptions { /// # Ok(()) /// # } /// ``` -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ListingTable { table_paths: Vec, /// `file_schema` contains only the columns physically stored in the data files themselves. @@ -1149,10 +1149,17 @@ impl ListingTable { let (file_group, inexact_stats) = get_files_with_limit(files, limit, self.options.collect_stat).await?; - let mut file_groups = file_group.split_files(self.options.target_partitions); + let file_groups = file_group.split_files(self.options.target_partitions); + let (mut file_groups, mut stats) = compute_all_files_statistics( + file_groups, + self.schema(), + self.options.collect_stat, + inexact_stats, + )?; let (schema_mapper, _) = DefaultSchemaAdapterFactory::from_schema(self.schema()) .map_schema(self.file_schema.as_ref())?; - // Use schema_mapper to map each file-level column statistics to table-level column statistics + stats.column_statistics = + schema_mapper.map_column_statistics(&stats.column_statistics)?; file_groups.iter_mut().try_for_each(|file_group| { if let Some(stat) = file_group.statistics_mut() { stat.column_statistics = @@ -1160,12 +1167,7 @@ impl ListingTable { } Ok::<_, DataFusionError>(()) })?; - compute_all_files_statistics( - file_groups, - self.schema(), - self.options.collect_stat, - inexact_stats, - ) + Ok((file_groups, stats)) } /// Collects statistics for a given partitioned file. @@ -1324,8 +1326,14 @@ mod tests { assert_eq!(exec.output_partitioning().partition_count(), 1); // test metadata - assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8)); - assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671)); + assert_eq!( + exec.partition_statistics(None)?.num_rows, + Precision::Exact(8) + ); + assert_eq!( + exec.partition_statistics(None)?.total_byte_size, + Precision::Exact(671) + ); Ok(()) } @@ -1350,9 +1358,15 @@ mod tests { let table = ListingTable::try_new(config)?; let exec = table.scan(&state, None, &[], None).await?; - assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8)); + assert_eq!( + exec.partition_statistics(None)?.num_rows, + Precision::Exact(8) + ); // TODO correct byte size: https://github.com/apache/datafusion/issues/14936 - assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671)); + assert_eq!( + exec.partition_statistics(None)?.total_byte_size, + Precision::Exact(671) + ); Ok(()) } @@ -1378,8 +1392,11 @@ mod tests { let table = ListingTable::try_new(config)?; let exec = table.scan(&state, None, &[], None).await?; - assert_eq!(exec.statistics()?.num_rows, Precision::Absent); - assert_eq!(exec.statistics()?.total_byte_size, Precision::Absent); + assert_eq!(exec.partition_statistics(None)?.num_rows, Precision::Absent); + assert_eq!( + exec.partition_statistics(None)?.total_byte_size, + Precision::Absent + ); Ok(()) } diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index f0a1f94d87e1..1ec31d0b2e3f 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -190,6 +190,11 @@ impl ExecutionPlan for ArrowExec { fn statistics(&self) -> Result { self.inner.statistics() } + + fn partition_statistics(&self, partition: Option) -> Result { + self.inner.partition_statistics(partition) + } + fn fetch(&self) -> Option { self.inner.fetch() } diff --git a/datafusion/core/tests/custom_sources_cases/mod.rs b/datafusion/core/tests/custom_sources_cases/mod.rs index eb930b9a60bc..cbdc4a448ea4 100644 --- a/datafusion/core/tests/custom_sources_cases/mod.rs +++ b/datafusion/core/tests/custom_sources_cases/mod.rs @@ -180,6 +180,13 @@ impl ExecutionPlan for CustomExecutionPlan { } fn statistics(&self) -> Result { + self.partition_statistics(None) + } + + fn partition_statistics(&self, partition: Option) -> Result { + if partition.is_some() { + return Ok(Statistics::new_unknown(&self.schema())); + } let batch = TEST_CUSTOM_RECORD_BATCH!().unwrap(); Ok(Statistics { num_rows: Precision::Exact(batch.num_rows()), diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index 66c886510e96..f9b0db0e808c 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -184,6 +184,14 @@ impl ExecutionPlan for StatisticsValidation { fn statistics(&self) -> Result { Ok(self.stats.clone()) } + + fn partition_statistics(&self, partition: Option) -> Result { + if partition.is_some() { + Ok(Statistics::new_unknown(&self.schema)) + } else { + Ok(self.stats.clone()) + } + } } fn init_ctx(stats: Statistics, schema: Schema) -> Result { @@ -232,7 +240,7 @@ async fn sql_basic() -> Result<()> { let physical_plan = df.create_physical_plan().await.unwrap(); // the statistics should be those of the source - assert_eq!(stats, physical_plan.statistics()?); + assert_eq!(stats, physical_plan.partition_statistics(None)?); Ok(()) } @@ -248,7 +256,7 @@ async fn sql_filter() -> Result<()> { .unwrap(); let physical_plan = df.create_physical_plan().await.unwrap(); - let stats = physical_plan.statistics()?; + let stats = physical_plan.partition_statistics(None)?; assert_eq!(stats.num_rows, Precision::Inexact(1)); Ok(()) @@ -270,7 +278,7 @@ async fn sql_limit() -> Result<()> { column_statistics: col_stats, total_byte_size: Precision::Absent }, - physical_plan.statistics()? + physical_plan.partition_statistics(None)? ); let df = ctx @@ -279,7 +287,7 @@ async fn sql_limit() -> Result<()> { .unwrap(); let physical_plan = df.create_physical_plan().await.unwrap(); // when the limit is larger than the original number of lines, statistics remain unchanged - assert_eq!(stats, physical_plan.statistics()?); + assert_eq!(stats, physical_plan.partition_statistics(None)?); Ok(()) } @@ -296,7 +304,7 @@ async fn sql_window() -> Result<()> { let physical_plan = df.create_physical_plan().await.unwrap(); - let result = physical_plan.statistics()?; + let result = physical_plan.partition_statistics(None)?; assert_eq!(stats.num_rows, result.num_rows); let col_stats = result.column_statistics; diff --git a/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet b/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet new file mode 100644 index 000000000000..ec164c6df7b5 Binary files /dev/null and b/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet differ diff --git a/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet b/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet new file mode 100644 index 000000000000..4b78cf963c11 Binary files /dev/null and b/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet differ diff --git a/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet b/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet new file mode 100644 index 000000000000..09a01771d503 Binary files /dev/null and b/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet differ diff --git a/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet b/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet new file mode 100644 index 000000000000..6398cc43a2f5 Binary files /dev/null and b/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet differ diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs index 7e98ebed6c9a..8b87d59d8c46 100644 --- a/datafusion/core/tests/parquet/file_statistics.rs +++ b/datafusion/core/tests/parquet/file_statistics.rs @@ -50,13 +50,19 @@ async fn check_stats_precision_with_filter_pushdown() { let (_, _, state) = get_cache_runtime_state(); // Scan without filter, stats are exact let exec = table.scan(&state, None, &[], None).await.unwrap(); - assert_eq!(exec.statistics().unwrap().num_rows, Precision::Exact(8)); + assert_eq!( + exec.partition_statistics(None).unwrap().num_rows, + Precision::Exact(8) + ); // Scan with filter pushdown, stats are inexact let filter = Expr::gt(col("id"), lit(1)); let exec = table.scan(&state, None, &[filter], None).await.unwrap(); - assert_eq!(exec.statistics().unwrap().num_rows, Precision::Inexact(8)); + assert_eq!( + exec.partition_statistics(None).unwrap().num_rows, + Precision::Inexact(8) + ); } #[tokio::test] @@ -79,9 +85,12 @@ async fn load_table_stats_with_session_level_cache() { assert_eq!(get_static_cache_size(&state1), 0); let exec1 = table1.scan(&state1, None, &[], None).await.unwrap(); - assert_eq!(exec1.statistics().unwrap().num_rows, Precision::Exact(8)); assert_eq!( - exec1.statistics().unwrap().total_byte_size, + exec1.partition_statistics(None).unwrap().num_rows, + Precision::Exact(8) + ); + assert_eq!( + exec1.partition_statistics(None).unwrap().total_byte_size, // TODO correct byte size: https://github.com/apache/datafusion/issues/14936 Precision::Exact(671), ); @@ -91,9 +100,12 @@ async fn load_table_stats_with_session_level_cache() { //check session 1 cache result not show in session 2 assert_eq!(get_static_cache_size(&state2), 0); let exec2 = table2.scan(&state2, None, &[], None).await.unwrap(); - assert_eq!(exec2.statistics().unwrap().num_rows, Precision::Exact(8)); assert_eq!( - exec2.statistics().unwrap().total_byte_size, + exec2.partition_statistics(None).unwrap().num_rows, + Precision::Exact(8) + ); + assert_eq!( + exec2.partition_statistics(None).unwrap().total_byte_size, // TODO correct byte size: https://github.com/apache/datafusion/issues/14936 Precision::Exact(671), ); @@ -103,9 +115,12 @@ async fn load_table_stats_with_session_level_cache() { //check session 1 cache result not show in session 2 assert_eq!(get_static_cache_size(&state1), 1); let exec3 = table1.scan(&state1, None, &[], None).await.unwrap(); - assert_eq!(exec3.statistics().unwrap().num_rows, Precision::Exact(8)); assert_eq!( - exec3.statistics().unwrap().total_byte_size, + exec3.partition_statistics(None).unwrap().num_rows, + Precision::Exact(8) + ); + assert_eq!( + exec3.partition_statistics(None).unwrap().total_byte_size, // TODO correct byte size: https://github.com/apache/datafusion/issues/14936 Precision::Exact(671), ); diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 5e182cb93b39..bc6957ff7b42 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -170,7 +170,7 @@ impl ExecutionPlan for SortRequiredExec { } fn statistics(&self) -> Result { - self.input.statistics() + self.input.partition_statistics(None) } } diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs b/datafusion/core/tests/physical_optimizer/join_selection.rs index d3b6ec700bee..d8c0c142f7fb 100644 --- a/datafusion/core/tests/physical_optimizer/join_selection.rs +++ b/datafusion/core/tests/physical_optimizer/join_selection.rs @@ -251,11 +251,19 @@ async fn test_join_with_swap() { .expect("The type of the plan should not be changed"); assert_eq!( - swapped_join.left().statistics().unwrap().total_byte_size, + swapped_join + .left() + .partition_statistics(None) + .unwrap() + .total_byte_size, Precision::Inexact(8192) ); assert_eq!( - swapped_join.right().statistics().unwrap().total_byte_size, + swapped_join + .right() + .partition_statistics(None) + .unwrap() + .total_byte_size, Precision::Inexact(2097152) ); } @@ -291,11 +299,19 @@ async fn test_left_join_no_swap() { .expect("The type of the plan should not be changed"); assert_eq!( - swapped_join.left().statistics().unwrap().total_byte_size, + swapped_join + .left() + .partition_statistics(None) + .unwrap() + .total_byte_size, Precision::Inexact(8192) ); assert_eq!( - swapped_join.right().statistics().unwrap().total_byte_size, + swapped_join + .right() + .partition_statistics(None) + .unwrap() + .total_byte_size, Precision::Inexact(2097152) ); } @@ -336,11 +352,19 @@ async fn test_join_with_swap_semi() { assert_eq!(swapped_join.schema().fields().len(), 1); assert_eq!( - swapped_join.left().statistics().unwrap().total_byte_size, + swapped_join + .left() + .partition_statistics(None) + .unwrap() + .total_byte_size, Precision::Inexact(8192) ); assert_eq!( - swapped_join.right().statistics().unwrap().total_byte_size, + swapped_join + .right() + .partition_statistics(None) + .unwrap() + .total_byte_size, Precision::Inexact(2097152) ); assert_eq!(original_schema, swapped_join.schema()); @@ -455,11 +479,19 @@ async fn test_join_no_swap() { .expect("The type of the plan should not be changed"); assert_eq!( - swapped_join.left().statistics().unwrap().total_byte_size, + swapped_join + .left() + .partition_statistics(None) + .unwrap() + .total_byte_size, Precision::Inexact(8192) ); assert_eq!( - swapped_join.right().statistics().unwrap().total_byte_size, + swapped_join + .right() + .partition_statistics(None) + .unwrap() + .total_byte_size, Precision::Inexact(2097152) ); } @@ -524,11 +556,19 @@ async fn test_nl_join_with_swap(join_type: JoinType) { ); assert_eq!( - swapped_join.left().statistics().unwrap().total_byte_size, + swapped_join + .left() + .partition_statistics(None) + .unwrap() + .total_byte_size, Precision::Inexact(8192) ); assert_eq!( - swapped_join.right().statistics().unwrap().total_byte_size, + swapped_join + .right() + .partition_statistics(None) + .unwrap() + .total_byte_size, Precision::Inexact(2097152) ); } @@ -589,11 +629,19 @@ async fn test_nl_join_with_swap_no_proj(join_type: JoinType) { ); assert_eq!( - swapped_join.left().statistics().unwrap().total_byte_size, + swapped_join + .left() + .partition_statistics(None) + .unwrap() + .total_byte_size, Precision::Inexact(8192) ); assert_eq!( - swapped_join.right().statistics().unwrap().total_byte_size, + swapped_join + .right() + .partition_statistics(None) + .unwrap() + .total_byte_size, Precision::Inexact(2097152) ); } @@ -1067,6 +1115,14 @@ impl ExecutionPlan for StatisticsExec { fn statistics(&self) -> Result { Ok(self.stats.clone()) } + + fn partition_statistics(&self, partition: Option) -> Result { + Ok(if partition.is_some() { + Statistics::new_unknown(&self.schema) + } else { + self.stats.clone() + }) + } } #[test] diff --git a/datafusion/core/tests/physical_optimizer/mod.rs b/datafusion/core/tests/physical_optimizer/mod.rs index 6643e7fd59b7..c115a5253adc 100644 --- a/datafusion/core/tests/physical_optimizer/mod.rs +++ b/datafusion/core/tests/physical_optimizer/mod.rs @@ -24,6 +24,7 @@ mod enforce_sorting; mod join_selection; mod limit_pushdown; mod limited_distinct_aggregation; +mod partition_statistics; mod projection_pushdown; mod push_down_filter; mod replace_with_order_preserving_variants; diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs new file mode 100644 index 000000000000..8ac583c7150b --- /dev/null +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -0,0 +1,491 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#[cfg(test)] +mod test { + use arrow::array::{Int32Array, RecordBatch}; + use arrow_schema::{DataType, Field, Schema, SortOptions}; + use datafusion::datasource::listing::ListingTable; + use datafusion::prelude::SessionContext; + use datafusion_catalog::TableProvider; + use datafusion_common::stats::Precision; + use datafusion_common::Result; + use datafusion_common::{ColumnStatistics, ScalarValue, Statistics}; + use datafusion_execution::config::SessionConfig; + use datafusion_execution::TaskContext; + use datafusion_expr_common::operator::Operator; + use datafusion_physical_expr::expressions::{binary, lit, Column}; + use datafusion_physical_expr_common::physical_expr::PhysicalExpr; + use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; + use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec; + use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; + use datafusion_physical_plan::filter::FilterExec; + use datafusion_physical_plan::joins::CrossJoinExec; + use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; + use datafusion_physical_plan::projection::ProjectionExec; + use datafusion_physical_plan::sorts::sort::SortExec; + use datafusion_physical_plan::union::UnionExec; + use datafusion_physical_plan::{ + execute_stream_partitioned, ExecutionPlan, ExecutionPlanProperties, + }; + use futures::TryStreamExt; + use std::sync::Arc; + + /// Creates a test table with statistics from the test data directory. + /// + /// This function: + /// - Creates an external table from './tests/data/test_statistics_per_partition' + /// - If we set the `target_partition` to 2, the data contains 2 partitions, each with 2 rows + /// - Each partition has an "id" column (INT) with the following values: + /// - First partition: [3, 4] + /// - Second partition: [1, 2] + /// - Each row is 110 bytes in size + /// + /// @param target_partition Optional parameter to set the target partitions + /// @return ExecutionPlan representing the scan of the table with statistics + async fn create_scan_exec_with_statistics( + create_table_sql: Option<&str>, + target_partition: Option, + ) -> Arc { + let mut session_config = SessionConfig::new().with_collect_statistics(true); + if let Some(partition) = target_partition { + session_config = session_config.with_target_partitions(partition); + } + let ctx = SessionContext::new_with_config(session_config); + // Create table with partition + let create_table_sql = create_table_sql.unwrap_or( + "CREATE EXTERNAL TABLE t1 (id INT NOT NULL, date DATE) \ + STORED AS PARQUET LOCATION './tests/data/test_statistics_per_partition'\ + PARTITIONED BY (date) \ + WITH ORDER (id ASC);", + ); + // Get table name from `create_table_sql` + let table_name = create_table_sql + .split_whitespace() + .nth(3) + .unwrap_or("t1") + .to_string(); + ctx.sql(create_table_sql) + .await + .unwrap() + .collect() + .await + .unwrap(); + let table = ctx.table_provider(table_name.as_str()).await.unwrap(); + let listing_table = table + .as_any() + .downcast_ref::() + .unwrap() + .clone(); + listing_table + .scan(&ctx.state(), None, &[], None) + .await + .unwrap() + } + + /// Helper function to create expected statistics for a partition with Int32 column + fn create_partition_statistics( + num_rows: usize, + total_byte_size: usize, + min_value: i32, + max_value: i32, + include_date_column: bool, + ) -> Statistics { + let mut column_stats = vec![ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Int32(Some(max_value))), + min_value: Precision::Exact(ScalarValue::Int32(Some(min_value))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }]; + + if include_date_column { + column_stats.push(ColumnStatistics { + null_count: Precision::Absent, + max_value: Precision::Absent, + min_value: Precision::Absent, + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }); + } + + Statistics { + num_rows: Precision::Exact(num_rows), + total_byte_size: Precision::Exact(total_byte_size), + column_statistics: column_stats, + } + } + + /// Helper function to validate that statistics from statistics_by_partition match the actual data + async fn validate_statistics_with_data( + plan: Arc, + expected_stats: Vec<(i32, i32, usize)>, // (min_id, max_id, row_count) + id_column_index: usize, + ) -> Result<()> { + let ctx = TaskContext::default(); + let partitions = execute_stream_partitioned(plan, Arc::new(ctx))?; + + let mut actual_stats = Vec::new(); + for partition_stream in partitions.into_iter() { + let result: Vec = partition_stream.try_collect().await?; + + let mut min_id = i32::MAX; + let mut max_id = i32::MIN; + let mut row_count = 0; + + for batch in result { + if batch.num_columns() > id_column_index { + let id_array = batch + .column(id_column_index) + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..batch.num_rows() { + let id_value = id_array.value(i); + min_id = min_id.min(id_value); + max_id = max_id.max(id_value); + row_count += 1; + } + } + } + + if row_count > 0 { + actual_stats.push((min_id, max_id, row_count)); + } + } + + // Compare actual data with expected statistics + assert_eq!( + actual_stats.len(), + expected_stats.len(), + "Number of partitions with data doesn't match expected" + ); + for i in 0..actual_stats.len() { + assert_eq!( + actual_stats[i], expected_stats[i], + "Partition {} data doesn't match statistics", + i + ); + } + + Ok(()) + } + + #[tokio::test] + async fn test_statistics_by_partition_of_data_source() -> Result<()> { + let scan = create_scan_exec_with_statistics(None, Some(2)).await; + let statistics = (0..scan.output_partitioning().partition_count()) + .map(|idx| scan.partition_statistics(Some(idx))) + .collect::>>()?; + let expected_statistic_partition_1 = + create_partition_statistics(2, 110, 3, 4, true); + let expected_statistic_partition_2 = + create_partition_statistics(2, 110, 1, 2, true); + // Check the statistics of each partition + assert_eq!(statistics.len(), 2); + assert_eq!(statistics[0], expected_statistic_partition_1); + assert_eq!(statistics[1], expected_statistic_partition_2); + + // Check the statistics_by_partition with real results + let expected_stats = vec![ + (3, 4, 2), // (min_id, max_id, row_count) for first partition + (1, 2, 2), // (min_id, max_id, row_count) for second partition + ]; + validate_statistics_with_data(scan, expected_stats, 0).await?; + + Ok(()) + } + + #[tokio::test] + async fn test_statistics_by_partition_of_projection() -> Result<()> { + let scan = create_scan_exec_with_statistics(None, Some(2)).await; + // Add projection execution plan + let exprs: Vec<(Arc, String)> = + vec![(Arc::new(Column::new("id", 0)), "id".to_string())]; + let projection: Arc = + Arc::new(ProjectionExec::try_new(exprs, scan)?); + let statistics = (0..projection.output_partitioning().partition_count()) + .map(|idx| projection.partition_statistics(Some(idx))) + .collect::>>()?; + let expected_statistic_partition_1 = + create_partition_statistics(2, 8, 3, 4, false); + let expected_statistic_partition_2 = + create_partition_statistics(2, 8, 1, 2, false); + // Check the statistics of each partition + assert_eq!(statistics.len(), 2); + assert_eq!(statistics[0], expected_statistic_partition_1); + assert_eq!(statistics[1], expected_statistic_partition_2); + + // Check the statistics_by_partition with real results + let expected_stats = vec![(3, 4, 2), (1, 2, 2)]; + validate_statistics_with_data(projection, expected_stats, 0).await?; + Ok(()) + } + + #[tokio::test] + async fn test_statistics_by_partition_of_sort() -> Result<()> { + let scan_1 = create_scan_exec_with_statistics(None, Some(1)).await; + // Add sort execution plan + let sort = SortExec::new( + LexOrdering::new(vec![PhysicalSortExpr { + expr: Arc::new(Column::new("id", 0)), + options: SortOptions { + descending: false, + nulls_first: false, + }, + }]), + scan_1, + ); + let sort_exec: Arc = Arc::new(sort.clone()); + let statistics = (0..sort_exec.output_partitioning().partition_count()) + .map(|idx| sort_exec.partition_statistics(Some(idx))) + .collect::>>()?; + let expected_statistic_partition = + create_partition_statistics(4, 220, 1, 4, true); + assert_eq!(statistics.len(), 1); + assert_eq!(statistics[0], expected_statistic_partition); + // Check the statistics_by_partition with real results + let expected_stats = vec![(1, 4, 4)]; + validate_statistics_with_data(sort_exec.clone(), expected_stats, 0).await?; + + // Sort with preserve_partitioning + let scan_2 = create_scan_exec_with_statistics(None, Some(2)).await; + // Add sort execution plan + let sort_exec: Arc = Arc::new( + SortExec::new( + LexOrdering::new(vec![PhysicalSortExpr { + expr: Arc::new(Column::new("id", 0)), + options: SortOptions { + descending: false, + nulls_first: false, + }, + }]), + scan_2, + ) + .with_preserve_partitioning(true), + ); + let expected_statistic_partition_1 = + create_partition_statistics(2, 110, 3, 4, true); + let expected_statistic_partition_2 = + create_partition_statistics(2, 110, 1, 2, true); + let statistics = (0..sort_exec.output_partitioning().partition_count()) + .map(|idx| sort_exec.partition_statistics(Some(idx))) + .collect::>>()?; + assert_eq!(statistics.len(), 2); + assert_eq!(statistics[0], expected_statistic_partition_1); + assert_eq!(statistics[1], expected_statistic_partition_2); + + // Check the statistics_by_partition with real results + let expected_stats = vec![(3, 4, 2), (1, 2, 2)]; + validate_statistics_with_data(sort_exec, expected_stats, 0).await?; + Ok(()) + } + + #[tokio::test] + async fn test_statistics_by_partition_of_filter() -> Result<()> { + let scan = create_scan_exec_with_statistics(None, Some(2)).await; + let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]); + let predicate = binary( + Arc::new(Column::new("id", 0)), + Operator::Lt, + lit(1i32), + &schema, + )?; + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, scan)?); + let full_statistics = filter.partition_statistics(None)?; + let expected_full_statistic = Statistics { + num_rows: Precision::Inexact(0), + total_byte_size: Precision::Inexact(0), + column_statistics: vec![ + ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Null), + min_value: Precision::Exact(ScalarValue::Null), + sum_value: Precision::Exact(ScalarValue::Null), + distinct_count: Precision::Exact(0), + }, + ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Null), + min_value: Precision::Exact(ScalarValue::Null), + sum_value: Precision::Exact(ScalarValue::Null), + distinct_count: Precision::Exact(0), + }, + ], + }; + assert_eq!(full_statistics, expected_full_statistic); + + let statistics = (0..filter.output_partitioning().partition_count()) + .map(|idx| filter.partition_statistics(Some(idx))) + .collect::>>()?; + assert_eq!(statistics.len(), 2); + assert_eq!(statistics[0], expected_full_statistic); + assert_eq!(statistics[1], expected_full_statistic); + Ok(()) + } + + #[tokio::test] + async fn test_statistic_by_partition_of_union() -> Result<()> { + let scan = create_scan_exec_with_statistics(None, Some(2)).await; + let union_exec: Arc = + Arc::new(UnionExec::new(vec![scan.clone(), scan])); + let statistics = (0..union_exec.output_partitioning().partition_count()) + .map(|idx| union_exec.partition_statistics(Some(idx))) + .collect::>>()?; + // Check that we have 4 partitions (2 from each scan) + assert_eq!(statistics.len(), 4); + let expected_statistic_partition_1 = + create_partition_statistics(2, 110, 3, 4, true); + let expected_statistic_partition_2 = + create_partition_statistics(2, 110, 1, 2, true); + // Verify first partition (from first scan) + assert_eq!(statistics[0], expected_statistic_partition_1); + // Verify second partition (from first scan) + assert_eq!(statistics[1], expected_statistic_partition_2); + // Verify third partition (from second scan - same as first partition) + assert_eq!(statistics[2], expected_statistic_partition_1); + // Verify fourth partition (from second scan - same as second partition) + assert_eq!(statistics[3], expected_statistic_partition_2); + + // Check the statistics_by_partition with real results + let expected_stats = vec![(3, 4, 2), (1, 2, 2), (3, 4, 2), (1, 2, 2)]; + validate_statistics_with_data(union_exec, expected_stats, 0).await?; + Ok(()) + } + + #[tokio::test] + async fn test_statistic_by_partition_of_cross_join() -> Result<()> { + let left_scan = create_scan_exec_with_statistics(None, Some(1)).await; + let right_create_table_sql = "CREATE EXTERNAL TABLE t2 (id INT NOT NULL) \ + STORED AS PARQUET LOCATION './tests/data/test_statistics_per_partition'\ + WITH ORDER (id ASC);"; + let right_scan = + create_scan_exec_with_statistics(Some(right_create_table_sql), Some(2)).await; + let cross_join: Arc = + Arc::new(CrossJoinExec::new(left_scan, right_scan)); + let statistics = (0..cross_join.output_partitioning().partition_count()) + .map(|idx| cross_join.partition_statistics(Some(idx))) + .collect::>>()?; + // Check that we have 2 partitions + assert_eq!(statistics.len(), 2); + let mut expected_statistic_partition_1 = + create_partition_statistics(8, 48400, 1, 4, true); + expected_statistic_partition_1 + .column_statistics + .push(ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Int32(Some(4))), + min_value: Precision::Exact(ScalarValue::Int32(Some(3))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }); + let mut expected_statistic_partition_2 = + create_partition_statistics(8, 48400, 1, 4, true); + expected_statistic_partition_2 + .column_statistics + .push(ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Int32(Some(2))), + min_value: Precision::Exact(ScalarValue::Int32(Some(1))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }); + assert_eq!(statistics[0], expected_statistic_partition_1); + assert_eq!(statistics[1], expected_statistic_partition_2); + + // Check the statistics_by_partition with real results + let expected_stats = vec![(1, 4, 8), (1, 4, 8)]; + validate_statistics_with_data(cross_join, expected_stats, 0).await?; + Ok(()) + } + + #[tokio::test] + async fn test_statistic_by_partition_of_coalesce_batches() -> Result<()> { + let scan = create_scan_exec_with_statistics(None, Some(2)).await; + dbg!(scan.partition_statistics(Some(0))?); + let coalesce_batches: Arc = + Arc::new(CoalesceBatchesExec::new(scan, 2)); + let expected_statistic_partition_1 = + create_partition_statistics(2, 110, 3, 4, true); + let expected_statistic_partition_2 = + create_partition_statistics(2, 110, 1, 2, true); + let statistics = (0..coalesce_batches.output_partitioning().partition_count()) + .map(|idx| coalesce_batches.partition_statistics(Some(idx))) + .collect::>>()?; + assert_eq!(statistics.len(), 2); + assert_eq!(statistics[0], expected_statistic_partition_1); + assert_eq!(statistics[1], expected_statistic_partition_2); + + // Check the statistics_by_partition with real results + let expected_stats = vec![(3, 4, 2), (1, 2, 2)]; + validate_statistics_with_data(coalesce_batches, expected_stats, 0).await?; + Ok(()) + } + + #[tokio::test] + async fn test_statistic_by_partition_of_coalesce_partitions() -> Result<()> { + let scan = create_scan_exec_with_statistics(None, Some(2)).await; + let coalesce_partitions: Arc = + Arc::new(CoalescePartitionsExec::new(scan)); + let expected_statistic_partition = + create_partition_statistics(4, 220, 1, 4, true); + let statistics = (0..coalesce_partitions.output_partitioning().partition_count()) + .map(|idx| coalesce_partitions.partition_statistics(Some(idx))) + .collect::>>()?; + assert_eq!(statistics.len(), 1); + assert_eq!(statistics[0], expected_statistic_partition); + + // Check the statistics_by_partition with real results + let expected_stats = vec![(1, 4, 4)]; + validate_statistics_with_data(coalesce_partitions, expected_stats, 0).await?; + Ok(()) + } + + #[tokio::test] + async fn test_statistic_by_partition_of_local_limit() -> Result<()> { + let scan = create_scan_exec_with_statistics(None, Some(2)).await; + let local_limit: Arc = + Arc::new(LocalLimitExec::new(scan.clone(), 1)); + let statistics = (0..local_limit.output_partitioning().partition_count()) + .map(|idx| local_limit.partition_statistics(Some(idx))) + .collect::>>()?; + assert_eq!(statistics.len(), 2); + let schema = scan.schema(); + let mut expected_statistic_partition = Statistics::new_unknown(&schema); + expected_statistic_partition.num_rows = Precision::Exact(1); + assert_eq!(statistics[0], expected_statistic_partition); + assert_eq!(statistics[1], expected_statistic_partition); + Ok(()) + } + + #[tokio::test] + async fn test_statistic_by_partition_of_global_limit_partitions() -> Result<()> { + let scan = create_scan_exec_with_statistics(None, Some(2)).await; + // Skip 2 rows + let global_limit: Arc = + Arc::new(GlobalLimitExec::new(scan.clone(), 0, Some(2))); + let statistics = (0..global_limit.output_partitioning().partition_count()) + .map(|idx| global_limit.partition_statistics(Some(idx))) + .collect::>>()?; + assert_eq!(statistics.len(), 1); + let expected_statistic_partition = + create_partition_statistics(2, 110, 3, 4, true); + assert_eq!(statistics[0], expected_statistic_partition); + Ok(()) + } +} diff --git a/datafusion/core/tests/sql/path_partition.rs b/datafusion/core/tests/sql/path_partition.rs index fa6c7432413f..160084213c7c 100644 --- a/datafusion/core/tests/sql/path_partition.rs +++ b/datafusion/core/tests/sql/path_partition.rs @@ -511,7 +511,7 @@ async fn parquet_statistics() -> Result<()> { let schema = physical_plan.schema(); assert_eq!(schema.fields().len(), 4); - let stat_cols = physical_plan.statistics()?.column_statistics; + let stat_cols = physical_plan.partition_statistics(None)?.column_statistics; assert_eq!(stat_cols.len(), 4); // stats for the first col are read from the parquet file assert_eq!(stat_cols[0].null_count, Precision::Exact(3)); @@ -526,7 +526,7 @@ async fn parquet_statistics() -> Result<()> { let schema = physical_plan.schema(); assert_eq!(schema.fields().len(), 2); - let stat_cols = physical_plan.statistics()?.column_statistics; + let stat_cols = physical_plan.partition_statistics(None)?.column_statistics; assert_eq!(stat_cols.len(), 2); // stats for the first col are read from the parquet file assert_eq!(stat_cols[0].null_count, Precision::Exact(1)); diff --git a/datafusion/datasource-avro/src/source.rs b/datafusion/datasource-avro/src/source.rs index ce3722e7b11e..2ec9fc6a98b0 100644 --- a/datafusion/datasource-avro/src/source.rs +++ b/datafusion/datasource-avro/src/source.rs @@ -141,6 +141,10 @@ impl ExecutionPlan for AvroExec { self.inner.statistics() } + fn partition_statistics(&self, partition: Option) -> Result { + self.inner.partition_statistics(partition) + } + fn metrics(&self) -> Option { self.inner.metrics() } diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs index f5d45cd3fc88..2c7a851159ed 100644 --- a/datafusion/datasource-csv/src/source.rs +++ b/datafusion/datasource-csv/src/source.rs @@ -381,6 +381,10 @@ impl ExecutionPlan for CsvExec { self.inner.statistics() } + fn partition_statistics(&self, partition: Option) -> Result { + self.inner.partition_statistics(partition) + } + fn metrics(&self) -> Option { self.inner.metrics() } diff --git a/datafusion/datasource/src/file_groups.rs b/datafusion/datasource/src/file_groups.rs index 929787e436c8..bc9f5f1b78f7 100644 --- a/datafusion/datasource/src/file_groups.rs +++ b/datafusion/datasource/src/file_groups.rs @@ -420,9 +420,14 @@ impl FileGroup { self.files.push(file); } - /// Get the statistics for this group - pub fn statistics(&self) -> Option<&Statistics> { - self.statistics.as_deref() + /// Get the specific file statistics for the given index + /// If the index is None, return the `FileGroup` statistics + pub fn file_statistics(&self, index: Option) -> Option<&Statistics> { + if let Some(index) = index { + self.files.get(index).and_then(|f| f.statistics.as_deref()) + } else { + self.statistics.as_deref() + } } /// Get the mutable reference to the statistics for this group diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs index 6d0e16ef4b91..1a62a2a4a41c 100644 --- a/datafusion/datasource/src/memory.rs +++ b/datafusion/datasource/src/memory.rs @@ -976,7 +976,7 @@ mod tests { )?; assert_eq!( - values.statistics()?, + values.partition_statistics(None)?, Statistics { num_rows: Precision::Exact(rows), total_byte_size: Precision::Exact(8), // not important diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 6abe3c329869..022f77f2e421 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -201,6 +201,24 @@ impl ExecutionPlan for DataSourceExec { self.data_source.statistics() } + fn partition_statistics(&self, partition: Option) -> Result { + if let Some(partition) = partition { + let mut statistics = Statistics::new_unknown(&self.schema()); + if let Some(file_config) = + self.data_source.as_any().downcast_ref::() + { + if let Some(file_group) = file_config.file_groups.get(partition) { + if let Some(stat) = file_group.file_statistics(None) { + statistics = stat.clone(); + } + } + } + Ok(statistics) + } else { + Ok(self.data_source.statistics()?) + } + } + fn with_fetch(&self, limit: Option) -> Option> { let data_source = self.data_source.with_fetch(limit)?; let cache = self.cache.clone(); diff --git a/datafusion/datasource/src/statistics.rs b/datafusion/datasource/src/statistics.rs index 8a04d77b273d..48aa9fe32ee8 100644 --- a/datafusion/datasource/src/statistics.rs +++ b/datafusion/datasource/src/statistics.rs @@ -476,7 +476,7 @@ pub fn compute_all_files_statistics( // Then summary statistics across all file groups let file_groups_statistics = file_groups_with_stats .iter() - .filter_map(|file_group| file_group.statistics()); + .filter_map(|file_group| file_group.file_statistics(None)); let mut statistics = Statistics::try_merge_iter(file_groups_statistics, &table_schema)?; diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs index 28ee10eb650a..6c44c8fe86c5 100644 --- a/datafusion/physical-optimizer/src/aggregate_statistics.rs +++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs @@ -53,7 +53,7 @@ impl PhysicalOptimizerRule for AggregateStatistics { .as_any() .downcast_ref::() .expect("take_optimizable() ensures that this is a AggregateExec"); - let stats = partial_agg_exec.input().statistics()?; + let stats = partial_agg_exec.input().partition_statistics(None)?; let mut projections = vec![]; for expr in partial_agg_exec.aggr_expr() { let field = expr.field(); diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index 947fd3eba23e..0c80cce4cc02 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -1114,7 +1114,8 @@ fn get_repartition_requirement_status( { // Decide whether adding a round robin is beneficial depending on // the statistical information we have on the number of rows: - let roundrobin_beneficial_stats = match child.statistics()?.num_rows { + let roundrobin_beneficial_stats = match child.partition_statistics(None)?.num_rows + { Precision::Exact(n_rows) => n_rows > batch_size, Precision::Inexact(n_rows) => !should_use_estimates || (n_rows > batch_size), Precision::Absent => true, diff --git a/datafusion/physical-optimizer/src/join_selection.rs b/datafusion/physical-optimizer/src/join_selection.rs index 5a772ccdd249..05758e5dfdf1 100644 --- a/datafusion/physical-optimizer/src/join_selection.rs +++ b/datafusion/physical-optimizer/src/join_selection.rs @@ -65,8 +65,8 @@ pub(crate) fn should_swap_join_order( // Get the left and right table's total bytes // If both the left and right tables contain total_byte_size statistics, // use `total_byte_size` to determine `should_swap_join_order`, else use `num_rows` - let left_stats = left.statistics()?; - let right_stats = right.statistics()?; + let left_stats = left.partition_statistics(None)?; + let right_stats = right.partition_statistics(None)?; // First compare `total_byte_size` of left and right side, // if information in this field is insufficient fallback to the `num_rows` match ( @@ -91,7 +91,7 @@ fn supports_collect_by_thresholds( ) -> bool { // Currently we do not trust the 0 value from stats, due to stats collection might have bug // TODO check the logic in datasource::get_statistics_with_limit() - let Ok(stats) = plan.statistics() else { + let Ok(stats) = plan.partition_statistics(None) else { return false; }; diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs index 3ca0547aa11d..0488b3fd49a8 100644 --- a/datafusion/physical-optimizer/src/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -200,7 +200,11 @@ impl ExecutionPlan for OutputRequirementExec { } fn statistics(&self) -> Result { - self.input.statistics() + self.input.partition_statistics(None) + } + + fn partition_statistics(&self, partition: Option) -> Result { + self.input.partition_statistics(partition) } fn try_swapping_with_projection( diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 8906468f68db..a6fc2fc682a2 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -735,6 +735,59 @@ impl AggregateExec { pub fn input_order_mode(&self) -> &InputOrderMode { &self.input_order_mode } + + fn statistics_inner(&self) -> Result { + // TODO stats: group expressions: + // - once expressions will be able to compute their own stats, use it here + // - case where we group by on a column for which with have the `distinct` stat + // TODO stats: aggr expression: + // - aggregations sometimes also preserve invariants such as min, max... + let column_statistics = Statistics::unknown_column(&self.schema()); + match self.mode { + AggregateMode::Final | AggregateMode::FinalPartitioned + if self.group_by.expr.is_empty() => + { + Ok(Statistics { + num_rows: Precision::Exact(1), + column_statistics, + total_byte_size: Precision::Absent, + }) + } + _ => { + // When the input row count is 0 or 1, we can adopt that statistic keeping its reliability. + // When it is larger than 1, we degrade the precision since it may decrease after aggregation. + let num_rows = if let Some(value) = self + .input() + .partition_statistics(None)? + .num_rows + .get_value() + { + if *value > 1 { + self.input() + .partition_statistics(None)? + .num_rows + .to_inexact() + } else if *value == 0 { + // Aggregation on an empty table creates a null row. + self.input() + .partition_statistics(None)? + .num_rows + .add(&Precision::Exact(1)) + } else { + // num_rows = 1 case + self.input().partition_statistics(None)?.num_rows + } + } else { + Precision::Absent + }; + Ok(Statistics { + num_rows, + column_statistics, + total_byte_size: Precision::Absent, + }) + } + } + } } impl DisplayAs for AggregateExec { @@ -941,49 +994,15 @@ impl ExecutionPlan for AggregateExec { } fn statistics(&self) -> Result { - // TODO stats: group expressions: - // - once expressions will be able to compute their own stats, use it here - // - case where we group by on a column for which with have the `distinct` stat - // TODO stats: aggr expression: - // - aggregations sometimes also preserve invariants such as min, max... - let column_statistics = Statistics::unknown_column(&self.schema()); - match self.mode { - AggregateMode::Final | AggregateMode::FinalPartitioned - if self.group_by.expr.is_empty() => - { - Ok(Statistics { - num_rows: Precision::Exact(1), - column_statistics, - total_byte_size: Precision::Absent, - }) - } - _ => { - // When the input row count is 0 or 1, we can adopt that statistic keeping its reliability. - // When it is larger than 1, we degrade the precision since it may decrease after aggregation. - let num_rows = if let Some(value) = - self.input().statistics()?.num_rows.get_value() - { - if *value > 1 { - self.input().statistics()?.num_rows.to_inexact() - } else if *value == 0 { - // Aggregation on an empty table creates a null row. - self.input() - .statistics()? - .num_rows - .add(&Precision::Exact(1)) - } else { - // num_rows = 1 case - self.input().statistics()?.num_rows - } - } else { - Precision::Absent - }; - Ok(Statistics { - num_rows, - column_statistics, - total_byte_size: Precision::Absent, - }) - } + self.statistics_inner() + } + + fn partition_statistics(&self, partition: Option) -> Result { + if partition.is_none() { + // If the partition is not specified, we can use the statistics of the input plan + self.statistics_inner() + } else { + Ok(Statistics::new_unknown(&self.schema())) } } @@ -1924,6 +1943,13 @@ mod tests { } fn statistics(&self) -> Result { + self.partition_statistics(None) + } + + fn partition_statistics(&self, partition: Option) -> Result { + if partition.is_some() { + return Ok(Statistics::new_unknown(self.schema().as_ref())); + } let (_, batches) = some_data(); Ok(common::compute_record_batch_statistics( &[batches], diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index faab5fdc5eb6..34b3f1b0241b 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -196,7 +196,16 @@ impl ExecutionPlan for CoalesceBatchesExec { } fn statistics(&self) -> Result { - Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1) + self.partition_statistics(None) + } + + fn partition_statistics(&self, partition: Option) -> Result { + self.input.partition_statistics(partition)?.with_fetch( + self.schema(), + self.fetch, + 0, + 1, + ) } fn with_fetch(&self, limit: Option) -> Option> { diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 715dd159e7e8..114f830688c9 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -196,7 +196,13 @@ impl ExecutionPlan for CoalescePartitionsExec { } fn statistics(&self) -> Result { - Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1) + self.partition_statistics(None) + } + + fn partition_statistics(&self, _partition: Option) -> Result { + self.input + .partition_statistics(None)? + .with_fetch(self.schema(), self.fetch, 0, 1) } fn supports_limit_pushdown(&self) -> bool { diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index e247f5ad9d19..5a44f8735c8f 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -394,7 +394,7 @@ impl ExecutionPlanVisitor for IndentVisitor<'_, '_> { } } if self.show_statistics { - let stats = plan.statistics().map_err(|_e| fmt::Error)?; + let stats = plan.partition_statistics(None).map_err(|_e| fmt::Error)?; write!(self.f, ", statistics=[{}]", stats)?; } if self.show_schema { @@ -479,7 +479,7 @@ impl ExecutionPlanVisitor for GraphvizVisitor<'_, '_> { }; let statistics = if self.show_statistics { - let stats = plan.statistics().map_err(|_e| fmt::Error)?; + let stats = plan.partition_statistics(None).map_err(|_e| fmt::Error)?; format!("statistics=[{}]", stats) } else { "".to_string() @@ -1120,6 +1120,13 @@ mod tests { } fn statistics(&self) -> Result { + self.partition_statistics(None) + } + + fn partition_statistics(&self, partition: Option) -> Result { + if partition.is_some() { + return Ok(Statistics::new_unknown(self.schema().as_ref())); + } match self { Self::Panic => panic!("expected panic"), Self::Error => { diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index 3fdde39df6f1..6c8133de4624 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -150,6 +150,13 @@ impl ExecutionPlan for EmptyExec { } fn statistics(&self) -> Result { + self.partition_statistics(None) + } + + fn partition_statistics(&self, partition: Option) -> Result { + if partition.is_some() { + return Ok(Statistics::new_unknown(&self.schema())); + } let batch = self .data() .expect("Create empty RecordBatch should not fail"); diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 2b6eac7be067..9551c2b1743e 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -426,10 +426,30 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// /// For TableScan executors, which supports filter pushdown, special attention /// needs to be paid to whether the stats returned by this method are exact or not + #[deprecated(since = "48.0.0", note = "Use `partition_statistics` method instead")] fn statistics(&self) -> Result { Ok(Statistics::new_unknown(&self.schema())) } + /// Returns statistics for a specific partition of this `ExecutionPlan` node. + /// If statistics are not available, should return [`Statistics::new_unknown`] + /// (the default), not an error. + /// If `partition` is `None`, it returns statistics for the entire plan. + fn partition_statistics(&self, partition: Option) -> Result { + if let Some(idx) = partition { + // Validate partition index + let partition_count = self.properties().partitioning.partition_count(); + if idx >= partition_count { + return internal_err!( + "Invalid partition index: {}, the partition count is {}", + idx, + partition_count + ); + } + } + Ok(Statistics::new_unknown(&self.schema())) + } + /// Returns `true` if a limit can be safely pushed down through this /// `ExecutionPlan` node. /// @@ -1177,6 +1197,10 @@ mod tests { fn statistics(&self) -> Result { unimplemented!() } + + fn partition_statistics(&self, _partition: Option) -> Result { + unimplemented!() + } } #[derive(Debug)] @@ -1240,6 +1264,10 @@ mod tests { fn statistics(&self) -> Result { unimplemented!() } + + fn partition_statistics(&self, _partition: Option) -> Result { + unimplemented!() + } } #[test] diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 95fa67025e90..6df3e236a0dd 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -174,12 +174,11 @@ impl FilterExec { /// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics. fn statistics_helper( - input: &Arc, + schema: SchemaRef, + input_stats: Statistics, predicate: &Arc, default_selectivity: u8, ) -> Result { - let input_stats = input.statistics()?; - let schema = input.schema(); if !check_support(predicate, &schema) { let selectivity = default_selectivity as f64 / 100.0; let mut stats = input_stats.to_inexact(); @@ -193,7 +192,7 @@ impl FilterExec { let num_rows = input_stats.num_rows; let total_byte_size = input_stats.total_byte_size; let input_analysis_ctx = AnalysisContext::try_from_statistics( - &input.schema(), + &schema, &input_stats.column_statistics, )?; @@ -260,7 +259,12 @@ impl FilterExec { ) -> Result { // Combine the equal predicates with the input equivalence properties // to construct the equivalence properties: - let stats = Self::statistics_helper(input, predicate, default_selectivity)?; + let stats = Self::statistics_helper( + input.schema(), + input.partition_statistics(None)?, + predicate, + default_selectivity, + )?; let mut eq_properties = input.equivalence_properties().clone(); let (equal_pairs, _) = collect_columns_from_predicate(predicate); for (lhs, rhs) in equal_pairs { @@ -400,8 +404,14 @@ impl ExecutionPlan for FilterExec { /// The output statistics of a filtering operation can be estimated if the /// predicate's selectivity value can be determined for the incoming data. fn statistics(&self) -> Result { + self.partition_statistics(None) + } + + fn partition_statistics(&self, partition: Option) -> Result { + let input_stats = self.input.partition_statistics(partition)?; let stats = Self::statistics_helper( - &self.input, + self.schema(), + input_stats, self.predicate(), self.default_selectivity, )?; @@ -757,7 +767,7 @@ mod tests { let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.statistics()?; + let statistics = filter.partition_statistics(None)?; assert_eq!(statistics.num_rows, Precision::Inexact(25)); assert_eq!( statistics.total_byte_size, @@ -807,7 +817,7 @@ mod tests { sub_filter, )?); - let statistics = filter.statistics()?; + let statistics = filter.partition_statistics(None)?; assert_eq!(statistics.num_rows, Precision::Inexact(16)); assert_eq!( statistics.column_statistics, @@ -867,7 +877,7 @@ mod tests { binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?, b_gt_5, )?); - let statistics = filter.statistics()?; + let statistics = filter.partition_statistics(None)?; // On a uniform distribution, only fifteen rows will satisfy the // filter that 'a' proposed (a >= 10 AND a <= 25) (15/100) and only // 5 rows will satisfy the filter that 'b' proposed (b > 45) (5/50). @@ -912,7 +922,7 @@ mod tests { let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.statistics()?; + let statistics = filter.partition_statistics(None)?; assert_eq!(statistics.num_rows, Precision::Absent); Ok(()) @@ -985,7 +995,7 @@ mod tests { )); let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.statistics()?; + let statistics = filter.partition_statistics(None)?; // 0.5 (from a) * 0.333333... (from b) * 0.798387... (from c) ≈ 0.1330... // num_rows after ceil => 133.0... => 134 // total_byte_size after ceil => 532.0... => 533 @@ -1081,10 +1091,10 @@ mod tests { )), )); // Since filter predicate passes all entries, statistics after filter shouldn't change. - let expected = input.statistics()?.column_statistics; + let expected = input.partition_statistics(None)?.column_statistics; let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.statistics()?; + let statistics = filter.partition_statistics(None)?; assert_eq!(statistics.num_rows, Precision::Inexact(1000)); assert_eq!(statistics.total_byte_size, Precision::Inexact(4000)); @@ -1137,7 +1147,7 @@ mod tests { )); let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.statistics()?; + let statistics = filter.partition_statistics(None)?; assert_eq!(statistics.num_rows, Precision::Inexact(0)); assert_eq!(statistics.total_byte_size, Precision::Inexact(0)); @@ -1197,7 +1207,7 @@ mod tests { )); let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let statistics = filter.statistics()?; + let statistics = filter.partition_statistics(None)?; assert_eq!(statistics.num_rows, Precision::Inexact(490)); assert_eq!(statistics.total_byte_size, Precision::Inexact(1960)); @@ -1247,7 +1257,7 @@ mod tests { )); let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let filter_statistics = filter.statistics()?; + let filter_statistics = filter.partition_statistics(None)?; let expected_filter_statistics = Statistics { num_rows: Precision::Absent, @@ -1281,7 +1291,7 @@ mod tests { )); let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); - let filter_statistics = filter.statistics()?; + let filter_statistics = filter.partition_statistics(None)?; // First column is "a", and it is a column with only one value after the filter. assert!(filter_statistics.column_statistics[0].is_singleton()); @@ -1328,11 +1338,11 @@ mod tests { Arc::new(Literal::new(ScalarValue::Decimal128(Some(10), 10, 10))), )); let filter = FilterExec::try_new(predicate, input)?; - let statistics = filter.statistics()?; + let statistics = filter.partition_statistics(None)?; assert_eq!(statistics.num_rows, Precision::Inexact(200)); assert_eq!(statistics.total_byte_size, Precision::Inexact(800)); let filter = filter.with_default_selectivity(40)?; - let statistics = filter.statistics()?; + let statistics = filter.partition_statistics(None)?; assert_eq!(statistics.num_rows, Precision::Inexact(400)); assert_eq!(statistics.total_byte_size, Precision::Inexact(1600)); Ok(()) @@ -1366,7 +1376,7 @@ mod tests { Arc::new(EmptyExec::new(Arc::clone(&schema))), )?; - exec.statistics().unwrap(); + exec.partition_statistics(None).unwrap(); Ok(()) } diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 8dd1addff15c..cbc38f114972 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -337,10 +337,15 @@ impl ExecutionPlan for CrossJoinExec { } fn statistics(&self) -> Result { - Ok(stats_cartesian_product( - self.left.statistics()?, - self.right.statistics()?, - )) + self.partition_statistics(None) + } + + fn partition_statistics(&self, partition: Option) -> Result { + // Get the all partitions statistics of the left + let left_stats = self.left.partition_statistics(None)?; + let right_stats = self.right.partition_statistics(partition)?; + + Ok(stats_cartesian_product(left_stats, right_stats)) } /// Tries to swap the projection with its input [`CrossJoinExec`]. If it can be done, diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index e8904db0f3ea..dd77c5e37c3e 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -879,12 +879,19 @@ impl ExecutionPlan for HashJoinExec { } fn statistics(&self) -> Result { + self.partition_statistics(None) + } + + fn partition_statistics(&self, partition: Option) -> Result { + if partition.is_some() { + return Ok(Statistics::new_unknown(&self.schema())); + } // TODO stats: it is not possible in general to know the output size of joins // There are some special cases though, for example: // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)` let stats = estimate_join_statistics( - Arc::clone(&self.left), - Arc::clone(&self.right), + self.left.partition_statistics(None)?, + self.right.partition_statistics(None)?, self.on.clone(), &self.join_type, &self.join_schema, diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index b90279595096..f913bcaa947b 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -567,9 +567,16 @@ impl ExecutionPlan for NestedLoopJoinExec { } fn statistics(&self) -> Result { + self.partition_statistics(None) + } + + fn partition_statistics(&self, partition: Option) -> Result { + if partition.is_some() { + return Ok(Statistics::new_unknown(&self.schema())); + } estimate_join_statistics( - Arc::clone(&self.left), - Arc::clone(&self.right), + self.left.partition_statistics(None)?, + self.right.partition_statistics(None)?, vec![], &self.join_type, &self.join_schema, diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 89f2e3c911f8..6817bd9b76dd 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -514,12 +514,19 @@ impl ExecutionPlan for SortMergeJoinExec { } fn statistics(&self) -> Result { + self.partition_statistics(None) + } + + fn partition_statistics(&self, partition: Option) -> Result { + if partition.is_some() { + return Ok(Statistics::new_unknown(&self.schema())); + } // TODO stats: it is not possible in general to know the output size of joins // There are some special cases though, for example: // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)` estimate_join_statistics( - Arc::clone(&self.left), - Arc::clone(&self.right), + self.left.partition_statistics(None)?, + self.right.partition_statistics(None)?, self.on.clone(), &self.join_type, &self.schema, diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 5516f172d510..ce1860d20565 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -403,15 +403,12 @@ struct PartialJoinStatistics { /// Estimate the statistics for the given join's output. pub(crate) fn estimate_join_statistics( - left: Arc, - right: Arc, + left_stats: Statistics, + right_stats: Statistics, on: JoinOn, join_type: &JoinType, schema: &Schema, ) -> Result { - let left_stats = left.statistics()?; - let right_stats = right.statistics()?; - let join_stats = estimate_join_cardinality(join_type, left_stats, right_stats, &on); let (num_rows, column_statistics) = match join_stats { Some(stats) => (Precision::Inexact(stats.num_rows), stats.column_statistics), diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 89cf47a6d650..80c393eec485 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -193,8 +193,11 @@ impl ExecutionPlan for GlobalLimitExec { } fn statistics(&self) -> Result { - Statistics::with_fetch( - self.input.statistics()?, + self.partition_statistics(None) + } + + fn partition_statistics(&self, partition: Option) -> Result { + self.input.partition_statistics(partition)?.with_fetch( self.schema(), self.fetch, self.skip, @@ -334,8 +337,11 @@ impl ExecutionPlan for LocalLimitExec { } fn statistics(&self) -> Result { - Statistics::with_fetch( - self.input.statistics()?, + self.partition_statistics(None) + } + + fn partition_statistics(&self, partition: Option) -> Result { + self.input.partition_statistics(partition)?.with_fetch( self.schema(), Some(self.fetch), 0, @@ -765,7 +771,7 @@ mod tests { let offset = GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), skip, fetch); - Ok(offset.statistics()?.num_rows) + Ok(offset.partition_statistics(None)?.num_rows) } pub fn build_group_by( @@ -805,7 +811,7 @@ mod tests { fetch, ); - Ok(offset.statistics()?.num_rows) + Ok(offset.partition_statistics(None)?.num_rows) } async fn row_number_statistics_for_local_limit( @@ -818,7 +824,7 @@ mod tests { let offset = LocalLimitExec::new(csv, fetch); - Ok(offset.statistics()?.num_rows) + Ok(offset.partition_statistics(None)?.num_rows) } /// Return a RecordBatch with a single array with row_count sz diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index eecd980d09f8..46847b2413c0 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -166,6 +166,13 @@ impl ExecutionPlan for PlaceholderRowExec { } fn statistics(&self) -> Result { + self.partition_statistics(None) + } + + fn partition_statistics(&self, partition: Option) -> Result { + if partition.is_some() { + return Ok(Statistics::new_unknown(&self.schema())); + } let batch = self .data() .expect("Create single row placeholder RecordBatch should not fail"); diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 72934c74446e..8761d64b60b5 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -244,8 +244,13 @@ impl ExecutionPlan for ProjectionExec { } fn statistics(&self) -> Result { + self.partition_statistics(None) + } + + fn partition_statistics(&self, partition: Option) -> Result { + let input_stats = self.input.partition_statistics(partition)?; Ok(stats_projection( - self.input.statistics()?, + input_stats, self.expr.iter().map(|(e, _)| Arc::clone(e)), Arc::clone(&self.schema), )) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index c480fc2abaa1..f7c4f7477f12 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -688,7 +688,15 @@ impl ExecutionPlan for RepartitionExec { } fn statistics(&self) -> Result { - self.input.statistics() + self.input.partition_statistics(None) + } + + fn partition_statistics(&self, partition: Option) -> Result { + if partition.is_none() { + self.input.partition_statistics(None) + } else { + Ok(Statistics::new_unknown(&self.schema())) + } } fn cardinality_effect(&self) -> CardinalityEffect { diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index 320fa21c8665..6d7f64ebfbd5 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -321,7 +321,11 @@ impl ExecutionPlan for PartialSortExec { } fn statistics(&self) -> Result { - self.input.statistics() + self.input.partition_statistics(None) + } + + fn partition_statistics(&self, partition: Option) -> Result { + self.input.partition_statistics(partition) } } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 9d0f34cc7f0f..bf7d69b62f21 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1297,7 +1297,24 @@ impl ExecutionPlan for SortExec { } fn statistics(&self) -> Result { - Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1) + self.partition_statistics(None) + } + + fn partition_statistics(&self, partition: Option) -> Result { + if !self.preserve_partitioning() { + return self.input.partition_statistics(None)?.with_fetch( + self.schema(), + self.fetch, + 0, + 1, + ); + } + self.input.partition_statistics(partition)?.with_fetch( + self.schema(), + self.fetch, + 0, + 1, + ) } fn with_fetch(&self, limit: Option) -> Option> { diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index b987dff36441..da346d8f8a4e 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -343,7 +343,11 @@ impl ExecutionPlan for SortPreservingMergeExec { } fn statistics(&self) -> Result { - self.input.statistics() + self.input.partition_statistics(None) + } + + fn partition_statistics(&self, _partition: Option) -> Result { + self.input.partition_statistics(None) } fn supports_limit_pushdown(&self) -> bool { diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs index a2dc1d778436..cb0b060d272a 100644 --- a/datafusion/physical-plan/src/test.rs +++ b/datafusion/physical-plan/src/test.rs @@ -170,7 +170,15 @@ impl ExecutionPlan for TestMemoryExec { } fn statistics(&self) -> Result { - self.statistics() + self.statistics_inner() + } + + fn partition_statistics(&self, partition: Option) -> Result { + if partition.is_some() { + Ok(Statistics::new_unknown(&self.schema)) + } else { + self.statistics_inner() + } } fn fetch(&self) -> Option { @@ -214,7 +222,7 @@ impl TestMemoryExec { ) } - fn statistics(&self) -> Result { + fn statistics_inner(&self) -> Result { Ok(common::compute_record_batch_statistics( &self.partitions, &self.schema, diff --git a/datafusion/physical-plan/src/test/exec.rs b/datafusion/physical-plan/src/test/exec.rs index d0a0d25779cc..12ffca871f07 100644 --- a/datafusion/physical-plan/src/test/exec.rs +++ b/datafusion/physical-plan/src/test/exec.rs @@ -255,6 +255,13 @@ impl ExecutionPlan for MockExec { // Panics if one of the batches is an error fn statistics(&self) -> Result { + self.partition_statistics(None) + } + + fn partition_statistics(&self, partition: Option) -> Result { + if partition.is_some() { + return Ok(Statistics::new_unknown(&self.schema)); + } let data: Result> = self .data .iter() @@ -405,6 +412,13 @@ impl ExecutionPlan for BarrierExec { } fn statistics(&self) -> Result { + self.partition_statistics(None) + } + + fn partition_statistics(&self, partition: Option) -> Result { + if partition.is_some() { + return Ok(Statistics::new_unknown(&self.schema)); + } Ok(common::compute_record_batch_statistics( &self.data, &self.schema, @@ -590,6 +604,14 @@ impl ExecutionPlan for StatisticsExec { fn statistics(&self) -> Result { Ok(self.stats.clone()) } + + fn partition_statistics(&self, partition: Option) -> Result { + Ok(if partition.is_some() { + Statistics::new_unknown(&self.schema) + } else { + self.stats.clone() + }) + } } /// Execution plan that emits streams that block forever. diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 2b666093f29e..bdae97f5d13d 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -258,16 +258,36 @@ impl ExecutionPlan for UnionExec { } fn statistics(&self) -> Result { - let stats = self - .inputs - .iter() - .map(|stat| stat.statistics()) - .collect::>>()?; + self.partition_statistics(None) + } + + fn partition_statistics(&self, partition: Option) -> Result { + if let Some(partition_idx) = partition { + // For a specific partition, find which input it belongs to + let mut remaining_idx = partition_idx; + for input in &self.inputs { + let input_partition_count = input.output_partitioning().partition_count(); + if remaining_idx < input_partition_count { + // This partition belongs to this input + return input.partition_statistics(Some(remaining_idx)); + } + remaining_idx -= input_partition_count; + } + // If we get here, the partition index is out of bounds + Ok(Statistics::new_unknown(&self.schema())) + } else { + // Collect statistics from all inputs + let stats = self + .inputs + .iter() + .map(|input_exec| input_exec.partition_statistics(None)) + .collect::>>()?; - Ok(stats - .into_iter() - .reduce(stats_union) - .unwrap_or_else(|| Statistics::new_unknown(&self.schema()))) + Ok(stats + .into_iter() + .reduce(stats_union) + .unwrap_or_else(|| Statistics::new_unknown(&self.schema()))) + } } fn benefits_from_input_partitioning(&self) -> Vec { @@ -471,10 +491,17 @@ impl ExecutionPlan for InterleaveExec { } fn statistics(&self) -> Result { + self.partition_statistics(None) + } + + fn partition_statistics(&self, partition: Option) -> Result { + if partition.is_some() { + return Ok(Statistics::new_unknown(&self.schema())); + } let stats = self .inputs .iter() - .map(|stat| stat.statistics()) + .map(|stat| stat.partition_statistics(None)) .collect::>>()?; Ok(stats diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs index 6cb64bcb5d86..fb27ccf30179 100644 --- a/datafusion/physical-plan/src/values.rs +++ b/datafusion/physical-plan/src/values.rs @@ -308,8 +308,10 @@ mod tests { data, )?; + #[allow(deprecated)] + let stats = values.statistics()?; assert_eq!( - values.statistics()?, + stats, Statistics { num_rows: Precision::Exact(rows), total_byte_size: Precision::Exact(8), // not important diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 92138bf6a7a1..1ea02adafe3f 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -226,6 +226,23 @@ impl BoundedWindowAggExec { .unwrap_or_else(Vec::new) } } + + fn statistics_helper(&self, statistics: Statistics) -> Result { + let win_cols = self.window_expr.len(); + let input_cols = self.input.schema().fields().len(); + // TODO stats: some windowing function will maintain invariants such as min, max... + let mut column_statistics = Vec::with_capacity(win_cols + input_cols); + // copy stats of the input to the beginning of the schema. + column_statistics.extend(statistics.column_statistics); + for _ in 0..win_cols { + column_statistics.push(ColumnStatistics::new_unknown()) + } + Ok(Statistics { + num_rows: statistics.num_rows, + column_statistics, + total_byte_size: Precision::Absent, + }) + } } impl DisplayAs for BoundedWindowAggExec { @@ -343,21 +360,12 @@ impl ExecutionPlan for BoundedWindowAggExec { } fn statistics(&self) -> Result { - let input_stat = self.input.statistics()?; - let win_cols = self.window_expr.len(); - let input_cols = self.input.schema().fields().len(); - // TODO stats: some windowing function will maintain invariants such as min, max... - let mut column_statistics = Vec::with_capacity(win_cols + input_cols); - // copy stats of the input to the beginning of the schema. - column_statistics.extend(input_stat.column_statistics); - for _ in 0..win_cols { - column_statistics.push(ColumnStatistics::new_unknown()) - } - Ok(Statistics { - num_rows: input_stat.num_rows, - column_statistics, - total_byte_size: Precision::Absent, - }) + self.partition_statistics(None) + } + + fn partition_statistics(&self, partition: Option) -> Result { + let input_stat = self.input.partition_statistics(partition)?; + self.statistics_helper(input_stat) } } diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index 3c42d3032ed5..4c76e2230875 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -156,6 +156,24 @@ impl WindowAggExec { .unwrap_or_else(Vec::new) } } + + fn statistics_inner(&self) -> Result { + let input_stat = self.input.partition_statistics(None)?; + let win_cols = self.window_expr.len(); + let input_cols = self.input.schema().fields().len(); + // TODO stats: some windowing function will maintain invariants such as min, max... + let mut column_statistics = Vec::with_capacity(win_cols + input_cols); + // copy stats of the input to the beginning of the schema. + column_statistics.extend(input_stat.column_statistics); + for _ in 0..win_cols { + column_statistics.push(ColumnStatistics::new_unknown()) + } + Ok(Statistics { + num_rows: input_stat.num_rows, + column_statistics, + total_byte_size: Precision::Absent, + }) + } } impl DisplayAs for WindowAggExec { @@ -271,21 +289,15 @@ impl ExecutionPlan for WindowAggExec { } fn statistics(&self) -> Result { - let input_stat = self.input.statistics()?; - let win_cols = self.window_expr.len(); - let input_cols = self.input.schema().fields().len(); - // TODO stats: some windowing function will maintain invariants such as min, max... - let mut column_statistics = Vec::with_capacity(win_cols + input_cols); - // copy stats of the input to the beginning of the schema. - column_statistics.extend(input_stat.column_statistics); - for _ in 0..win_cols { - column_statistics.push(ColumnStatistics::new_unknown()) + self.statistics_inner() + } + + fn partition_statistics(&self, partition: Option) -> Result { + if partition.is_none() { + self.statistics_inner() + } else { + Ok(Statistics::new_unknown(&self.schema())) } - Ok(Statistics { - num_rows: input_stat.num_rows, - column_statistics, - total_byte_size: Precision::Absent, - }) } } diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index 126a7d0bba29..eea1b9958633 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -227,6 +227,10 @@ impl ExecutionPlan for WorkTableExec { fn statistics(&self) -> Result { Ok(Statistics::new_unknown(&self.schema())) } + + fn partition_statistics(&self, _partition: Option) -> Result { + Ok(Statistics::new_unknown(&self.schema())) + } } #[cfg(test)] diff --git a/datafusion/sqllogictest/test_files/listing_table_statistics.slt b/datafusion/sqllogictest/test_files/listing_table_statistics.slt index aeeaaea6c2a3..8efe5f0d9ae5 100644 --- a/datafusion/sqllogictest/test_files/listing_table_statistics.slt +++ b/datafusion/sqllogictest/test_files/listing_table_statistics.slt @@ -44,4 +44,4 @@ statement ok set datafusion.execution.collect_statistics = false; statement ok -set datafusion.explain.show_statistics = false; \ No newline at end of file +set datafusion.explain.show_statistics = false;