Skip to content

Draft Poc for adaptive parquet predicate pushdown(bitmap/range) with page cache(3 data pages) #7454

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 79 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
79 commits
Select commit Hold shift + click to select a range
cc6dd14
update
XiangpengHao Sep 4, 2024
5837fc7
update
XiangpengHao Dec 22, 2024
fec6313
update
XiangpengHao Dec 28, 2024
948db87
update
XiangpengHao Dec 29, 2024
8c50d90
poc reader
XiangpengHao Dec 30, 2024
f5422ce
update
XiangpengHao Dec 31, 2024
dfdc1b6
avoid recreating new buffers
XiangpengHao Dec 31, 2024
3c526f8
update
XiangpengHao Dec 31, 2024
53f5fad
bug fix
XiangpengHao Dec 31, 2024
56980de
selective cache
XiangpengHao Jan 1, 2025
4dd1b6b
clean up changes
XiangpengHao Jan 1, 2025
f8f983e
clean up more and format
XiangpengHao Jan 1, 2025
882aaf1
cleanup and add docs
XiangpengHao Jan 1, 2025
c8bdbcf
switch to mutex instead of rwlock
XiangpengHao Jan 2, 2025
cdb1d85
revert irrelevant changes
XiangpengHao Jan 2, 2025
69720e5
submodule
XiangpengHao Jan 3, 2025
a9550ab
update
XiangpengHao Jan 3, 2025
be1435f
rebase
XiangpengHao Jan 6, 2025
e4d9eb7
Merge remote-tracking branch 'upstream/main' into better-decoder
XiangpengHao Jan 8, 2025
21e015b
remove unrelated changes
XiangpengHao Jan 8, 2025
bbc3595
Merge remote-tracking branch 'upstream/main' into better-decoder
XiangpengHao Jan 10, 2025
547fb46
fix clippy
XiangpengHao Jan 10, 2025
05c8c8f
make various ci improvements
XiangpengHao Jan 10, 2025
314fda1
Merge remote-tracking branch 'apache/main' into better-decoder
alamb Mar 21, 2025
c895dd2
whitespace
alamb Mar 21, 2025
3cf0a98
Reduce some ugliness, avoid unwrap
alamb Mar 21, 2025
7b72f9d
more factory
alamb Mar 21, 2025
5bdf51a
lint
alamb Mar 22, 2025
a77e1e7
Merge remote-tracking branch 'apache/main' into better-decoder
alamb Mar 26, 2025
90a55d5
Isolate reader cache more
alamb Mar 26, 2025
9ffa81c
Merge remote-tracking branch 'apache/main' into better-decoder
alamb Mar 27, 2025
7c10b4a
Merge remote-tracking branch 'apache/main' into better-decoder
alamb Mar 28, 2025
822760c
Add benchmark for parquet reader with row_filter and project settings
zhuqi-lucas Apr 10, 2025
31a544f
fix clippy
zhuqi-lucas Apr 10, 2025
b16428d
change bench mark to use asyn read to trigger the page cache
zhuqi-lucas Apr 11, 2025
1aacd01
fix
zhuqi-lucas Apr 11, 2025
2d58006
Merge remote-tracking branch 'upstream/main' into benchmark_row_filter
zhuqi-lucas Apr 11, 2025
768826e
fix
zhuqi-lucas Apr 11, 2025
f624b91
Update comments, add background
alamb Apr 11, 2025
6c28e44
incremently addressing the comments
zhuqi-lucas Apr 11, 2025
69a2617
Fix bool random
zhuqi-lucas Apr 11, 2025
b044813
Merge commit '69a2617' into alamb/docs_for_bench
alamb Apr 11, 2025
6a37818
fixup
alamb Apr 11, 2025
2f6ccbb
Add fn switch and project enum
zhuqi-lucas Apr 11, 2025
994c747
Merge pull request #1 from alamb/alamb/docs_for_bench
zhuqi-lucas Apr 11, 2025
d0a656b
Fix clippy
zhuqi-lucas Apr 11, 2025
67480b9
Address comment
zhuqi-lucas Apr 12, 2025
16bc1bf
Add float(half set) and int(full set) change
zhuqi-lucas Apr 12, 2025
a4bedbd
Merge branch 'benchmark_row_filter' of github.com:zhuqi-lucas/arrow-r…
zhuqi-lucas Apr 12, 2025
d0ab2fe
Fix corner case: skipping page should also make dic page to none
zhuqi-lucas Apr 12, 2025
7638c41
Address comments
zhuqi-lucas Apr 13, 2025
8fc992b
Merge branch 'benchmark_row_filter' into better-decoder
zhuqi-lucas Apr 14, 2025
9271cc9
Set compression
zhuqi-lucas Apr 14, 2025
8e00ac5
fix
zhuqi-lucas Apr 14, 2025
36346aa
Merge branch 'benchmark_row_filter' into better-decoder
zhuqi-lucas Apr 14, 2025
890519e
Update comments
alamb Apr 14, 2025
7eb0476
refactor filter column indexes
alamb Apr 14, 2025
22c7b39
Read from in memory buffer
alamb Apr 14, 2025
86878ab
Merge remote-tracking branch 'apache/main' into benchmark_row_filter
alamb Apr 14, 2025
5ae9b58
celanu
alamb Apr 14, 2025
1effe88
Test both sync and async readers
alamb Apr 14, 2025
74abec0
Merge branch 'benchmark_row_filter' into better-decoder
zhuqi-lucas Apr 15, 2025
6ea0eef
Merge branch 'main' into better-decoder
zhuqi-lucas Apr 18, 2025
0c3aa9b
Improve the performance for skip record
zhuqi-lucas Apr 19, 2025
a1d3496
Init the boolean_selector
zhuqi-lucas Apr 25, 2025
2d6c866
Init version for unified select
zhuqi-lucas Apr 26, 2025
1e9b6e5
Change to use filter
zhuqi-lucas Apr 27, 2025
21dadbe
Fix then
zhuqi-lucas Apr 27, 2025
3fe4cef
Adaptive push down
zhuqi-lucas Apr 27, 2025
e5aad7c
Fix
zhuqi-lucas Apr 27, 2025
fa0bf69
Merge branch 'better-decoder' into unified_select
zhuqi-lucas Apr 30, 2025
6432de2
Init: combine page cache with unified select
zhuqi-lucas May 1, 2025
d26de88
Perf: make the cache not missing to avoid some clickbench regression
zhuqi-lucas May 5, 2025
04ca371
Revert "Improve the performance for skip record"
zhuqi-lucas May 7, 2025
c099788
Merge branch 'polish_better_decoder' into unified_select
zhuqi-lucas May 7, 2025
c045a4a
Combine with page cache
zhuqi-lucas May 7, 2025
6e32d3b
Need to fix
zhuqi-lucas May 8, 2025
b80f596
Add performance good result
zhuqi-lucas May 8, 2025
01d1dea
Fix
zhuqi-lucas May 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion parquet/src/arrow/array_reader/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,8 @@ impl ByteViewArrayDecoderDictionary {
}
}

/// Reads the next indexes from self.decoder
/// Reads the next `len` indexes from self.decoder
///
/// the indexes are assumed to be indexes into `dict`
/// the output values are written to output
///
Expand Down Expand Up @@ -464,6 +465,8 @@ impl ByteViewArrayDecoderDictionary {
}
}

output.views.reserve(len);

// Calculate the offset of the dictionary buffers in the output buffers
// For example if the 2nd buffer in the dictionary is the 5th buffer in the output buffers,
// then the base_buffer_idx is 5 - 2 = 3
Expand Down
313 changes: 313 additions & 0 deletions parquet/src/arrow/arrow_reader/boolean_selector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,313 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::ops::Range;

use arrow_array::{Array, BooleanArray};
use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, MutableBuffer};
use arrow_data::bit_iterator::BitIndexIterator;

use super::{RowSelection, RowSelector};

/// A selection of rows, similar to [`RowSelection`], but based on a boolean array
#[derive(Debug, Clone, PartialEq)]
pub struct BooleanRowSelection {
selector: BooleanArray,
}

impl BooleanRowSelection {
/// Create a new [`BooleanRowSelection] from a list of [`BooleanArray`].
pub fn from_filters(filters: &[BooleanArray]) -> Self {
let arrays: Vec<&dyn Array> = filters.iter().map(|x| x as &dyn Array).collect();
let result = arrow_select::concat::concat(&arrays).unwrap().into_data();
let boolean_array = BooleanArray::from(result);
BooleanRowSelection {
selector: boolean_array,
}
}

/// Create a new [`BooleanRowSelection`] with all rows unselected
pub fn new_unselected(row_count: usize) -> Self {
let buffer = BooleanBuffer::new_unset(row_count);
let boolean_array = BooleanArray::from(buffer);
BooleanRowSelection { selector: boolean_array }
}

/// Create a new [`BooleanRowSelection`] with all rows selected
pub fn new_selected(row_count: usize) -> Self {
let buffer = BooleanBuffer::new_set(row_count);
let boolean_array = BooleanArray::from(buffer);
BooleanRowSelection { selector: boolean_array }
}

/// Returns a new [`BooleanRowSelection`] that selects the inverse of this [`BooleanRowSelection`].
pub fn as_inverted(&self) -> Self {
let buffer = !self.selector.values();
BooleanRowSelection { selector: BooleanArray::from(buffer) }
}

/// Returns the number of rows selected by this [`BooleanRowSelection`].
pub fn row_count(&self) -> usize {
self.selector.true_count()
}

/// Create a new [`BooleanRowSelection`] from a list of consecutive ranges.
pub fn from_consecutive_ranges(
ranges: impl Iterator<Item = Range<usize>>,
total_rows: usize,
) -> Self {
let mut buffer = BooleanBufferBuilder::new(total_rows);
let mut last_end = 0;

for range in ranges {
let len = range.end - range.start;
if len == 0 {
continue;
}

if range.start > last_end {
buffer.append_n(range.start - last_end, false);
}
buffer.append_n(len, true);
last_end = range.end;
}

if last_end != total_rows {
buffer.append_n(total_rows - last_end, false);
}

BooleanRowSelection {
selector: BooleanArray::from(buffer.finish()),
}
}

/// Compute the union of two [`BooleanRowSelection`]
/// For example:
/// self: NNYYYYNNYYNYN
/// other: NYNNNNNNN
///
/// returned: NYYYYYNNYYNYN
#[must_use]
pub fn union(&self, other: &Self) -> Self {
// use arrow::compute::kernels::boolean::or;

let union_selectors = self.selector.values() | other.selector.values();

BooleanRowSelection {
selector: BooleanArray::from(union_selectors),
}
}

/// Compute the intersection of two [`BooleanRowSelection`]
/// For example:
/// self: NNYYYYNNYYNYN
/// other: NYNNNNNNY
///
/// returned: NNNNNNNNYYNYN
#[must_use]
pub fn intersection(&self, other: &Self) -> Self {
let intersection_selectors = self.selector.values() & other.selector.values();

BooleanRowSelection {
selector: BooleanArray::from(intersection_selectors),
}
}

/// Combines this [`BooleanRowSelection`] with another using logical AND on the selected bits.
///
/// Unlike intersection, the `other` [`BooleanRowSelection`] must have exactly as many set bits as `self`.
/// This method will keep only the bits in `self` that are also set in `other`
/// at the positions corresponding to `self`'s set bits.
pub fn and_then(&self, other: &Self) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be able to use bitwise and instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will refactor all this new file to the enum file, and we can remove this separate boolean selector.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the other thing we can and should do here is change the signature to take an owned self -- also for intersection -- the API now forces a new memory allocation.

// Ensure that 'other' has exactly as many set bits as 'self'
debug_assert_eq!(
self.row_count(),
other.selector.len(),
"The 'other' selection must have exactly as many set bits as 'self'."
);

if self.selector.len() == other.selector.len() {
// fast path if the two selections are the same length
// common if this is the first predicate
debug_assert_eq!(self.row_count(), self.selector.len());
return self.intersection(other);
}

let mut buffer = MutableBuffer::from_len_zeroed(self.selector.values().inner().len());
buffer.copy_from_slice(self.selector.values().inner().as_slice());
let mut builder = BooleanBufferBuilder::new_from_buffer(buffer, self.selector.len());

// Create iterators for 'self' and 'other' bits
let mut other_bits = other.selector.iter();

for bit_idx in self.true_iter() {
let predicate = other_bits
.next()
.expect("Mismatch in set bits between self and other");
if !predicate.unwrap() {
builder.set_bit(bit_idx, false);
}
}

BooleanRowSelection {
selector: BooleanArray::from(builder.finish()),
}
}

/// Returns an iterator over the indices of the set bits in this [`BooleanRowSelection`]
pub fn true_iter(&self) -> BitIndexIterator<'_> {
self.selector.values().set_indices()
}

/// Returns `true` if this [`BooleanRowSelection`] selects any rows
pub fn selects_any(&self) -> bool {
self.true_iter().next().is_some()
}

/// Returns a new [`BooleanRowSelection`] that selects the rows in this [`BooleanRowSelection`] from `offset` to `offset + len`
pub fn slice(&self, offset: usize, len: usize) -> BooleanArray {
self.selector.slice(offset, len)
}
}

// impl From<Vec<RowSelector>> for BooleanRowSelection {
// fn from(selection: Vec<RowSelector>) -> Self {
// let selection = RowSelection::from(selection);
// RowSelection::into(selection)
// }
// }
//
// impl From<RowSelection> for BooleanRowSelection {
// fn from(selection: RowSelection) -> Self {
// let total_rows = selection.row_count();
// let mut builder = BooleanBufferBuilder::new(total_rows);
//
// for selector in selection.iter() {
// if selector.skip {
// builder.append_n(selector.row_count, false);
// } else {
// builder.append_n(selector.row_count, true);
// }
// }
//
// BooleanRowSelection {
// selector: BooleanArray::from(builder.finish()),
// }
// }
// }
//
// impl From<&BooleanRowSelection> for RowSelection{
// fn from(selection: &BooleanRowSelection) -> Self {
// RowSelection::from_filters(&[selection.selector.clone()])
// }
// }

// #[cfg(test)]
// mod tests {
// use rand::Rng;
//
// use super::*;
//
// fn generate_random_row_selection(total_rows: usize, selection_ratio: f64) -> BooleanArray {
// let mut rng = rand::thread_rng();
// let bools: Vec<bool> = (0..total_rows)
// .map(|_| rng.gen_bool(selection_ratio))
// .collect();
// BooleanArray::from(bools)
// }
//
// #[test]
// fn test_boolean_row_selection_round_trip() {
// let total_rows = 1_000;
// for &selection_ratio in &[0.0, 0.1, 0.5, 0.9, 1.0] {
// let selection = generate_random_row_selection(total_rows, selection_ratio);
// let boolean_selection = BooleanRowSelection::from_filters(&[selection]);
// let row_selection = RowSelection::from(&boolean_selection);
// let boolean_selection_again = row_selection.into();
// assert_eq!(boolean_selection, boolean_selection_again);
// }
// }
//
// #[test]
// fn test_boolean_union_intersection() {
// let total_rows = 1_000;
//
// let base_boolean_selection =
// BooleanRowSelection::from_filters(&[generate_random_row_selection(total_rows, 0.1)]);
// let base_row_selection = RowSelection::from(&base_boolean_selection);
// for &selection_ratio in &[0.0, 0.1, 0.5, 0.9, 1.0] {
// let boolean_selection =
// BooleanRowSelection::from_filters(&[generate_random_row_selection(
// total_rows,
// selection_ratio,
// )]);
// let row_selection = RowSelection::from(&boolean_selection);
//
// let boolean_union = boolean_selection.union(&base_boolean_selection);
// let row_union = row_selection.union(&base_row_selection);
// assert_eq!(boolean_union, BooleanRowSelection::from(row_union));
//
// let boolean_intersection = boolean_selection.intersection(&base_boolean_selection);
// let row_intersection = row_selection.intersection(&base_row_selection);
// assert_eq!(
// boolean_intersection,
// BooleanRowSelection::from(row_intersection)
// );
// }
// }
//
// #[test]
// fn test_boolean_selection_and_then() {
// // Initial mask: 001011010101
// let self_filters = vec![BooleanArray::from(vec![
// false, false, true, false, true, true, false, true, false, true, false, true,
// ])];
// let self_selection = BooleanRowSelection::from_filters(&self_filters);
//
// // Predicate mask (only for selected bits): 001101
// let other_filters = vec![BooleanArray::from(vec![
// false, false, true, true, false, true,
// ])];
// let other_selection = BooleanRowSelection::from_filters(&other_filters);
//
// let result = self_selection.and_then(&other_selection);
//
// // Expected result: 000001010001
// let expected_filters = vec![BooleanArray::from(vec![
// false, false, false, false, false, true, false, true, false, false, false, true,
// ])];
// let expected_selection = BooleanRowSelection::from_filters(&expected_filters);
//
// assert_eq!(result, expected_selection);
// }
//
// #[test]
// #[should_panic(
// expected = "The 'other' selection must have exactly as many set bits as 'self'."
// )]
// fn test_and_then_mismatched_set_bits() {
// let self_filters = vec![BooleanArray::from(vec![true, true, false])];
// let self_selection = BooleanRowSelection::from_filters(&self_filters);
//
// // 'other' has only one set bit, but 'self' has two
// let other_filters = vec![BooleanArray::from(vec![true, false, false])];
// let other_selection = BooleanRowSelection::from_filters(&other_filters);
//
// // This should panic
// let _ = self_selection.and_then(&other_selection);
// }
// }
Loading
Loading