diff --git a/datafusion/physical-expr/benches/in_list.rs b/datafusion/physical-expr/benches/in_list.rs index 954715d0e5a9c..405088f88fbb2 100644 --- a/datafusion/physical-expr/benches/in_list.rs +++ b/datafusion/physical-expr/benches/in_list.rs @@ -23,6 +23,7 @@ use arrow::datatypes::{Field, Schema}; use arrow::record_batch::RecordBatch; use criterion::{Criterion, criterion_group, criterion_main}; use datafusion_common::ScalarValue; +use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::expressions::{col, in_list, lit}; use rand::distr::Alphanumeric; use rand::prelude::*; @@ -50,7 +51,9 @@ fn random_string(rng: &mut StdRng, len: usize) -> String { } const IN_LIST_LENGTHS: [usize; 4] = [3, 8, 28, 100]; +const DYNAMIC_LIST_LENGTHS: [usize; 3] = [3, 8, 28]; const NULL_PERCENTS: [f64; 2] = [0., 0.2]; +const MATCH_PERCENTS: [f64; 3] = [0.0, 0.5, 1.0]; const STRING_LENGTHS: [usize; 3] = [3, 12, 100]; const ARRAY_LENGTH: usize = 8192; @@ -219,6 +222,144 @@ fn bench_realistic_mixed_strings( } } +/// Benchmarks the dynamic evaluation path (no static filter) by including +/// a column reference in the IN list, which prevents static filter creation. +fn do_bench_dynamic( + c: &mut Criterion, + name: &str, + values: ArrayRef, + list_cols: &[ArrayRef], +) { + let mut fields = vec![Field::new("a", values.data_type().clone(), true)]; + let mut columns: Vec = vec![values]; + + // Build list expressions: mix of column refs (forces dynamic path) + let schema_fields: Vec = list_cols + .iter() + .enumerate() + .map(|(i, col_arr)| { + let name = format!("b{i}"); + fields.push(Field::new(&name, col_arr.data_type().clone(), true)); + columns.push(Arc::clone(col_arr)); + Field::new(&name, col_arr.data_type().clone(), true) + }) + .collect(); + + let schema = Schema::new(fields); + let list_exprs: Vec> = schema_fields + .iter() + .map(|f| col(f.name(), &schema).unwrap()) + .collect(); + + let expr = in_list(col("a", &schema).unwrap(), list_exprs, &false, &schema).unwrap(); + let batch = RecordBatch::try_new(Arc::new(schema), columns).unwrap(); + + c.bench_function(name, |b| { + b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap())) + }); +} + +/// Benchmarks the dynamic IN list path for Int32 arrays with column references. +fn bench_dynamic_int32(c: &mut Criterion) { + let mut rng = StdRng::seed_from_u64(42); + + for list_size in DYNAMIC_LIST_LENGTHS { + for match_percent in MATCH_PERCENTS { + for null_percent in NULL_PERCENTS { + // Generate the "needle" column + let values: Int32Array = (0..ARRAY_LENGTH) + .map(|_| { + rng.random_bool(1.0 - null_percent) + .then(|| rng.random_range(0..1000)) + }) + .collect(); + + // Generate list columns with controlled match rate + let list_cols: Vec = (0..list_size) + .map(|_| { + let col: Int32Array = (0..ARRAY_LENGTH) + .map(|row| { + if rng.random_bool(1.0 - null_percent) { + if rng.random_bool(match_percent) { + // Copy from values to create a match + if values.is_null(row) { + Some(rng.random_range(0..1000)) + } else { + Some(values.value(row)) + } + } else { + // Random value (unlikely to match) + Some(rng.random_range(1000..2000)) + } + } else { + None + } + }) + .collect(); + Arc::new(col) as ArrayRef + }) + .collect(); + + do_bench_dynamic( + c, + &format!( + "in_list_dynamic/Int32/list={}/match={}%/nulls={}%", + list_size, + (match_percent * 100.0) as u32, + (null_percent * 100.0) as u32 + ), + Arc::new(values), + &list_cols, + ); + } + } + } +} + +/// Benchmarks the dynamic IN list path for Utf8 arrays with column references. +fn bench_dynamic_utf8(c: &mut Criterion) { + let mut rng = StdRng::seed_from_u64(99); + + for list_size in DYNAMIC_LIST_LENGTHS { + for match_percent in MATCH_PERCENTS { + // Generate the "needle" column + let value_strings: Vec> = (0..ARRAY_LENGTH) + .map(|_| rng.random_bool(0.8).then(|| random_string(&mut rng, 12))) + .collect(); + let values: StringArray = + value_strings.iter().map(|s| s.as_deref()).collect(); + + // Generate list columns with controlled match rate + let list_cols: Vec = (0..list_size) + .map(|_| { + let col: StringArray = (0..ARRAY_LENGTH) + .map(|row| { + if rng.random_bool(match_percent) { + // Copy from values to create a match + value_strings[row].as_deref() + } else { + Some("no_match_value_xyz") + } + }) + .collect(); + Arc::new(col) as ArrayRef + }) + .collect(); + + do_bench_dynamic( + c, + &format!( + "in_list_dynamic/Utf8/list={}/match={}%", + list_size, + (match_percent * 100.0) as u32, + ), + Arc::new(values), + &list_cols, + ); + } + } +} + /// Entry point: registers in_list benchmarks for string and numeric array types. fn criterion_benchmark(c: &mut Criterion) { let mut rng = StdRng::seed_from_u64(120320); @@ -266,6 +407,10 @@ fn criterion_benchmark(c: &mut Criterion) { |rng| rng.random(), |v| ScalarValue::TimestampNanosecond(Some(v), None), ); + + // Dynamic path benchmarks (non-constant list expressions) + bench_dynamic_int32(c); + bench_dynamic_utf8(c); } criterion_group! { diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index 5c2f1adcd0cf3..58ccf69a18786 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -28,6 +28,7 @@ use crate::physical_expr::physical_exprs_bag_equal; use arrow::array::*; use arrow::buffer::{BooleanBuffer, NullBuffer}; use arrow::compute::kernels::boolean::{not, or_kleene}; +use arrow::compute::kernels::cmp::eq as arrow_eq; use arrow::compute::{SortOptions, take}; use arrow::datatypes::*; use arrow::util::bit_iterator::BitIndexIterator; @@ -773,11 +774,23 @@ impl PhysicalExpr for InListExpr { None => { // No static filter: iterate through each expression, compare, and OR results let value = value.into_array(num_rows)?; - let found = self.list.iter().map(|expr| expr.evaluate(batch)).try_fold( - BooleanArray::new(BooleanBuffer::new_unset(num_rows), None), - |result, expr| -> Result { - let rhs = match expr? { - ColumnarValue::Array(array) => { + let use_arrow_eq = !value.data_type().is_nested(); + let mut found = + BooleanArray::new(BooleanBuffer::new_unset(num_rows), None); + + for expr in &self.list { + // Short-circuit: if every row is already true, skip remaining list items + if found.true_count() == num_rows { + break; + } + + let rhs = match expr.evaluate(batch)? { + ColumnarValue::Array(array) => { + if use_arrow_eq { + // Vectorized Arrow eq kernel for primitive/string/binary types + arrow_eq(&value, &array)? + } else { + // Row-by-row comparator for nested types (Struct, List, etc.) let cmp = make_comparator( value.as_ref(), array.as_ref(), @@ -792,35 +805,40 @@ impl PhysicalExpr for InListExpr { }) .collect::() } - ColumnarValue::Scalar(scalar) => { - // Check if scalar is null once, before the loop - if scalar.is_null() { - // If scalar is null, all comparisons return null - BooleanArray::from(vec![None; num_rows]) - } else { - // Convert scalar to 1-element array - let array = scalar.to_array()?; - let cmp = make_comparator( - value.as_ref(), - array.as_ref(), - SortOptions::default(), - )?; - // Compare each row of value with the single scalar element - (0..num_rows) - .map(|i| { - if value.is_null(i) { - None - } else { - Some(cmp(i, 0).is_eq()) - } - }) - .collect::() - } + } + ColumnarValue::Scalar(scalar) => { + if scalar.is_null() { + // null compared to anything is null + BooleanArray::new( + BooleanBuffer::new_unset(num_rows), + Some(NullBuffer::new_null(num_rows)), + ) + } else if use_arrow_eq { + // Vectorized scalar comparison + let scalar_datum = scalar.to_scalar()?; + arrow_eq(&value, &scalar_datum)? + } else { + // Row-by-row comparator for nested types + let array = scalar.to_array()?; + let cmp = make_comparator( + value.as_ref(), + array.as_ref(), + SortOptions::default(), + )?; + (0..num_rows) + .map(|i| { + if value.is_null(i) { + None + } else { + Some(cmp(i, 0).is_eq()) + } + }) + .collect::() } - }; - Ok(or_kleene(&result, &rhs)?) - }, - )?; + } + }; + found = or_kleene(&found, &rhs)?; + } if self.negated { not(&found)? } else { found } } @@ -3507,4 +3525,222 @@ mod tests { Ok(()) } + + /// Helper: creates an InListExpr with `static_filter = None` + /// to force the dynamic evaluation path. + fn make_dynamic_in_list( + expr: Arc, + list: Vec>, + negated: bool, + ) -> Arc { + Arc::new(InListExpr::new(expr, list, negated, None)) + } + + #[test] + fn test_dynamic_path_int32_scalars() -> Result<()> { + // Dynamic path with scalar literals (bypassing static filter) + let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); + let col_a = col("a", &schema)?; + let batch = RecordBatch::try_new( + Arc::new(schema), + vec![Arc::new(Int32Array::from(vec![ + Some(1), + Some(2), + Some(3), + None, + ]))], + )?; + + let list = vec![ + lit(ScalarValue::Int32(Some(1))), + lit(ScalarValue::Int32(Some(3))), + ]; + let expr = make_dynamic_in_list(col_a, list, false); + + let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?; + let result = as_boolean_array(&result); + assert_eq!( + result, + &BooleanArray::from(vec![Some(true), Some(false), Some(true), None,]) + ); + Ok(()) + } + + #[test] + fn test_dynamic_path_int32_column_refs() -> Result<()> { + // Dynamic path with column references in the list + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + Field::new("c", DataType::Int32, true), + ]); + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![ + Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3), None])), + Arc::new(Int32Array::from(vec![ + Some(1), + Some(99), + Some(99), + Some(99), + ])), + Arc::new(Int32Array::from(vec![Some(99), Some(99), Some(3), None])), + ], + )?; + + let col_a = col("a", &schema)?; + let list = vec![col("b", &schema)?, col("c", &schema)?]; + let expr = make_dynamic_in_list(col_a, list, false); + + let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?; + let result = as_boolean_array(&result); + // row 0: 1 IN (1, 99) → true + // row 1: 2 IN (99, 99) → false + // row 2: 3 IN (99, 3) → true + // row 3: NULL IN (99, NULL) → NULL + assert_eq!( + result, + &BooleanArray::from(vec![Some(true), Some(false), Some(true), None,]) + ); + Ok(()) + } + + #[test] + fn test_dynamic_path_utf8_column_refs() -> Result<()> { + // Dynamic path with Utf8 column references + let schema = Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + ]); + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![ + Arc::new(StringArray::from(vec!["x", "y", "z"])), + Arc::new(StringArray::from(vec!["x", "x", "z"])), + ], + )?; + + let col_a = col("a", &schema)?; + let list = vec![col("b", &schema)?]; + let expr = make_dynamic_in_list(col_a, list, false); + + let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?; + let result = as_boolean_array(&result); + // row 0: "x" IN ("x") → true + // row 1: "y" IN ("x") → false + // row 2: "z" IN ("z") → true + assert_eq!(result, &BooleanArray::from(vec![true, false, true])); + Ok(()) + } + + #[test] + fn test_dynamic_path_negated() -> Result<()> { + // Dynamic path with NOT IN + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ]); + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![1, 99, 3])), + ], + )?; + + let col_a = col("a", &schema)?; + let list = vec![col("b", &schema)?]; + let expr = make_dynamic_in_list(col_a, list, true); + + let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?; + let result = as_boolean_array(&result); + // row 0: 1 NOT IN (1) → false + // row 1: 2 NOT IN (99) → true + // row 2: 3 NOT IN (3) → false + assert_eq!(result, &BooleanArray::from(vec![false, true, false])); + Ok(()) + } + + #[test] + fn test_dynamic_path_null_in_list() -> Result<()> { + // Dynamic path: list contains NULL scalar + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let col_a = col("a", &schema)?; + let batch = RecordBatch::try_new( + Arc::new(schema), + vec![Arc::new(Int32Array::from(vec![1, 2]))], + )?; + + let list = vec![ + lit(ScalarValue::Int32(None)), + lit(ScalarValue::Int32(Some(1))), + ]; + let expr = make_dynamic_in_list(col_a, list, false); + + let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?; + let result = as_boolean_array(&result); + // row 0: 1 IN (NULL, 1) → true (true OR null = true) + // row 1: 2 IN (NULL, 1) → NULL (false OR null = null) + assert_eq!(result, &BooleanArray::from(vec![Some(true), None])); + Ok(()) + } + + #[test] + fn test_dynamic_path_short_circuit() -> Result<()> { + // When all rows match from the first list item, + // remaining items should be skipped (short-circuit). + // Correctness test: result should be all true. + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ]); + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + // b == a for every row → all match after first item + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![99, 99, 99])), + ], + )?; + + let col_a = col("a", &schema)?; + let list = vec![col("b", &schema)?, col("c", &schema)?]; + let expr = make_dynamic_in_list(col_a, list, false); + + let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?; + let result = as_boolean_array(&result); + assert_eq!(result, &BooleanArray::from(vec![true, true, true])); + Ok(()) + } + + #[test] + fn test_dynamic_path_float_nan() -> Result<()> { + // Verify NaN == NaN is true in the dynamic path + // (consistent with Arrow's totalOrder semantics) + let schema = Schema::new(vec![ + Field::new("a", DataType::Float64, false), + Field::new("b", DataType::Float64, false), + ]); + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![ + Arc::new(Float64Array::from(vec![f64::NAN, 1.0, f64::NAN])), + Arc::new(Float64Array::from(vec![f64::NAN, 2.0, 0.0])), + ], + )?; + + let col_a = col("a", &schema)?; + let list = vec![col("b", &schema)?]; + let expr = make_dynamic_in_list(col_a, list, false); + + let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?; + let result = as_boolean_array(&result); + // row 0: NaN IN (NaN) → true + // row 1: 1.0 IN (2.0) → false + // row 2: NaN IN (0.0) → false + assert_eq!(result, &BooleanArray::from(vec![true, false, false])); + Ok(()) + } }