3 changes: 3 additions & 0 deletions .gitignore
@@ -75,3 +75,6 @@ rat.txt

# data generated by examples
datafusion-examples/examples/datafusion-examples/

# Profiling artifacts (flamegraphs, comparison tables)
profiling-artifacts/
3 changes: 3 additions & 0 deletions datafusion/common/src/hash_utils.rs
@@ -23,6 +23,7 @@ use arrow::array::*;
use arrow::datatypes::*;
#[cfg(not(feature = "force_hash_collisions"))]
use arrow::{downcast_dictionary_array, downcast_primitive_array};
#[cfg(not(feature = "force_hash_collisions"))]
use itertools::Itertools;

#[cfg(not(feature = "force_hash_collisions"))]
@@ -278,6 +279,7 @@ fn hash_array<T>(
/// HAS_NULLS: whether we have to check for nulls in the inner loop
/// HAS_BUFFERS: if true, the array has external buffers; if false, all strings are inlined (less than 12 bytes)
/// REHASH: if true, combining with existing hash, otherwise initializing
#[cfg(not(feature = "force_hash_collisions"))]
#[inline(never)]
fn hash_string_view_array_inner<
T: ByteViewType,
@@ -398,6 +400,7 @@ fn hash_generic_byte_view_array<T: ByteViewType>(
/// - `HAS_NULL_KEYS`: Whether to check for null dictionary keys
/// - `HAS_NULL_VALUES`: Whether to check for null dictionary values
/// - `MULTI_COL`: Whether to combine with existing hash (true) or initialize (false)
#[cfg(not(feature = "force_hash_collisions"))]
#[inline(never)]
fn hash_dictionary_inner<
K: ArrowDictionaryKeyType,
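Background on the const-generic inner functions gated above (`hash_string_view_array_inner`, `hash_dictionary_inner`): boolean const parameters such as `HAS_NULLS` and `REHASH` are monomorphized once per flag combination and selected by a runtime match in the outer function, so the per-row loop carries no branches on these flags. A minimal, self-contained sketch of that dispatch pattern; all names and the toy hash below are hypothetical, not the DataFusion code:

// Illustration only: how boolean const generics specialize an inner hash loop.
fn hash_values_inner<const HAS_NULLS: bool, const REHASH: bool>(
    values: &[u64],
    validity: Option<&[bool]>, // true = valid, false = null
    hashes: &mut [u64],
) {
    for (i, (v, h)) in values.iter().zip(hashes.iter_mut()).enumerate() {
        // When HAS_NULLS is false this branch is compiled away entirely.
        if HAS_NULLS && validity.map(|nulls| !nulls[i]).unwrap_or(false) {
            continue; // skip null slots
        }
        let new = v.wrapping_mul(0x9E37_79B9_7F4A_7C15); // toy hash, not ahash
        // REHASH combines with an existing hash instead of overwriting it.
        *h = if REHASH { h.rotate_left(5) ^ new } else { new };
    }
}

fn hash_values(values: &[u64], validity: Option<&[bool]>, hashes: &mut [u64], rehash: bool) {
    // Runtime flags pick one of the monomorphized variants once per array.
    match (validity.is_some(), rehash) {
        (true, true) => hash_values_inner::<true, true>(values, validity, hashes),
        (true, false) => hash_values_inner::<true, false>(values, validity, hashes),
        (false, true) => hash_values_inner::<false, true>(values, validity, hashes),
        (false, false) => hash_values_inner::<false, false>(values, validity, hashes),
    }
}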
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::{Arc, LazyLock};

@@ -115,9 +116,16 @@ fn scan_with_predicate(
let file_metrics = ParquetFileMetrics::new(0, &path.display().to_string(), &metrics);

let builder = if pushdown {
if let Some(row_filter) =
build_row_filter(predicate, file_schema, &metadata, false, &file_metrics)?
{
let all_cols: HashSet<usize> = (0..file_schema.fields().len()).collect();
let (row_filter, _demoted) = build_row_filter(
predicate,
file_schema,
&metadata,
false,
&file_metrics,
&all_cols,
)?;
if let Some(row_filter) = row_filter {
builder.with_row_filter(row_filter)
} else {
builder
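The matching changes in `row_filter.rs` are not part of this excerpt, so the updated `build_row_filter` contract is visible only through call sites like the one above and the one in `opener.rs` below. The following is a hedged reconstruction of the signature those call sites imply; parameter and return types are inferred, not copied from the PR:

// Hedged reconstruction, inferred from the call sites in this diff only; the
// real signature in datafusion/datasource-parquet/src/row_filter.rs may differ.
use std::collections::HashSet;
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion_common::Result;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use parquet::arrow::arrow_reader::RowFilter;
use parquet::file::metadata::ParquetMetaData;

use crate::metrics::ParquetFileMetrics; // assumed crate-local path

// Returns the RowFilter built from the kept conjuncts (if any) plus the
// conjuncts demoted to batch-level filtering in the opener.
pub fn build_row_filter(
    expr: &Arc<dyn PhysicalExpr>,
    physical_file_schema: &SchemaRef,
    metadata: &ParquetMetaData,
    reorder_predicates: bool,
    file_metrics: &ParquetFileMetrics,
    // New: the column indices the scan will project, used to decide per
    // conjunct whether early evaluation can skip decoding a projected column.
    projection_col_indices: &HashSet<usize>,
) -> Result<(Option<RowFilter>, Vec<Arc<dyn PhysicalExpr>>)> {
    unimplemented!("body omitted; not shown in this excerpt")
}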
238 changes: 218 additions & 20 deletions datafusion/datasource-parquet/src/opener.rs
@@ -27,9 +27,9 @@ use arrow::array::{RecordBatch, RecordBatchOptions};
use arrow::datatypes::DataType;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::utils::reassign_expr_columns;
use datafusion_physical_expr::utils::{conjunction_opt, reassign_expr_columns};
use datafusion_physical_expr_adapter::replace_columns_with_literals;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
@@ -46,6 +46,7 @@ use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::{
PhysicalExpr, is_dynamic_physical_expr,
};
use datafusion_physical_plan::filter::batch_filter;
use datafusion_physical_plan::metrics::{
Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, PruningMetrics,
};
@@ -459,27 +460,37 @@ impl FileOpener for ParquetOpener {
// `row_filter` for details.
// ---------------------------------------------------------------------

// Filter pushdown: evaluate predicates during scan
if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
let row_filter = row_filter::build_row_filter(
// Filter pushdown: evaluate predicates during scan.
//
        // Each conjunct is evaluated individually inside
        // `build_row_filter`: conjuncts that leave some projected
        // columns unread benefit from late materialization and stay in
        // the RowFilter; conjuncts that reference every projected
        // column gain nothing from early evaluation and are demoted to
        // batch-level filtering, avoiding the RowFilter machinery overhead.
let batch_filter_predicate = if let Some(predicate) =
pushdown_filters.then_some(predicate).flatten()
{
let projection_col_indices: HashSet<usize> =
projection.column_indices().into_iter().collect();

let (row_filter, demoted) = row_filter::build_row_filter(
&predicate,
&physical_file_schema,
builder.metadata(),
reorder_predicates,
&file_metrics,
);
&projection_col_indices,
)?;

match row_filter {
Ok(Some(filter)) => {
builder = builder.with_row_filter(filter);
}
Ok(None) => {}
Err(e) => {
debug!(
"Ignoring error building row filter for '{predicate:?}': {e}"
);
}
};
if let Some(filter) = row_filter {
builder = builder.with_row_filter(filter);
}

// Combine demoted conjuncts into a single batch filter
conjunction_opt(demoted)
} else {
None
};
if force_filter_selections {
builder =
@@ -627,6 +638,12 @@ impl FileOpener for ParquetOpener {
let projection = projection
.try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;

// Also remap the batch filter predicate to the stream schema
let batch_filter_predicate = batch_filter_predicate
.map(|pred| reassign_expr_columns(pred, &stream_schema))
.transpose()?;
let has_batch_filter = batch_filter_predicate.is_some();

let projector = projection.make_projector(&stream_schema)?;

let stream = stream.map_err(DataFusionError::from).map(move |b| {
@@ -636,6 +653,10 @@
&predicate_cache_inner_records,
&predicate_cache_records,
);
// Apply batch-level filter when RowFilter pushdown was skipped
if let Some(ref filter_pred) = batch_filter_predicate {
b = batch_filter(&b, filter_pred)?;
}
b = projector.project_batch(&b)?;
if replace_schema {
// Ensure the output batch has the expected schema.
@@ -664,6 +685,18 @@
// ----------------------------------------------------------------------
// Step: wrap the stream so a dynamic filter can stop the file scan early
// ----------------------------------------------------------------------

// When batch-level filtering is active (RowFilter pushdown was
// skipped), filter out empty batches that result from the predicate
// removing all rows in a decoded batch.
let stream = if has_batch_filter {
stream
.try_filter(|batch| std::future::ready(batch.num_rows() > 0))
.boxed()
} else {
stream.boxed()
};

if let Some(file_pruner) = file_pruner {
Ok(EarlyStoppingStream::new(
stream,
Expand All @@ -672,7 +705,7 @@ impl FileOpener for ParquetOpener {
)
.boxed())
} else {
Ok(stream.boxed())
Ok(stream)
}
}))
}
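The demotion criterion described in the comments above and exercised by the tests below boils down to a set comparison between a conjunct's required columns and the projected columns. A minimal standalone sketch of that rule, with hypothetical helper names (the actual decision lives in `row_filter.rs`, which is not shown in this excerpt):

use std::collections::HashSet;

// Illustration of the rule only: a conjunct stays in the RowFilter when early
// evaluation can spare decoding at least one projected column for rows it
// filters out; otherwise it is demoted to batch-level filtering.
fn should_demote_to_batch_filter(
    conjunct_cols: &HashSet<usize>,
    projection_cols: &HashSet<usize>,
) -> bool {
    // Every projected column is already needed by the conjunct, so late
    // materialization saves nothing.
    projection_cols.is_subset(conjunct_cols)
}

fn main() {
    let projection: HashSet<usize> = [0].into_iter().collect();
    // `a = 2` with projection [a]: no extra column to skip -> demote (Case 1).
    assert!(should_demote_to_batch_filter(&[0].into_iter().collect(), &projection));
    // `b = 20` with projection [a]: column b is read only for the filter,
    // so it stays in the RowFilter (Case 2).
    assert!(!should_demote_to_batch_filter(&[1].into_iter().collect(), &projection));
}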
@@ -1025,10 +1058,10 @@ mod test {
stats::Precision,
};
use datafusion_datasource::{PartitionedFile, TableSchema, file_stream::FileOpener};
use datafusion_expr::{col, lit};
use datafusion_expr::{Operator, col, lit};
use datafusion_physical_expr::{
PhysicalExpr,
expressions::{Column, DynamicFilterPhysicalExpr, Literal},
expressions::{BinaryExpr, Column, DynamicFilterPhysicalExpr, Literal},
planner::logical2physical,
projection::ProjectionExprs,
};
@@ -2004,4 +2037,169 @@ mod test {
"Reverse scan with non-contiguous row groups should correctly map RowSelection"
);
}

/// Per-conjunct RowFilter demotion: when a conjunct's required columns
/// cover all projected columns, it provides no column-decode savings
    /// and is demoted to batch-level filtering. Conjuncts that leave some
    /// projected columns unread stay in the RowFilter for late materialization.
#[tokio::test]
async fn test_skip_row_filter_when_filter_cols_subset_of_projection() {
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;

// 4 rows: a=[1,2,2,4], b=[10,20,30,40]
let batch = record_batch!(
("a", Int32, vec![Some(1), Some(2), Some(2), Some(4)]),
("b", Int32, vec![Some(10), Some(20), Some(30), Some(40)])
)
.unwrap();
let data_size =
write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await;
let schema = batch.schema();
let file = PartitionedFile::new(
"test.parquet".to_string(),
u64::try_from(data_size).unwrap(),
);

// Case 1: filter_cols == projection_cols → batch filter path
// Filter: a = 2, Projection: [a]
// Conjunct cols = {0}, projection = {0} → no extra cols to skip
// decoding → demoted to batch filter.
let expr = col("a").eq(lit(2));
let predicate = logical2physical(&expr, &schema);
let opener = ParquetOpenerBuilder::new()
.with_store(Arc::clone(&store))
.with_schema(Arc::clone(&schema))
.with_projection_indices(&[0])
.with_predicate(predicate)
.with_pushdown_filters(true)
.with_reorder_filters(true)
.build();
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_rows, 2, "batch filter should return 2 matching rows");
assert_eq!(num_batches, 1);

// Case 1b: filter_cols ⊂ projection_cols → RowFilter path
// Filter: a = 2, Projection: [a, b]
// Conjunct cols = {0}, projection = {0, 1} → extra col b → RowFilter
// skips decoding column b for non-matching rows.
let expr = col("a").eq(lit(2));
let predicate = logical2physical(&expr, &schema);
let opener = ParquetOpenerBuilder::new()
.with_store(Arc::clone(&store))
.with_schema(Arc::clone(&schema))
.with_projection_indices(&[0, 1])
.with_predicate(predicate)
.with_pushdown_filters(true)
.with_reorder_filters(true)
.build();
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_rows, 2, "RowFilter should return 2 matching rows");
assert_eq!(num_batches, 1);

// Case 2: filter_cols ⊄ projection_cols → RowFilter path
// Filter: b = 20, Projection: [a] (only column a)
// Conjunct cols = {1}, projection = {0} → extra col a → RowFilter
let expr = col("b").eq(lit(20));
let predicate = logical2physical(&expr, &schema);
let opener = ParquetOpenerBuilder::new()
.with_store(Arc::clone(&store))
.with_schema(Arc::clone(&schema))
.with_projection_indices(&[0])
.with_predicate(predicate)
.with_pushdown_filters(true)
.with_reorder_filters(true)
.build();
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let values = collect_int32_values(stream).await;
assert_eq!(
values,
vec![2],
"RowFilter should return correct filtered values"
);

// Case 3: no matches → 0 rows via batch filter
// Filter: a = 99, Projection: [a]
// Conjunct cols = {0}, projection = {0} → no extra cols → batch filter
let expr = col("a").eq(lit(99));
let predicate = logical2physical(&expr, &schema);
let opener = ParquetOpenerBuilder::new()
.with_store(Arc::clone(&store))
.with_schema(Arc::clone(&schema))
.with_projection_indices(&[0])
.with_predicate(predicate)
.with_pushdown_filters(true)
.with_reorder_filters(true)
.build();
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_rows, 0, "no rows should match");
assert_eq!(num_batches, 0, "empty batches should be filtered out");

// Case 4: verify correct values in batch filter path
// Filter: a = 2, Projection: [a]
let expr = col("a").eq(lit(2));
let predicate = logical2physical(&expr, &schema);
let opener = ParquetOpenerBuilder::new()
.with_store(Arc::clone(&store))
.with_schema(Arc::clone(&schema))
.with_projection_indices(&[0])
.with_predicate(predicate)
.with_pushdown_filters(true)
.with_reorder_filters(true)
.build();
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let values = collect_int32_values(stream).await;
assert_eq!(
values,
vec![2, 2],
"batch filter should return correct values"
);

// Case 5: multi-conjunct predicate → RowFilter path
// Filter: a = 2 AND b = 20, Projection: [a, b]
// Per-conjunct: `a = 2` has extra col b, `b = 20` has extra col a
// → both kept in RowFilter for incremental evaluation.
let expr = col("a").eq(lit(2)).and(col("b").eq(lit(20)));
let predicate = logical2physical(&expr, &schema);
let opener = ParquetOpenerBuilder::new()
.with_store(Arc::clone(&store))
.with_schema(Arc::clone(&schema))
.with_projection_indices(&[0, 1])
.with_predicate(predicate)
.with_pushdown_filters(true)
.with_reorder_filters(true)
.build();
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_rows, 1, "multi-conjunct RowFilter should return 1 row");
assert_eq!(num_batches, 1);

// Case 6: single static conjunct + dynamic filter → batch filter path
// Simulates TopK: `WHERE a = 2 ORDER BY a LIMIT N`
// Predicate: `a = 2 AND <dynamic>(a < 3)`, Projection: [a]
// Per-conjunct: both conjuncts reference only col a = projection
// → no extra cols → all demoted to batch filter.
let static_expr = logical2physical(&col("a").eq(lit(2)), &schema);
let dynamic_expr =
make_dynamic_expr(logical2physical(&col("a").lt(lit(3)), &schema));
let combined: Arc<dyn PhysicalExpr> =
Arc::new(BinaryExpr::new(static_expr, Operator::And, dynamic_expr));
let opener = ParquetOpenerBuilder::new()
.with_store(Arc::clone(&store))
.with_schema(Arc::clone(&schema))
.with_projection_indices(&[0])
.with_predicate(combined)
.with_pushdown_filters(true)
.with_reorder_filters(true)
.build();
let stream = opener.open(file).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(
num_rows, 2,
"single static conjunct + dynamic filter should use batch filter"
);
assert_eq!(num_batches, 1);
}
}