Skip to content

Move PruningStatistics into datafusion::common #16069

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion datafusion-examples/examples/parquet_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use arrow::datatypes::{Int32Type, SchemaRef};
use arrow::util::pretty::pretty_format_batches;
use async_trait::async_trait;
use datafusion::catalog::Session;
use datafusion::common::pruning::PruningStatistics;
use datafusion::common::{
internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
};
Expand All @@ -39,7 +40,7 @@ use datafusion::parquet::arrow::{
arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter,
};
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
use std::any::Any;
Expand Down
3 changes: 2 additions & 1 deletion datafusion-examples/examples/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ use std::sync::Arc;

use arrow::array::{ArrayRef, BooleanArray, Int32Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::common::pruning::PruningStatistics;
use datafusion::common::{DFSchema, ScalarValue};
use datafusion::execution::context::ExecutionProps;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::prelude::*;

/// This example shows how to use DataFusion's `PruningPredicate` to prove
Expand Down
1 change: 1 addition & 0 deletions datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub mod format;
pub mod hash_utils;
pub mod instant;
pub mod parsers;
pub mod pruning;
pub mod rounding;
pub mod scalar;
pub mod spans;
Expand Down
124 changes: 124 additions & 0 deletions datafusion/common/src/pruning.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{ArrayRef, BooleanArray};
use std::collections::HashSet;

use crate::Column;
use crate::ScalarValue;

/// A source of runtime statistical information to [`PruningPredicate`]s.
///
/// # Supported Information
///
/// 1. Minimum and maximum values for columns
///
/// 2. Null counts and row counts for columns
///
/// 3. Whether the values in a column are contained in a set of literals
///
/// # Vectorized Interface
///
/// Information for containers / files are returned as Arrow [`ArrayRef`], so
/// the evaluation happens once on a single `RecordBatch`, which amortizes the
/// overhead of evaluating the predicate. This is important when pruning 1000s
/// of containers which often happens in analytic systems that have 1000s of
/// potential files to consider.
///
/// For example, for the following three files with a single column `a`:
/// ```text
/// file1: column a: min=5, max=10
/// file2: column a: No stats
/// file2: column a: min=20, max=30
/// ```
///
/// PruningStatistics would return:
///
/// ```text
/// min_values("a") -> Some([5, Null, 20])
/// max_values("a") -> Some([10, Null, 30])
/// min_values("X") -> None
/// ```
///
/// [`PruningPredicate`]: https://docs.rs/datafusion/latest/datafusion/physical_optimizer/pruning/struct.PruningPredicate.html
pub trait PruningStatistics {
/// Return the minimum values for the named column, if known.
///
/// If the minimum value for a particular container is not known, the
/// returned array should have `null` in that row. If the minimum value is
/// not known for any row, return `None`.
///
/// Note: the returned array must contain [`Self::num_containers`] rows
fn min_values(&self, column: &Column) -> Option<ArrayRef>;

/// Return the maximum values for the named column, if known.
///
/// See [`Self::min_values`] for when to return `None` and null values.
///
/// Note: the returned array must contain [`Self::num_containers`] rows
fn max_values(&self, column: &Column) -> Option<ArrayRef>;

/// Return the number of containers (e.g. Row Groups) being pruned with
/// these statistics.
///
/// This value corresponds to the size of the [`ArrayRef`] returned by
/// [`Self::min_values`], [`Self::max_values`], [`Self::null_counts`],
/// and [`Self::row_counts`].
fn num_containers(&self) -> usize;

/// Return the number of null values for the named column as an
/// [`UInt64Array`]
///
/// See [`Self::min_values`] for when to return `None` and null values.
///
/// Note: the returned array must contain [`Self::num_containers`] rows
///
/// [`UInt64Array`]: arrow::array::UInt64Array
fn null_counts(&self, column: &Column) -> Option<ArrayRef>;

/// Return the number of rows for the named column in each container
/// as an [`UInt64Array`].
///
/// See [`Self::min_values`] for when to return `None` and null values.
///
/// Note: the returned array must contain [`Self::num_containers`] rows
///
/// [`UInt64Array`]: arrow::array::UInt64Array
fn row_counts(&self, column: &Column) -> Option<ArrayRef>;

/// Returns [`BooleanArray`] where each row represents information known
/// about specific literal `values` in a column.
///
/// For example, Parquet Bloom Filters implement this API to communicate
/// that `values` are known not to be present in a Row Group.
///
/// The returned array has one row for each container, with the following
/// meanings:
/// * `true` if the values in `column` ONLY contain values from `values`
/// * `false` if the values in `column` are NOT ANY of `values`
/// * `null` if the neither of the above holds or is unknown.
///
/// If these statistics can not determine column membership for any
/// container, return `None` (the default).
///
/// Note: the returned array must contain [`Self::num_containers`] rows
fn contained(
&self,
column: &Column,
values: &HashSet<ScalarValue>,
) -> Option<BooleanArray>;
}
3 changes: 2 additions & 1 deletion datafusion/datasource-parquet/src/page_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ use arrow::{
array::ArrayRef,
datatypes::{Schema, SchemaRef},
};
use datafusion_common::pruning::PruningStatistics;
use datafusion_common::ScalarValue;
use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
use datafusion_physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use datafusion_physical_optimizer::pruning::PruningPredicate;

use log::{debug, trace};
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
Expand Down
3 changes: 2 additions & 1 deletion datafusion/datasource-parquet/src/row_group_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ use std::sync::Arc;
use super::{ParquetAccessPlan, ParquetFileMetrics};
use arrow::array::{ArrayRef, BooleanArray};
use arrow::datatypes::Schema;
use datafusion_common::pruning::PruningStatistics;
use datafusion_common::{Column, Result, ScalarValue};
use datafusion_datasource::FileRange;
use datafusion_physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use datafusion_physical_optimizer::pruning::PruningPredicate;
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::arrow::parquet_column;
use parquet::basic::Type;
Expand Down
101 changes: 1 addition & 100 deletions datafusion/physical-optimizer/src/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use arrow::{
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::{RecordBatch, RecordBatchOptions},
};
use datafusion_common::pruning::PruningStatistics;
use log::{debug, trace};

use datafusion_common::error::{DataFusionError, Result};
Expand All @@ -44,106 +45,6 @@ use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};
use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr;
use datafusion_physical_plan::{ColumnarValue, PhysicalExpr};

/// A source of runtime statistical information to [`PruningPredicate`]s.
///
/// # Supported Information
///
/// 1. Minimum and maximum values for columns
///
/// 2. Null counts and row counts for columns
///
/// 3. Whether the values in a column are contained in a set of literals
///
/// # Vectorized Interface
///
/// Information for containers / files are returned as Arrow [`ArrayRef`], so
/// the evaluation happens once on a single `RecordBatch`, which amortizes the
/// overhead of evaluating the predicate. This is important when pruning 1000s
/// of containers which often happens in analytic systems that have 1000s of
/// potential files to consider.
///
/// For example, for the following three files with a single column `a`:
/// ```text
/// file1: column a: min=5, max=10
/// file2: column a: No stats
/// file2: column a: min=20, max=30
/// ```
///
/// PruningStatistics would return:
///
/// ```text
/// min_values("a") -> Some([5, Null, 20])
/// max_values("a") -> Some([10, Null, 30])
/// min_values("X") -> None
/// ```
pub trait PruningStatistics {
/// Return the minimum values for the named column, if known.
///
/// If the minimum value for a particular container is not known, the
/// returned array should have `null` in that row. If the minimum value is
/// not known for any row, return `None`.
///
/// Note: the returned array must contain [`Self::num_containers`] rows
fn min_values(&self, column: &Column) -> Option<ArrayRef>;

/// Return the maximum values for the named column, if known.
///
/// See [`Self::min_values`] for when to return `None` and null values.
///
/// Note: the returned array must contain [`Self::num_containers`] rows
fn max_values(&self, column: &Column) -> Option<ArrayRef>;

/// Return the number of containers (e.g. Row Groups) being pruned with
/// these statistics.
///
/// This value corresponds to the size of the [`ArrayRef`] returned by
/// [`Self::min_values`], [`Self::max_values`], [`Self::null_counts`],
/// and [`Self::row_counts`].
fn num_containers(&self) -> usize;

/// Return the number of null values for the named column as an
/// [`UInt64Array`]
///
/// See [`Self::min_values`] for when to return `None` and null values.
///
/// Note: the returned array must contain [`Self::num_containers`] rows
///
/// [`UInt64Array`]: arrow::array::UInt64Array
fn null_counts(&self, column: &Column) -> Option<ArrayRef>;

/// Return the number of rows for the named column in each container
/// as an [`UInt64Array`].
///
/// See [`Self::min_values`] for when to return `None` and null values.
///
/// Note: the returned array must contain [`Self::num_containers`] rows
///
/// [`UInt64Array`]: arrow::array::UInt64Array
fn row_counts(&self, column: &Column) -> Option<ArrayRef>;

/// Returns [`BooleanArray`] where each row represents information known
/// about specific literal `values` in a column.
///
/// For example, Parquet Bloom Filters implement this API to communicate
/// that `values` are known not to be present in a Row Group.
///
/// The returned array has one row for each container, with the following
/// meanings:
/// * `true` if the values in `column` ONLY contain values from `values`
/// * `false` if the values in `column` are NOT ANY of `values`
/// * `null` if the neither of the above holds or is unknown.
///
/// If these statistics can not determine column membership for any
/// container, return `None` (the default).
///
/// Note: the returned array must contain [`Self::num_containers`] rows
fn contained(
&self,
column: &Column,
values: &HashSet<ScalarValue>,
) -> Option<BooleanArray>;
}

/// Used to prove that arbitrary predicates (boolean expression) can not
/// possibly evaluate to `true` given information about a column provided by
/// [`PruningStatistics`].
Expand Down