Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/physical-expr/src/expressions/in_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ mod primitive_filter;
mod result;
mod static_filter;
mod strategy;
mod transform;

use static_filter::StaticFilter;
use strategy::instantiate_static_filter;
Expand Down
236 changes: 168 additions & 68 deletions datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,38 +29,106 @@ use std::hash::{Hash, Hasher};
use super::result::build_in_list_result;
use super::static_filter::{StaticFilter, handle_dictionary};

/// Bitmap filter for O(1) set membership via single bit test.
/// Storage for the bits used by [`BitmapFilter`].
///
/// `UInt8` has only 256 possible values, so the filter stores membership in a
/// 256-bit bitmap instead of using a hash table.
pub(super) struct UInt8BitmapFilter {
/// `BitmapFilter` represents an `IN` list with one bit for each possible
/// value, so membership checks become direct bit tests. This trait lets the
/// same filter code use different storage sizes for different integer widths.
pub(super) trait BitmapStorage: Send + Sync {
fn new_zeroed() -> Self;
fn set_bit(&mut self, index: usize);
fn get_bit(&self, index: usize) -> bool;
}

// `UInt8` has 256 possible values, 0 through 255. One bit per value takes
// 256 bits, which fits in four `u64` words.
impl BitmapStorage for [u64; 4] {
#[inline]
fn new_zeroed() -> Self {
[0u64; 4]
}
#[inline]
fn set_bit(&mut self, index: usize) {
self[index / 64] |= 1u64 << (index % 64);
}
#[inline(always)]
fn get_bit(&self, index: usize) -> bool {
(self[index / 64] >> (index % 64)) & 1 != 0
}
}

// `UInt16` has 65,536 possible values. One bit per value takes 65,536 bits,
// which is 1,024 `u64` words, or 8 KiB. Box the array so the filter stores a
// pointer instead of carrying an 8 KiB array inline.
impl BitmapStorage for Box<[u64; 1024]> {
#[inline]
fn new_zeroed() -> Self {
Box::new([0u64; 1024])
}
#[inline]
fn set_bit(&mut self, index: usize) {
self[index / 64] |= 1u64 << (index % 64);
}
#[inline(always)]
fn get_bit(&self, index: usize) -> bool {
(self[index / 64] >> (index % 64)) & 1 != 0
}
}

/// Arrow primitive types supported by [`BitmapFilter`].
///
/// Arrow already defines the Rust value type as `T::Native`. This trait only
/// supplies the bitmap storage size for the two integer domains that are small
/// enough to represent with one bit per possible value.
pub(super) trait BitmapFilterType:
ArrowPrimitiveType + Send + Sync + 'static
{
type Storage: BitmapStorage;
}

/// `UInt8` has 256 possible values, so four `u64` words cover the full domain.
impl BitmapFilterType for UInt8Type {
type Storage = [u64; 4];
}

/// `UInt16` has 65,536 possible values, so 1,024 `u64` words cover the full
/// domain.
impl BitmapFilterType for UInt16Type {
type Storage = Box<[u64; 1024]>;
}

/// `IN` filter backed by one bit per possible value.
///
/// Building the filter scans the non-null values in the IN-list and turns on
/// the bit selected by each value. Evaluating input values checks the same bit
/// position. Null handling and `NOT IN` inversion are handled by
/// `build_in_list_result`.
pub(super) struct BitmapFilter<T: BitmapFilterType> {
null_count: usize,
bits: [u64; 4],
bits: T::Storage,
}

impl UInt8BitmapFilter {
impl<T> BitmapFilter<T>
where
T: BitmapFilterType,
{
pub(super) fn try_new(in_array: &ArrayRef) -> Result<Self> {
let prim_array = in_array.as_primitive_opt::<UInt8Type>().ok_or_else(|| {
exec_datafusion_err!("UInt8BitmapFilter: expected UInt8 array")
let prim_array = in_array.as_primitive_opt::<T>().ok_or_else(|| {
exec_datafusion_err!("BitmapFilter: expected {} array", T::DATA_TYPE)
})?;
let mut bits = [0u64; 4];
let mut set_bit = |v: u8| {
let index = usize::from(v);
bits[index / 64] |= 1u64 << (index % 64);
};

let mut bits = T::Storage::new_zeroed();
let values = prim_array.values();
match prim_array.nulls() {
None => {
for &v in values {
set_bit(v);
bits.set_bit(v.as_usize());
}
}
Some(nulls) => {
for i in
BitIndexIterator::new(nulls.validity(), nulls.offset(), nulls.len())
{
set_bit(values[i]);
bits.set_bit(values[i].as_usize());
}
}
}
Expand All @@ -71,21 +139,39 @@ impl UInt8BitmapFilter {
}

#[inline(always)]
fn check(&self, needle: u8) -> bool {
let index = needle as usize;
(self.bits[index / 64] >> (index % 64)) & 1 != 0
fn check(&self, needle: T::Native) -> bool {
self.bits.get_bit(needle.as_usize())
}

/// Check membership using a raw values slice (zero-copy path for type reinterpretation).
#[inline]
pub(super) fn contains_slice(
&self,
values: &[T::Native],
nulls: Option<&NullBuffer>,
negated: bool,
) -> BooleanArray {
build_in_list_result(values.len(), nulls, self.null_count > 0, negated, |i| {
// SAFETY: `build_in_list_result` invokes this closure for
// indices in `0..values.len()`.
let needle = unsafe { *values.get_unchecked(i) };
self.check(needle)
})
}
}

impl StaticFilter for UInt8BitmapFilter {
impl<T> StaticFilter for BitmapFilter<T>
where
T: BitmapFilterType,
{
fn null_count(&self) -> usize {
self.null_count
}

fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray> {
handle_dictionary!(self, v, negated);
let v = v.as_primitive_opt::<UInt8Type>().ok_or_else(|| {
exec_datafusion_err!("UInt8BitmapFilter: expected UInt8 array")
let v = v.as_primitive_opt::<T>().ok_or_else(|| {
exec_datafusion_err!("BitmapFilter: expected {} array", T::DATA_TYPE)
})?;
let input_values = v.values();
Ok(build_in_list_result(
Expand All @@ -104,63 +190,80 @@ impl StaticFilter for UInt8BitmapFilter {
}
}

/// Bitmap filter for O(1) `UInt16` set membership via single bit test.
/// A branchless filter for very small fixed-width primitive IN lists.
///
/// `UInt16` has 65,536 possible values, so the filter stores membership in an
/// 8 KiB heap-allocated bitmap instead of using a hash table.
pub(super) struct UInt16BitmapFilter {
/// Uses const generics to unroll the membership check into a fixed-size
/// comparison chain, outperforming hash lookups for small lists due to:
/// - No branching (uses bitwise OR to combine comparisons)
/// - Better CPU pipelining
/// - No hash computation overhead
pub(super) struct BranchlessFilter<T: ArrowPrimitiveType, const N: usize> {
null_count: usize,
bits: Box<[u64; 1024]>,
values: [T::Native; N],
}

impl UInt16BitmapFilter {
pub(super) fn try_new(in_array: &ArrayRef) -> Result<Self> {
let prim_array = in_array.as_primitive_opt::<UInt16Type>().ok_or_else(|| {
exec_datafusion_err!("UInt16BitmapFilter: expected UInt16 array")
})?;
let mut bits = Box::new([0u64; 1024]);
let mut set_bit = |v: u16| {
let index = usize::from(v);
bits[index / 64] |= 1u64 << (index % 64);
};

let values = prim_array.values();
match prim_array.nulls() {
None => {
for &v in values {
set_bit(v);
}
}
Some(nulls) => {
for i in
BitIndexIterator::new(nulls.validity(), nulls.offset(), nulls.len())
{
set_bit(values[i]);
}
}
impl<T: ArrowPrimitiveType, const N: usize> BranchlessFilter<T, N>
where
T::Native: Copy + PartialEq,
{
/// Try to create a branchless filter if the array has exactly N non-null values.
pub(super) fn try_new(in_array: &ArrayRef) -> Option<Result<Self>> {
let in_array = in_array.as_primitive_opt::<T>()?;
let non_null_count = in_array.len() - in_array.null_count();
if non_null_count != N {
return None;
}
Ok(Self {
null_count: prim_array.null_count(),
bits,
})
// Use default_value() from ArrowPrimitiveType trait instead of Default::default()
let mut arr = [T::default_value(); N];
let mut i = 0;
for value in in_array.iter().flatten() {
arr[i] = value;
i += 1;
}
debug_assert_eq!(i, N);
Some(Ok(Self {
null_count: in_array.null_count(),
values: arr,
}))
}

/// Branchless membership check using OR-chain.
#[inline(always)]
fn check(&self, needle: u16) -> bool {
let index = needle as usize;
(self.bits[index / 64] >> (index % 64)) & 1 != 0
fn check(&self, needle: T::Native) -> bool {
self.values
.iter()
.fold(false, |acc, &v| acc | (v == needle))
}

/// Check membership using a raw values slice (zero-copy path for type reinterpretation).
#[inline]
pub(super) fn contains_slice(
&self,
values: &[T::Native],
nulls: Option<&NullBuffer>,
negated: bool,
) -> BooleanArray {
build_in_list_result(values.len(), nulls, self.null_count > 0, negated, |i| {
// SAFETY: `build_in_list_result` invokes this closure for
// indices in `0..values.len()`.
let needle = unsafe { *values.get_unchecked(i) };
self.check(needle)
})
}
}

impl StaticFilter for UInt16BitmapFilter {
impl<T: ArrowPrimitiveType, const N: usize> StaticFilter for BranchlessFilter<T, N>
where
T::Native: Copy + PartialEq + Send + Sync,
{
fn null_count(&self) -> usize {
self.null_count
}

fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray> {
handle_dictionary!(self, v, negated);
let v = v.as_primitive_opt::<UInt16Type>().ok_or_else(|| {
exec_datafusion_err!("UInt16BitmapFilter: expected UInt16 array")
let v = v.as_primitive_opt::<T>().ok_or_else(|| {
exec_datafusion_err!("Failed to downcast array to primitive type")
})?;
let input_values = v.values();
Ok(build_in_list_result(
Expand Down Expand Up @@ -364,9 +467,6 @@ macro_rules! primitive_static_filter {
};
}

// Generate specialized filters for all integer primitive types
primitive_static_filter!(Int8StaticFilter, Int8Type);
primitive_static_filter!(Int16StaticFilter, Int16Type);
primitive_static_filter!(Int32StaticFilter, Int32Type);
primitive_static_filter!(Int64StaticFilter, Int64Type);
primitive_static_filter!(UInt32StaticFilter, UInt32Type);
Expand Down Expand Up @@ -406,7 +506,7 @@ mod tests {
#[test]
fn bitmap_filter_u8_handles_nulls() -> Result<()> {
let haystack: ArrayRef = Arc::new(UInt8Array::from(vec![Some(1), None, Some(3)]));
let filter = UInt8BitmapFilter::try_new(&haystack)?;
let filter = BitmapFilter::<UInt8Type>::try_new(&haystack)?;
let needles = UInt8Array::from(vec![Some(1), Some(2), None, Some(3)]);

assert_contains(&filter, &needles, vec![Some(true), None, None, Some(true)])?;
Expand All @@ -421,7 +521,7 @@ mod tests {
#[test]
fn bitmap_filter_u8_handles_dictionary_needles() -> Result<()> {
let haystack: ArrayRef = Arc::new(UInt8Array::from(vec![Some(1), None, Some(3)]));
let filter = UInt8BitmapFilter::try_new(&haystack)?;
let filter = BitmapFilter::<UInt8Type>::try_new(&haystack)?;

let keys = Int8Array::from(vec![Some(0), Some(1), None, Some(2)]);
let values = Arc::new(UInt8Array::from(vec![Some(1), Some(2), Some(3)]));
Expand All @@ -438,7 +538,7 @@ mod tests {
Some(1024),
Some(u16::MAX),
]));
let filter = UInt16BitmapFilter::try_new(&haystack)?;
let filter = BitmapFilter::<UInt16Type>::try_new(&haystack)?;
let needles =
UInt16Array::from(vec![Some(0), Some(1), Some(1024), Some(u16::MAX), None]);

Expand Down
Loading
Loading