Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 166 additions & 0 deletions datafusion/physical-expr/benches/in_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use arrow::datatypes::{Field, Schema};
use arrow::record_batch::RecordBatch;
use criterion::{Criterion, criterion_group, criterion_main};
use datafusion_common::ScalarValue;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::expressions::{col, in_list, lit};
use rand::distr::Alphanumeric;
use rand::prelude::*;
Expand Down Expand Up @@ -50,7 +51,9 @@ fn random_string(rng: &mut StdRng, len: usize) -> String {
}

const IN_LIST_LENGTHS: [usize; 4] = [3, 8, 28, 100];
const LIST_WITH_COLUMNS_LENGTHS: [usize; 3] = [3, 8, 28];
const NULL_PERCENTS: [f64; 2] = [0., 0.2];
const MATCH_PERCENTS: [f64; 3] = [0.0, 0.5, 1.0];
const STRING_LENGTHS: [usize; 3] = [3, 12, 100];
const ARRAY_LENGTH: usize = 8192;

Expand Down Expand Up @@ -219,6 +222,165 @@ fn bench_realistic_mixed_strings<A>(
}
}

/// Benchmarks the column-reference evaluation path (no static filter) by including
/// a column reference in the IN list, which prevents static filter creation.
///
/// This simulates SQL like:
/// ```sql
/// CREATE TABLE t (a INT, b0 INT, b1 INT, b2 INT);
/// SELECT * FROM t WHERE a IN (b0, b1, b2);
/// ```
///
/// - `values`: the "needle" column (`a`)
/// - `list_cols`: the "haystack" columns (`b0`, `b1`, …)
fn do_bench_with_columns(
c: &mut Criterion,
name: &str,
values: ArrayRef,
list_cols: &[ArrayRef],
) {
let mut fields = vec![Field::new("a", values.data_type().clone(), true)];
let mut columns: Vec<ArrayRef> = vec![values];

// Build list expressions: column refs (forces non-constant evaluation path)
let schema_fields: Vec<Field> = list_cols
.iter()
.enumerate()
.map(|(i, col_arr)| {
let name = format!("b{i}");
fields.push(Field::new(&name, col_arr.data_type().clone(), true));
columns.push(Arc::clone(col_arr));
Field::new(&name, col_arr.data_type().clone(), true)
})
.collect();

let schema = Schema::new(fields);
let list_exprs: Vec<Arc<dyn PhysicalExpr>> = schema_fields
.iter()
.map(|f| col(f.name(), &schema).unwrap())
.collect();

let expr = in_list(col("a", &schema).unwrap(), list_exprs, &false, &schema).unwrap();
let batch = RecordBatch::try_new(Arc::new(schema), columns).unwrap();

c.bench_function(name, |b| {
b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap()))
});
}

/// Benchmarks the IN list path with column references for Int32 arrays.
///
/// Equivalent SQL:
/// ```sql
/// CREATE TABLE t (a INT, b0 INT, b1 INT, ...);
/// SELECT * FROM t WHERE a IN (b0, b1, ...);
/// ```
fn bench_with_columns_int32(c: &mut Criterion) {
let mut rng = StdRng::seed_from_u64(42);

for list_size in LIST_WITH_COLUMNS_LENGTHS {
for match_percent in MATCH_PERCENTS {
for null_percent in NULL_PERCENTS {
// Generate the "needle" column
let values: Int32Array = (0..ARRAY_LENGTH)
.map(|_| {
rng.random_bool(1.0 - null_percent)
.then(|| rng.random_range(0..1000))
})
.collect();

// Generate list columns with controlled match rate
let list_cols: Vec<ArrayRef> = (0..list_size)
.map(|_| {
let col: Int32Array = (0..ARRAY_LENGTH)
.map(|row| {
if rng.random_bool(1.0 - null_percent) {
if rng.random_bool(match_percent) {
// Copy from values to create a match
if values.is_null(row) {
Some(rng.random_range(0..1000))
} else {
Some(values.value(row))
}
} else {
// Random value (unlikely to match)
Some(rng.random_range(1000..2000))
}
} else {
None
}
})
.collect();
Arc::new(col) as ArrayRef
})
.collect();

do_bench_with_columns(
c,
&format!(
"in_list_cols/Int32/list={}/match={}%/nulls={}%",
list_size,
(match_percent * 100.0) as u32,
(null_percent * 100.0) as u32
),
Arc::new(values),
&list_cols,
);
}
}
}
}

/// Benchmarks the IN list path with column references for Utf8 arrays.
///
/// Equivalent SQL:
/// ```sql
/// CREATE TABLE t (a VARCHAR, b0 VARCHAR, b1 VARCHAR, ...);
/// SELECT * FROM t WHERE a IN (b0, b1, ...);
/// ```
fn bench_with_columns_utf8(c: &mut Criterion) {
let mut rng = StdRng::seed_from_u64(99);

for list_size in LIST_WITH_COLUMNS_LENGTHS {
for match_percent in MATCH_PERCENTS {
// Generate the "needle" column
let value_strings: Vec<Option<String>> = (0..ARRAY_LENGTH)
.map(|_| rng.random_bool(0.8).then(|| random_string(&mut rng, 12)))
.collect();
let values: StringArray =
value_strings.iter().map(|s| s.as_deref()).collect();

// Generate list columns with controlled match rate
let list_cols: Vec<ArrayRef> = (0..list_size)
.map(|_| {
let col: StringArray = (0..ARRAY_LENGTH)
.map(|row| {
if rng.random_bool(match_percent) {
// Copy from values to create a match
value_strings[row].as_deref()
} else {
Some("no_match_value_xyz")
}
})
.collect();
Arc::new(col) as ArrayRef
})
.collect();

do_bench_with_columns(
c,
&format!(
"in_list_cols/Utf8/list={}/match={}%",
list_size,
(match_percent * 100.0) as u32,
),
Arc::new(values),
&list_cols,
);
}
}
}

/// Entry point: registers in_list benchmarks for string and numeric array types.
fn criterion_benchmark(c: &mut Criterion) {
let mut rng = StdRng::seed_from_u64(120320);
Expand Down Expand Up @@ -266,6 +428,10 @@ fn criterion_benchmark(c: &mut Criterion) {
|rng| rng.random(),
|v| ScalarValue::TimestampNanosecond(Some(v), None),
);

// Column-reference path benchmarks (non-constant list expressions)
bench_with_columns_int32(c);
bench_with_columns_utf8(c);
}

criterion_group! {
Expand Down