diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index 6d6bbdc7b80..983e37364b3 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -432,7 +432,8 @@ impl ByteViewArrayDecoderDictionary { } } - /// Reads the next indexes from self.decoder + /// Reads the next `len` indexes from self.decoder + /// /// the indexes are assumed to be indexes into `dict` /// the output values are written to output /// @@ -464,6 +465,8 @@ impl ByteViewArrayDecoderDictionary { } } + output.views.reserve(len); + // Calculate the offset of the dictionary buffers in the output buffers // For example if the 2nd buffer in the dictionary is the 5th buffer in the output buffers, // then the base_buffer_idx is 5 - 2 = 3 diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 2f670a64e10..31eb4f51b10 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -17,16 +17,6 @@ //! Contains reader which reads parquet data into arrow [`RecordBatch`] -use arrow_array::cast::AsArray; -use arrow_array::Array; -use arrow_array::{RecordBatch, RecordBatchReader}; -use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef}; -use arrow_select::filter::prep_null_mask_filter; -pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; -pub use selection::{RowSelection, RowSelector}; -use std::collections::VecDeque; -use std::sync::Arc; - pub use crate::arrow::array_reader::RowGroups; use crate::arrow::array_reader::{build_array_reader, ArrayReader}; use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField}; @@ -38,6 +28,16 @@ use crate::errors::{ParquetError, Result}; use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; use crate::file::reader::{ChunkReader, SerializedPageReader}; use crate::schema::types::SchemaDescriptor; +use arrow_array::cast::AsArray; +use arrow_array::{Array, BooleanArray}; +use arrow_array::{RecordBatch, RecordBatchReader}; +use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef}; +use arrow_select::filter::prep_null_mask_filter; +pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; +pub use selection::{RowSelection, RowSelector}; +use std::collections::VecDeque; +use std::mem::take; +use std::sync::Arc; mod filter; mod selection; @@ -761,6 +761,7 @@ impl ReaderPageIterator { .map(|i| i[rg_idx][self.column_idx].page_locations.clone()); let total_rows = rg.num_rows() as usize; let reader = self.reader.clone(); + // todo: add cache??? SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)? .add_crypto_context( @@ -792,7 +793,7 @@ pub struct ParquetRecordBatchReader { batch_size: usize, array_reader: Box, schema: SchemaRef, - selection: Option>, + selection: Option, } impl Iterator for ParquetRecordBatchReader { @@ -800,10 +801,12 @@ impl Iterator for ParquetRecordBatchReader { fn next(&mut self) -> Option { let mut read_records = 0; + let mut filter: Option = None; match self.selection.as_mut() { - Some(selection) => { - while read_records < self.batch_size && !selection.is_empty() { - let front = selection.pop_front().unwrap(); + Some(RowSelection::Ranges(selectors)) => { + let mut selectors: VecDeque = VecDeque::from_iter(take(selectors)); + while read_records < self.batch_size && !selectors.is_empty() { + let front = selectors.pop_front().unwrap(); if front.skip { let skipped = match self.array_reader.skip_records(front.row_count) { Ok(skipped) => skipped, @@ -833,7 +836,7 @@ impl Iterator for ParquetRecordBatchReader { Some(remaining) if remaining != 0 => { // if page row count less than batch_size we must set batch size to page row count. // add check avoid dead loop - selection.push_front(RowSelector::select(remaining)); + selectors.push_front(RowSelector::select(remaining)); need_read } _ => front.row_count, @@ -844,7 +847,14 @@ impl Iterator for ParquetRecordBatchReader { Err(error) => return Some(Err(error.into())), } } + self.selection = Some(RowSelection::Ranges(selectors.into())); } + Some(RowSelection::BitMap(bitmap)) => { + self.array_reader.read_records(bitmap.len()).unwrap(); + filter = Some(bitmap.clone()); + self.selection = None; + } + None => { if let Err(error) = self.array_reader.read_records(self.batch_size) { return Some(Err(error.into())); @@ -861,9 +871,31 @@ impl Iterator for ParquetRecordBatchReader { ) }); - match struct_array { - Err(err) => Some(Err(err)), - Ok(e) => (e.len() > 0).then(|| Ok(RecordBatch::from(e))), + let batch: RecordBatch = match struct_array { + Err(err) => return Some(Err(err)), + Ok(e) => e.into(), + }; + + if let Some(filter) = filter.as_mut() { + if batch.num_rows() == 0 { + return None; + } + if filter.len() != batch.num_rows() { + return Some(Err(ArrowError::ComputeError(format!( + "Filter length ({}) does not match batch rows ({})", + filter.len(), + batch.num_rows() + )))); + } + + match arrow_select::filter::filter_record_batch(&batch, filter) { + Ok(filtered_batch) => Some(Ok(filtered_batch)), + Err(e) => Some(Err(e)), + } + } else if batch.num_rows() > 0 { + Some(Ok(batch)) + } else { + None } } } @@ -907,7 +939,7 @@ impl ParquetRecordBatchReader { batch_size, array_reader, schema: Arc::new(Schema::new(levels.fields.clone())), - selection: selection.map(|s| s.trim().into()), + selection: selection.map(|s| s.trim()), }) } @@ -928,7 +960,7 @@ impl ParquetRecordBatchReader { batch_size, array_reader, schema: Arc::new(schema), - selection: selection.map(|s| s.trim().into()), + selection: selection.map(|s| s.trim()), } } } @@ -1002,6 +1034,7 @@ pub(crate) fn evaluate_predicate( filter.len() )); } + match filter.null_count() { 0 => filters.push(filter), _ => filters.push(prep_null_mask_filter(&filter)), diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index c53d47be2e5..9ead35cde8b 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -16,6 +16,7 @@ // under the License. use arrow_array::{Array, BooleanArray}; +use arrow_buffer::{BooleanBufferBuilder, MutableBuffer}; use arrow_select::filter::SlicesIterator; use std::cmp::Ordering; use std::collections::VecDeque; @@ -56,7 +57,7 @@ impl RowSelector { /// This is applied prior to reading column data, and can therefore /// be used to skip IO to fetch data into memory /// -/// A typical use-case would be using the [`PageIndex`] to filter out rows +/// A typical use-case would be using the PageIndex to filter out rows /// that don't satisfy a predicate /// /// # Example @@ -90,18 +91,32 @@ impl RowSelector { /// assert_eq!(actual, expected); /// ``` /// -/// A [`RowSelection`] maintains the following invariants: -/// -/// * It contains no [`RowSelector`] of 0 rows -/// * Consecutive [`RowSelector`]s alternate skipping or selecting rows -/// -/// [`PageIndex`]: crate::file::page_index::index::PageIndex -#[derive(Debug, Clone, Default, Eq, PartialEq)] -pub struct RowSelection { - selectors: Vec, +/// [`RowSelection`] is an enum that can be either a list of [`RowSelector`]s +/// or a [`BooleanArray`] bitmap +#[derive(Debug, Clone, PartialEq)] +pub enum RowSelection { + /// A list of [`RowSelector`]s + Ranges(Vec), + /// A [`BooleanArray`] bitmap + BitMap(BooleanArray), +} + +impl Default for RowSelection { + fn default() -> Self { + RowSelection::Ranges(Vec::new()) + } } impl RowSelection { + /// Returns the [`RowSelector`]s for this [`RowSelection`] + /// It only works for [`RowSelection::Ranges`] + pub fn selectors(&self) -> &[RowSelector] { + match self { + RowSelection::Ranges(selectors) => selectors, + RowSelection::BitMap(_) => panic!("called selectors() on RowSelection::BitMap"), + } + } + /// Creates a [`RowSelection`] from a slice of [`BooleanArray`] /// /// # Panic @@ -121,6 +136,15 @@ impl RowSelection { Self::from_consecutive_ranges(iter, total_rows) } + /// Creates a [`RowSelection`] from a slice of [`BooleanArray`] as a bitmap + /// + pub fn from_filters_as_bitmap(filters: &[BooleanArray]) -> Self { + let arrays: Vec<&dyn Array> = filters.iter().map(|x| x as &dyn Array).collect(); + let result = arrow_select::concat::concat(&arrays).unwrap().into_data(); + let boolean_array = BooleanArray::from(result); + Self::BitMap(boolean_array) + } + /// Creates a [`RowSelection`] from an iterator of consecutive ranges to keep pub fn from_consecutive_ranges>>( ranges: I, @@ -152,7 +176,7 @@ impl RowSelection { selectors.push(RowSelector::skip(total_rows - last_end)) } - Self { selectors } + RowSelection::Ranges(selectors) } /// Given an offset index, return the byte ranges for all data pages selected by `self` @@ -163,91 +187,107 @@ impl RowSelection { /// ranges that are close together. This is instead delegated to the IO subsystem to optimise, /// e.g. [`ObjectStore::get_ranges`](object_store::ObjectStore::get_ranges) pub fn scan_ranges(&self, page_locations: &[crate::format::PageLocation]) -> Vec> { - let mut ranges: Vec> = vec![]; - let mut row_offset = 0; - - let mut pages = page_locations.iter().peekable(); - let mut selectors = self.selectors.iter().cloned(); - let mut current_selector = selectors.next(); - let mut current_page = pages.next(); - - let mut current_page_included = false; - - while let Some((selector, page)) = current_selector.as_mut().zip(current_page) { - if !(selector.skip || current_page_included) { - let start = page.offset as u64; - let end = start + page.compressed_page_size as u64; - ranges.push(start..end); - current_page_included = true; - } + match self { + RowSelection::Ranges(selectors) => { + let mut ranges: Vec> = vec![]; + let mut row_offset = 0; + + let mut pages = page_locations.iter().peekable(); + let mut selectors = selectors.iter().cloned(); + let mut current_selector = selectors.next(); + let mut current_page = pages.next(); + + let mut current_page_included = false; + + while let Some((selector, page)) = current_selector.as_mut().zip(current_page) { + if !(selector.skip || current_page_included) { + let start = page.offset as u64; + let end = start + page.compressed_page_size as u64; + ranges.push(start..end); + current_page_included = true; + } - if let Some(next_page) = pages.peek() { - if row_offset + selector.row_count > next_page.first_row_index as usize { - let remaining_in_page = next_page.first_row_index as usize - row_offset; - selector.row_count -= remaining_in_page; - row_offset += remaining_in_page; - current_page = pages.next(); - current_page_included = false; - - continue; - } else { - if row_offset + selector.row_count == next_page.first_row_index as usize { - current_page = pages.next(); - current_page_included = false; + if let Some(next_page) = pages.peek() { + if row_offset + selector.row_count > next_page.first_row_index as usize { + let remaining_in_page = next_page.first_row_index as usize - row_offset; + selector.row_count -= remaining_in_page; + row_offset += remaining_in_page; + current_page = pages.next(); + current_page_included = false; + + continue; + } else { + if row_offset + selector.row_count == next_page.first_row_index as usize + { + current_page = pages.next(); + current_page_included = false; + } + row_offset += selector.row_count; + current_selector = selectors.next(); + } + } else { + if !(selector.skip || current_page_included) { + let start = page.offset as u64; + let end = start + page.compressed_page_size as u64; + ranges.push(start..end); + } + current_selector = selectors.next() } - row_offset += selector.row_count; - current_selector = selectors.next(); - } - } else { - if !(selector.skip || current_page_included) { - let start = page.offset as u64; - let end = start + page.compressed_page_size as u64; - ranges.push(start..end); } - current_selector = selectors.next() + + ranges + } + RowSelection::BitMap(_) => { + // not implemented yet + unimplemented!("BitMap variant is not yet supported") } } - - ranges } /// Splits off the first `row_count` from this [`RowSelection`] pub fn split_off(&mut self, row_count: usize) -> Self { - let mut total_count = 0; - - // Find the index where the selector exceeds the row count - let find = self.selectors.iter().position(|selector| { - total_count += selector.row_count; - total_count > row_count - }); - - let split_idx = match find { - Some(idx) => idx, - None => { - let selectors = std::mem::take(&mut self.selectors); - return Self { selectors }; + match self { + RowSelection::BitMap(_) => { + // not implemented yet + unimplemented!("BitMap variant is not yet supported") } - }; + RowSelection::Ranges(selectors) => { + let mut total_count = 0; + + // Find the index where the selector exceeds the row count + let find = selectors.iter().position(|selector| { + total_count += selector.row_count; + total_count > row_count + }); + + let split_idx = match find { + Some(idx) => idx, + None => { + let selectors = std::mem::take(selectors); + return RowSelection::Ranges(selectors); + } + }; - let mut remaining = self.selectors.split_off(split_idx); + let mut remaining = selectors.split_off(split_idx); - // Always present as `split_idx < self.selectors.len` - let next = remaining.first_mut().unwrap(); - let overflow = total_count - row_count; + // Always present as `split_idx < self.selectors.len` + let next = remaining.first_mut().unwrap(); + let overflow = total_count - row_count; - if next.row_count != overflow { - self.selectors.push(RowSelector { - row_count: next.row_count - overflow, - skip: next.skip, - }) - } - next.row_count = overflow; + if next.row_count != overflow { + selectors.push(RowSelector { + row_count: next.row_count - overflow, + skip: next.skip, + }) + } + next.row_count = overflow; - std::mem::swap(&mut remaining, &mut self.selectors); - Self { - selectors: remaining, + std::mem::swap(&mut remaining, selectors); + RowSelection::Ranges(remaining) + } } } + /// returns a [`RowSelection`] representing rows that are selected in both /// input [`RowSelection`]s. /// @@ -271,66 +311,106 @@ impl RowSelection { /// by this RowSelection /// pub fn and_then(&self, other: &Self) -> Self { - let mut selectors = vec![]; - let mut first = self.selectors.iter().cloned().peekable(); - let mut second = other.selectors.iter().cloned().peekable(); - - let mut to_skip = 0; - while let Some(b) = second.peek_mut() { - let a = first - .peek_mut() - .expect("selection exceeds the number of selected rows"); - - if b.row_count == 0 { - second.next().unwrap(); - continue; - } + match (self, other) { + (RowSelection::BitMap(bit_map), RowSelection::BitMap(other_bitmap)) => { + // Ensure that 'other' has exactly as many set bits as 'self' + debug_assert_eq!( + self.row_count(), + other_bitmap.len(), + "The 'other' selection must have exactly as many set bits as 'self'." + ); - if a.row_count == 0 { - first.next().unwrap(); - continue; - } + if bit_map.len() == other_bitmap.len() { + // fast path if the two selections are the same length + // common if this is the first predicate + debug_assert_eq!(self.row_count(), other_bitmap.len()); + return self.intersection(other); + } - if a.skip { - // Records were skipped when producing second - to_skip += a.row_count; - first.next().unwrap(); - continue; + let mut buffer = MutableBuffer::from_len_zeroed(bit_map.values().inner().len()); + buffer.copy_from_slice(bit_map.values().inner().as_slice()); + let mut builder = BooleanBufferBuilder::new_from_buffer(buffer, bit_map.len()); + + // Create iterators for 'self' and 'other' bits + let mut other_bits = other_bitmap.iter(); + + for bit_idx in bit_map.values().set_indices() { + let predicate = other_bits + .next() + .expect("Mismatch in set bits between self and other"); + if !predicate.unwrap() { + builder.set_bit(bit_idx, false); + } + } + + RowSelection::BitMap(BooleanArray::from(builder.finish())) } + (RowSelection::Ranges(selectors), RowSelection::Ranges(other_selectors)) => { + let mut and_then_select = vec![]; + let mut first = selectors.iter().cloned().peekable(); + let mut second = other_selectors.iter().cloned().peekable(); + + let mut to_skip = 0; + while let Some(b) = second.peek_mut() { + let a = first + .peek_mut() + .expect("selection exceeds the number of selected rows"); + + if b.row_count == 0 { + second.next().unwrap(); + continue; + } + + if a.row_count == 0 { + first.next().unwrap(); + continue; + } + + if a.skip { + // Records were skipped when producing second + to_skip += a.row_count; + first.next().unwrap(); + continue; + } - let skip = b.skip; - let to_process = a.row_count.min(b.row_count); + let skip = b.skip; + let to_process = a.row_count.min(b.row_count); - a.row_count -= to_process; - b.row_count -= to_process; + a.row_count -= to_process; + b.row_count -= to_process; - match skip { - true => to_skip += to_process, - false => { - if to_skip != 0 { - selectors.push(RowSelector::skip(to_skip)); - to_skip = 0; + match skip { + true => to_skip += to_process, + false => { + if to_skip != 0 { + and_then_select.push(RowSelector::skip(to_skip)); + to_skip = 0; + } + and_then_select.push(RowSelector::select(to_process)) + } } - selectors.push(RowSelector::select(to_process)) } - } - } - for v in first { - if v.row_count != 0 { - assert!( - v.skip, - "selection contains less than the number of selected rows" - ); - to_skip += v.row_count - } - } + for v in first { + if v.row_count != 0 { + assert!( + v.skip, + "selection contains less than the number of selected rows" + ); + to_skip += v.row_count + } + } - if to_skip != 0 { - selectors.push(RowSelector::skip(to_skip)); + if to_skip != 0 { + and_then_select.push(RowSelector::skip(to_skip)); + } + RowSelection::Ranges(and_then_select) + } + (_, _) => { + // not implemented yet + unimplemented!("BitMap variant is not yet supported") + } } - - Self { selectors } } /// Compute the intersection of two [`RowSelection`] @@ -340,7 +420,17 @@ impl RowSelection { /// /// returned: NNNNNNNNYYNYN pub fn intersection(&self, other: &Self) -> Self { - intersect_row_selections(&self.selectors, &other.selectors) + match (self, other) { + (RowSelection::Ranges(a), RowSelection::Ranges(b)) => { + RowSelection::Ranges(intersect_row_selections(a, b).selectors().to_vec()) + } + (RowSelection::BitMap(bit_map), RowSelection::BitMap(other_bit_map)) => { + let intersection_selectors = bit_map.values() & other_bit_map.values(); + RowSelection::BitMap(BooleanArray::from(intersection_selectors)) + } + (RowSelection::BitMap(_), RowSelection::Ranges(_)) => todo!(), + (RowSelection::Ranges(_), RowSelection::BitMap(_)) => todo!(), + } } /// Compute the union of two [`RowSelection`] @@ -350,91 +440,167 @@ impl RowSelection { /// /// returned: NYYYYYNNYYNYN pub fn union(&self, other: &Self) -> Self { - union_row_selections(&self.selectors, &other.selectors) + match (self, other) { + (RowSelection::Ranges(a), RowSelection::Ranges(b)) => { + RowSelection::Ranges(union_row_selections(a, b).selectors().to_vec()) + } + (RowSelection::BitMap(_), _) | (_, RowSelection::BitMap(_)) => { + unimplemented!("BitMap variant is not yet supported") + } + } } /// Returns `true` if this [`RowSelection`] selects any rows pub fn selects_any(&self) -> bool { - self.selectors.iter().any(|x| !x.skip) + match self { + RowSelection::Ranges(selectors) => selectors.iter().any(|x| !x.skip), + RowSelection::BitMap(bitmap) => bitmap.true_count() > 0, + } } /// Trims this [`RowSelection`] removing any trailing skips - pub(crate) fn trim(mut self) -> Self { - while self.selectors.last().map(|x| x.skip).unwrap_or(false) { - self.selectors.pop(); + pub(crate) fn trim(self) -> Self { + match self { + RowSelection::Ranges(mut selectors) => { + while selectors.last().map(|x| x.skip).unwrap_or(false) { + selectors.pop(); + } + RowSelection::Ranges(selectors) + } + RowSelection::BitMap(bitmap) => { + // find the last `true` in the mask + let new_len = bitmap + .iter() + .rposition(|opt| opt == Some(true)) + .map(|idx| idx + 1) + .unwrap_or(0); + + // slice off any trailing `false` (or null) values + let trimmed = bitmap.slice(0, new_len); + RowSelection::BitMap(trimmed) + } } - self } /// Applies an offset to this [`RowSelection`], skipping the first `offset` selected rows - pub(crate) fn offset(mut self, offset: usize) -> Self { - if offset == 0 { - return self; - } + pub(crate) fn offset(self, offset: usize) -> Self { + match self { + RowSelection::Ranges(mut selectors) => { + if offset == 0 { + return RowSelection::Ranges(selectors); + } - let mut selected_count = 0; - let mut skipped_count = 0; + let mut selected_count = 0; + let mut skipped_count = 0; - // Find the index where the selector exceeds the row count - let find = self - .selectors - .iter() - .position(|selector| match selector.skip { - true => { - skipped_count += selector.row_count; - false - } - false => { - selected_count += selector.row_count; - selected_count > offset - } - }); + // Find the index where the selector exceeds the row count + let find = selectors.iter().position(|selector| match selector.skip { + true => { + skipped_count += selector.row_count; + false + } + false => { + selected_count += selector.row_count; + selected_count > offset + } + }); - let split_idx = match find { - Some(idx) => idx, - None => { - self.selectors.clear(); - return self; - } - }; + let split_idx = match find { + Some(idx) => idx, + None => { + selectors.clear(); + return RowSelection::Ranges(selectors); + } + }; - let mut selectors = Vec::with_capacity(self.selectors.len() - split_idx + 1); - selectors.push(RowSelector::skip(skipped_count + offset)); - selectors.push(RowSelector::select(selected_count - offset)); - selectors.extend_from_slice(&self.selectors[split_idx + 1..]); + let mut select = Vec::with_capacity(selectors.len() - split_idx + 1); + select.push(RowSelector::skip(skipped_count + offset)); + select.push(RowSelector::select(selected_count - offset)); + select.extend_from_slice(&selectors[split_idx + 1..]); - Self { selectors } + RowSelection::Ranges(select) + } + RowSelection::BitMap(_) => { + // not implemented yet + unimplemented!("BitMap variant is not yet supported") + } + } } /// Limit this [`RowSelection`] to only select `limit` rows - pub(crate) fn limit(mut self, mut limit: usize) -> Self { - if limit == 0 { - self.selectors.clear(); - } + pub(crate) fn limit(self, mut limit: usize) -> Self { + match self { + RowSelection::Ranges(mut selectors) => { + if limit == 0 { + selectors.clear(); + } - for (idx, selection) in self.selectors.iter_mut().enumerate() { - if !selection.skip { - if selection.row_count >= limit { - selection.row_count = limit; - self.selectors.truncate(idx + 1); - break; - } else { - limit -= selection.row_count; + for (idx, selection) in selectors.iter_mut().enumerate() { + if !selection.skip { + if selection.row_count >= limit { + selection.row_count = limit; + selectors.truncate(idx + 1); + break; + } else { + limit -= selection.row_count; + } + } } + RowSelection::Ranges(selectors) + } + RowSelection::BitMap(_) => { + // not implemented yet + unimplemented!("BitMap variant is not yet supported") } } - self } /// Returns an iterator over the [`RowSelector`]s for this /// [`RowSelection`]. pub fn iter(&self) -> impl Iterator { - self.selectors.iter() + match self { + RowSelection::Ranges(selectors) => selectors.iter(), + RowSelection::BitMap(_) => { + // not implemented yet + unimplemented!("BitMap variant is not yet supported") + } + } + } + + /// Returns the total number of rows in this [`RowSelection`] + pub fn total_rows(&self) -> usize { + match self { + RowSelection::Ranges(selectors) => selectors.iter().map(|s| s.row_count).sum(), + RowSelection::BitMap(bitmap) => bitmap.len(), + } + } + + /// Returns the number of selectors in this [`RowSelection`] + pub fn len(&self) -> usize { + match self { + RowSelection::Ranges(selectors) => selectors.len(), + RowSelection::BitMap(bitmap) => bitmap.len(), + } + } + + /// Returns `true` if this [`RowSelection`] is empty + pub fn is_empty(&self) -> bool { + match self { + RowSelection::Ranges(selectors) => selectors.is_empty(), + RowSelection::BitMap(bitmap) => bitmap.is_empty(), + } } /// Returns the number of selected rows pub fn row_count(&self) -> usize { - self.iter().filter(|s| !s.skip).map(|s| s.row_count).sum() + match self { + RowSelection::Ranges(selectors) => selectors + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(), + RowSelection::BitMap(bitmap) => bitmap.true_count(), + } } /// Returns the number of de-selected rows @@ -443,12 +609,53 @@ impl RowSelection { } } +/// Convert a BooleanArray (no nulls) into a `Vec`, +/// alternating `select(run_len)` and `skip(run_len)`. +fn selectors_from_bitmap(bitmap: &BooleanArray) -> Vec { + let mut selectors = Vec::new(); + let mut run_skip = !bitmap.value(0); // start by skipping if first bit is false + let mut run_len = 0; + + for i in 0..bitmap.len() { + let bit = bitmap.value(i); + if bit != run_skip { + // same as current run type (select vs skip) + run_len += 1; + } else { + // flush previous run + selectors.push(if run_skip { + RowSelector::skip(run_len) + } else { + RowSelector::select(run_len) + }); + // start new run + run_skip = !run_skip; + run_len = 1; + } + } + // flush final run + if run_len > 0 { + selectors.push(if run_skip { + RowSelector::skip(run_len) + } else { + RowSelector::select(run_len) + }); + } + selectors +} + impl From> for RowSelection { fn from(selectors: Vec) -> Self { selectors.into_iter().collect() } } +impl From for RowSelection { + fn from(value: BooleanArray) -> Self { + RowSelection::BitMap(value) + } +} + impl FromIterator for RowSelection { fn from_iter>(iter: T) -> Self { let iter = iter.into_iter(); @@ -475,19 +682,28 @@ impl FromIterator for RowSelection { } } - Self { selectors } + RowSelection::Ranges(selectors) } } impl From for Vec { fn from(r: RowSelection) -> Self { - r.selectors + match r { + RowSelection::Ranges(selectors) => selectors, + RowSelection::BitMap(bitmap) => selectors_from_bitmap(&bitmap), + } } } impl From for VecDeque { fn from(r: RowSelection) -> Self { - r.selectors.into() + match r { + RowSelection::Ranges(selectors) => selectors.into(), + RowSelection::BitMap(_) => { + // not implemented yet + unimplemented!("BitMap variant is not yet supported") + } + } } } @@ -654,15 +870,14 @@ mod tests { let selection = RowSelection::from_filters(&filters[..1]); assert!(selection.selects_any()); - assert_eq!( - selection.selectors, - vec![RowSelector::skip(3), RowSelector::select(4)] - ); + let expected: Vec = selection.into(); + assert_eq!(expected, vec![RowSelector::skip(3), RowSelector::select(4)]); let selection = RowSelection::from_filters(&filters[..2]); assert!(selection.selects_any()); + let expected: Vec = selection.into(); assert_eq!( - selection.selectors, + expected, vec![ RowSelector::skip(3), RowSelector::select(6), @@ -673,8 +888,9 @@ mod tests { let selection = RowSelection::from_filters(&filters); assert!(selection.selects_any()); + let expected: Vec = selection.into(); assert_eq!( - selection.selectors, + expected, vec![ RowSelector::skip(3), RowSelector::select(6), @@ -686,7 +902,8 @@ mod tests { let selection = RowSelection::from_filters(&filters[2..3]); assert!(!selection.selects_any()); - assert_eq!(selection.selectors, vec![RowSelector::skip(4)]); + let expected: Vec = selection.into(); + assert_eq!(expected, vec![RowSelector::skip(4)]); } #[test] @@ -699,43 +916,83 @@ mod tests { ]); let split = selection.split_off(34); - assert_eq!(split.selectors, vec![RowSelector::skip(34)]); - assert_eq!( - selection.selectors, - vec![ - RowSelector::select(12), - RowSelector::skip(3), - RowSelector::select(35) - ] - ); + match split { + RowSelection::Ranges(selectors) => { + assert_eq!(selectors, vec![RowSelector::skip(34)]); + } + _ => panic!("Expected Ranges variant"), + } + match selection { + RowSelection::Ranges(ref selectors) => { + assert_eq!( + selectors, + &vec![ + RowSelector::select(12), + RowSelector::skip(3), + RowSelector::select(35) + ] + ); + } + _ => panic!("Expected Ranges variant"), + } let split = selection.split_off(5); - assert_eq!(split.selectors, vec![RowSelector::select(5)]); - assert_eq!( - selection.selectors, - vec![ - RowSelector::select(7), - RowSelector::skip(3), - RowSelector::select(35) - ] - ); + match split { + RowSelection::Ranges(selectors) => { + assert_eq!(selectors, vec![RowSelector::select(5)]); + } + _ => panic!("Expected Ranges variant"), + } + match selection { + RowSelection::Ranges(ref selectors) => { + assert_eq!( + selectors, + &vec![ + RowSelector::select(7), + RowSelector::skip(3), + RowSelector::select(35) + ] + ); + } + _ => panic!("Expected Ranges variant"), + } let split = selection.split_off(8); - assert_eq!( - split.selectors, - vec![RowSelector::select(7), RowSelector::skip(1)] - ); - assert_eq!( - selection.selectors, - vec![RowSelector::skip(2), RowSelector::select(35)] - ); + match split { + RowSelection::Ranges(selectors) => { + assert_eq!( + selectors, + vec![RowSelector::select(7), RowSelector::skip(1)] + ); + } + _ => panic!("Expected Ranges variant"), + } + match selection { + RowSelection::Ranges(ref selectors) => { + assert_eq!( + selectors, + &vec![RowSelector::skip(2), RowSelector::select(35)] + ); + } + _ => panic!("Expected Ranges variant"), + } let split = selection.split_off(200); - assert_eq!( - split.selectors, - vec![RowSelector::skip(2), RowSelector::select(35)] - ); - assert!(selection.selectors.is_empty()); + match split { + RowSelection::Ranges(selectors) => { + assert_eq!( + selectors, + vec![RowSelector::skip(2), RowSelector::select(35)] + ); + } + _ => panic!("Expected Ranges variant"), + } + match selection { + RowSelection::Ranges(ref selectors) => { + assert!(selectors.is_empty()); + } + _ => panic!("Expected Ranges variant"), + } } #[test] @@ -750,7 +1007,7 @@ mod tests { let selection = selection.offset(2); assert_eq!( - selection.selectors, + selection.selectors(), vec![ RowSelector::skip(2), RowSelector::select(3), @@ -763,7 +1020,7 @@ mod tests { let selection = selection.offset(5); assert_eq!( - selection.selectors, + selection.selectors(), vec![ RowSelector::skip(30), RowSelector::select(5), @@ -774,7 +1031,7 @@ mod tests { let selection = selection.offset(3); assert_eq!( - selection.selectors, + selection.selectors(), vec![ RowSelector::skip(33), RowSelector::select(2), @@ -785,13 +1042,13 @@ mod tests { let selection = selection.offset(2); assert_eq!( - selection.selectors, + selection.selectors(), vec![RowSelector::skip(68), RowSelector::select(6),] ); let selection = selection.offset(3); assert_eq!( - selection.selectors, + selection.selectors(), vec![RowSelector::skip(71), RowSelector::select(3),] ); } @@ -838,7 +1095,7 @@ mod tests { ]); assert_eq!( - a.and_then(&b).selectors, + a.and_then(&b).selectors(), vec![ RowSelector::select(2), RowSelector::skip(1), @@ -891,30 +1148,30 @@ mod tests { fn test_combine_2elements() { let a = vec![RowSelector::select(10), RowSelector::select(5)]; let a_expect = vec![RowSelector::select(15)]; - assert_eq!(RowSelection::from_iter(a).selectors, a_expect); + assert_eq!(RowSelection::from_iter(a).selectors(), a_expect); let b = vec![RowSelector::select(10), RowSelector::skip(5)]; let b_expect = vec![RowSelector::select(10), RowSelector::skip(5)]; - assert_eq!(RowSelection::from_iter(b).selectors, b_expect); + assert_eq!(RowSelection::from_iter(b).selectors(), b_expect); let c = vec![RowSelector::skip(10), RowSelector::select(5)]; let c_expect = vec![RowSelector::skip(10), RowSelector::select(5)]; - assert_eq!(RowSelection::from_iter(c).selectors, c_expect); + assert_eq!(RowSelection::from_iter(c).selectors(), c_expect); let d = vec![RowSelector::skip(10), RowSelector::skip(5)]; let d_expect = vec![RowSelector::skip(15)]; - assert_eq!(RowSelection::from_iter(d).selectors, d_expect); + assert_eq!(RowSelection::from_iter(d).selectors(), d_expect); } #[test] fn test_from_one_and_empty() { let a = vec![RowSelector::select(10)]; let selection1 = RowSelection::from(a.clone()); - assert_eq!(selection1.selectors, a); + assert_eq!(selection1.selectors(), a); let b = vec![]; let selection1 = RowSelection::from(b.clone()); - assert_eq!(selection1.selectors, b) + assert_eq!(selection1.selectors(), b) } #[test] @@ -959,7 +1216,7 @@ mod tests { let res = intersect_row_selections(&a, &b); assert_eq!( - res.selectors, + res.selectors(), vec![ RowSelector::select(5), RowSelector::skip(4), @@ -977,7 +1234,7 @@ mod tests { let b = vec![RowSelector::select(36), RowSelector::skip(36)]; let res = intersect_row_selections(&a, &b); assert_eq!( - res.selectors, + res.selectors(), vec![RowSelector::select(3), RowSelector::skip(69)] ); @@ -992,7 +1249,7 @@ mod tests { ]; let res = intersect_row_selections(&a, &b); assert_eq!( - res.selectors, + res.selectors(), vec![RowSelector::select(2), RowSelector::skip(8)] ); @@ -1006,7 +1263,7 @@ mod tests { ]; let res = intersect_row_selections(&a, &b); assert_eq!( - res.selectors, + res.selectors(), vec![RowSelector::select(2), RowSelector::skip(8)] ); } @@ -1034,7 +1291,7 @@ mod tests { let expected = RowSelection::from_filters(&[BooleanArray::from(expected_bools)]); - let total_rows: usize = expected.selectors.iter().map(|s| s.row_count).sum(); + let total_rows: usize = expected.iter().map(|s| s.row_count).sum(); assert_eq!(a_len, total_rows); assert_eq!(a.and_then(&b), expected); @@ -1075,7 +1332,7 @@ mod tests { let limited = selection.clone().limit(5); let expected = vec![RowSelector::select(5)]; - assert_eq!(limited.selectors, expected); + assert_eq!(limited.selectors(), expected); let limited = selection.clone().limit(15); let expected = vec![ @@ -1083,11 +1340,11 @@ mod tests { RowSelector::skip(10), RowSelector::select(5), ]; - assert_eq!(limited.selectors, expected); + assert_eq!(limited.selectors(), expected); let limited = selection.clone().limit(0); let expected = vec![]; - assert_eq!(limited.selectors, expected); + assert_eq!(limited.selectors(), expected); let limited = selection.clone().limit(30); let expected = vec![ @@ -1097,7 +1354,7 @@ mod tests { RowSelector::skip(10), RowSelector::select(10), ]; - assert_eq!(limited.selectors, expected); + assert_eq!(limited.selectors(), expected); let limited = selection.limit(100); let expected = vec![ @@ -1107,7 +1364,7 @@ mod tests { RowSelector::skip(10), RowSelector::select(10), ]; - assert_eq!(limited.selectors, expected); + assert_eq!(limited.selectors(), expected); } #[test] @@ -1248,7 +1505,7 @@ mod tests { let ranges = [1..3, 4..6, 6..6, 8..8, 9..10]; let selection = RowSelection::from_consecutive_ranges(ranges.into_iter(), 10); assert_eq!( - selection.selectors, + selection.selectors(), vec![ RowSelector::skip(1), RowSelector::select(2), @@ -1274,7 +1531,7 @@ mod tests { RowSelector::skip(0), RowSelector::select(2), ]); - assert_eq!(selection.selectors, vec![RowSelector::select(4)]); + assert_eq!(selection.selectors(), vec![RowSelector::select(4)]); let selection = RowSelection::from(vec![ RowSelector::select(0), @@ -1282,7 +1539,7 @@ mod tests { RowSelector::select(0), RowSelector::skip(2), ]); - assert_eq!(selection.selectors, vec![RowSelector::skip(4)]); + assert_eq!(selection.selectors(), vec![RowSelector::skip(4)]); } #[test] @@ -1306,7 +1563,7 @@ mod tests { let result = a.intersection(&b); assert_eq!( - result.selectors, + result.selectors(), vec![ RowSelector::skip(30), RowSelector::select(10), diff --git a/parquet/src/arrow/async_reader/arrow_reader.rs b/parquet/src/arrow/async_reader/arrow_reader.rs new file mode 100644 index 00000000000..6a8a9fc5db3 --- /dev/null +++ b/parquet/src/arrow/async_reader/arrow_reader.rs @@ -0,0 +1,756 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::sync::{Mutex, MutexGuard}; +use std::{collections::VecDeque, sync::Arc}; + +use arrow_array::{cast::AsArray, Array, RecordBatch, RecordBatchReader}; +use arrow_array::{ArrayRef, BooleanArray}; +use arrow_buffer::BooleanBufferBuilder; +use arrow_schema::{ArrowError, DataType, Schema, SchemaRef}; +use arrow_select::filter::{filter, prep_null_mask_filter}; + +use crate::basic::PageType; +use crate::column::page::{Page, PageMetadata, PageReader}; +use crate::errors::ParquetError; +use crate::{ + arrow::{ + array_reader::ArrayReader, + arrow_reader::{RowFilter, RowSelection, RowSelector}, + }, + file::reader::{ChunkReader, SerializedPageReader}, +}; + +pub struct FilteredParquetRecordBatchReader { + batch_size: usize, + array_reader: Box, + predicate_readers: Vec>, + schema: SchemaRef, + selection: Option, + row_filter: Option, +} + +fn read_selection( + reader: &mut dyn ArrayReader, + selection: &RowSelection, +) -> Result { + match selection { + RowSelection::Ranges(selectors) => { + for selector in selectors { + if selector.skip { + reader.skip_records(selector.row_count)?; + } else { + reader.read_records(selector.row_count)?; + } + } + reader.consume_batch() + } + + RowSelection::BitMap(bitmap) => { + let to_read = bitmap.len(); + reader.read_records(to_read)?; + let array = reader.consume_batch()?; + + let filtered_array = + filter(&array, bitmap).map_err(|e| ParquetError::General(e.to_string()))?; + + Ok(filtered_array) + } + } +} + +fn take_next_selection( + selection: &mut Option, + to_select: usize, +) -> Option { + let current = selection.as_ref()?.clone(); + + if let RowSelection::BitMap(bitmap) = current { + let take = bitmap.len().min(to_select); + let prefix = bitmap.slice(0, take); + let suffix_len = bitmap.len() - take; + let suffix = if suffix_len > 0 { + Some(bitmap.slice(take, suffix_len)) + } else { + None + }; + *selection = suffix.map(RowSelection::BitMap); + return Some(RowSelection::BitMap(prefix)); + } + + let RowSelection::Ranges(runs) = current else { + unreachable!() + }; + let mut queue: VecDeque = runs.into(); + let mut taken = Vec::new(); + let mut count = 0; + + while let Some(run) = queue.pop_front() { + if run.skip { + taken.push(run); + continue; + } + let room = to_select.saturating_sub(count); + if room == 0 { + queue.push_front(run); + break; + } + if run.row_count <= room { + taken.push(run); + count += run.row_count; + } else { + taken.push(RowSelector::select(room)); + queue.push_front(RowSelector::select(run.row_count - room)); + break; + } + } + + *selection = if queue.is_empty() { + None + } else { + Some(RowSelection::Ranges(queue.into_iter().collect())) + }; + + Some(RowSelection::Ranges(taken)) +} + +impl FilteredParquetRecordBatchReader { + pub(crate) fn new( + batch_size: usize, + array_reader: Box, + selection: Option, + filter_readers: Vec>, + row_filter: Option, + ) -> Self { + let schema = match array_reader.get_data_type() { + DataType::Struct(ref fields) => Schema::new(fields.clone()), + _ => unreachable!("Struct array reader's data type is not struct!"), + }; + + Self { + batch_size, + array_reader, + predicate_readers: filter_readers, + schema: Arc::new(schema), + selection, + row_filter, + } + } + + pub(crate) fn take_filter(&mut self) -> Option { + self.row_filter.take() + } + + fn create_bitmap_from_ranges(&mut self, runs: &[RowSelector]) -> BooleanArray { + let mut bitmap_builder = BooleanBufferBuilder::new(runs.iter().map(|r| r.row_count).sum()); + for run in runs.iter() { + bitmap_builder.append_n(run.row_count, !run.skip); + } + BooleanArray::new(bitmap_builder.finish(), None) + } + + /// Take a selection, and return the new selection where the rows are filtered by the predicate. + fn build_predicate_filter( + &mut self, + mut selection: RowSelection, + ) -> Result { + match &mut self.row_filter { + None => Ok(selection), + Some(filter) => { + debug_assert_eq!( + self.predicate_readers.len(), + filter.predicates.len(), + "predicate readers and predicates should have the same length" + ); + + for (predicate, reader) in filter + .predicates + .iter_mut() + .zip(self.predicate_readers.iter_mut()) + { + let array = read_selection(reader.as_mut(), &selection)?; + let batch = RecordBatch::from(array.as_struct_opt().ok_or_else(|| { + general_err!("Struct array reader should return struct array") + })?); + let input_rows = batch.num_rows(); + let predicate_filter = predicate.evaluate(batch)?; + if predicate_filter.len() != input_rows { + return Err(ArrowError::ParquetError(format!( + "ArrowPredicate predicate returned {} rows, expected {input_rows}", + predicate_filter.len() + ))); + } + let predicate_filter = match predicate_filter.null_count() { + 0 => predicate_filter, + _ => prep_null_mask_filter(&predicate_filter), + }; + + let raw = RowSelection::from_filters(&[predicate_filter]); + selection = selection.and_then(&raw); + } + Ok(selection) + } + } + } +} + +impl Iterator for FilteredParquetRecordBatchReader { + type Item = Result; + + fn next(&mut self) -> Option { + // With filter pushdown, it's very hard to predict the number of rows to return -- depends on the selectivity of the filter. + // We can do one of the following: + // 1. Add a coalescing step to coalesce the resulting batches. + // 2. Ask parquet reader to collect more rows before returning. + + // Approach 1 has the drawback of extra overhead of coalesce batch, which can be painful to be efficient. + // Code below implements approach 2, where we keep consuming the selection until we select at least 3/4 of the batch size. + // It boils down to leveraging array_reader's ability to collect large batches natively, + // rather than concatenating multiple small batches. + + let mut rows_accum = 0; + let mut mask_builder = BooleanBufferBuilder::new(self.batch_size); + // Move acc_skip here so it persists across loop iterations + let mut acc_skip = 0; + + while let Some(raw_sel) = + take_next_selection(&mut self.selection, self.batch_size - rows_accum) + { + let sel = match self.build_predicate_filter(raw_sel) { + Ok(s) => s, + Err(e) => return Some(Err(e)), + }; + + match sel { + RowSelection::Ranges(runs) => { + // First, compute total skip/read counts + let mut total_skip = 0; + let mut total_read = 0; + for r in &runs { + if r.skip { + total_skip += r.row_count; + } else { + total_read += r.row_count; + } + } + + // If nothing to read, accumulate skip and continue + if total_read == 0 { + acc_skip += total_skip; + continue; + } + + // Before any read, flush accumulated skips + if acc_skip > 0 { + match self.array_reader.skip_records(acc_skip) { + Ok(_) => {} + Err(e) => return Some(Err(e.into())), + }; + acc_skip = 0; + } + + let select_count = runs.len(); + let total = total_skip + total_read; + if total < 10 * select_count { + // Bitmap branch + let bitmap = self.create_bitmap_from_ranges(&runs); + match self.array_reader.read_records(bitmap.len()) { + Ok(_) => {} + Err(e) => return Some(Err(e.into())), + }; + mask_builder.append_buffer(bitmap.values()); + rows_accum += bitmap.true_count(); + } else { + // Range branch with internal skip coalescing + for r in &runs { + if r.skip { + acc_skip += r.row_count; + } else { + if acc_skip > 0 { + match self.array_reader.skip_records(acc_skip) { + Ok(_) => {} + Err(e) => return Some(Err(e.into())), + }; + acc_skip = 0; + } + match self.array_reader.read_records(r.row_count) { + Ok(_) => {} + Err(e) => return Some(Err(e.into())), + }; + mask_builder.append_n(r.row_count, true); + rows_accum += r.row_count; + } + } + } + } + + RowSelection::BitMap(bitmap) => { + // Flush any pending skips before bitmap + if acc_skip > 0 { + match self.array_reader.skip_records(acc_skip) { + Ok(_) => {} + Err(e) => return Some(Err(e.into())), + }; + acc_skip = 0; + } + let n = bitmap.len(); + match self.array_reader.read_records(n) { + Ok(_) => {} + Err(e) => return Some(Err(e.into())), + }; + mask_builder.append_buffer(bitmap.values()); + rows_accum += bitmap.true_count(); + } + } + + if rows_accum >= (self.batch_size * 3 / 4) { + break; + } + } + + // At loop exit, flush any remaining skips before finishing batch + if acc_skip > 0 { + match self.array_reader.skip_records(acc_skip) { + Ok(_) => {} + Err(e) => return Some(Err(e.into())), + }; + } + + if rows_accum == 0 { + return None; + } + + let array = match self.array_reader.consume_batch() { + Ok(a) => a, + Err(error) => return Some(Err(error.into())), + }; + + let final_array = if mask_builder.is_empty() { + array + } else { + let bitmap = BooleanArray::new(mask_builder.finish(), None); + filter(&array, &bitmap) + .map_err(|e| ParquetError::General(e.to_string())) + .ok()? + }; + + let struct_arr = final_array.as_struct_opt().expect("StructArray expected"); + Some(Ok(RecordBatch::from(struct_arr.clone()))) + } +} + +impl RecordBatchReader for FilteredParquetRecordBatchReader { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +struct CachedPage { + dict: Option<(usize, Page)>, // page offset -> page + data: VecDeque<(usize, Page)>, // page offset -> page, use 2 pages, because the batch size will exceed the page size sometimes +} + +struct PredicatePageCacheInner { + pages: HashMap, // col_id (Parquet's leaf column index) -> CachedPage +} + +impl PredicatePageCacheInner { + pub(crate) fn get_page(&self, col_id: usize, offset: usize) -> Option { + self.pages.get(&col_id).and_then(|pages| { + if let Some((off, page)) = &pages.dict { + if *off == offset { + return Some(page.clone()); + } + } + + pages + .data + .iter() + .find(|(off, _)| *off == offset) + .map(|(_, page)| page.clone()) + }) + } + + /// Insert a page into the cache. + /// Inserting a page will override the existing page, if any. + /// This is because we only need to cache 4 pages per column, see below. + /// Note: 1 page is dic page, another 3 are data pages. + /// Why do we need 3 data pages? + /// Because of the performance testing result, the batch size will across multi pages. It causes original page reader + /// cache page reader doesn't always hit the cache page. So here we keep 3 pages, and from the testing result it + /// shows that 3 pages are enough to cover the batch size when we're setting batch size to 8192. And the 3 data page size + /// is not too large, it only uses 3MB in memory, so we can keep 3 pages in the cache. + /// TODO, in future we may use adaptive cache size according the dynamic batch size. + pub(crate) fn insert_page(&mut self, col_id: usize, offset: usize, page: Page) { + let is_dict = page.page_type() == PageType::DICTIONARY_PAGE; + + match self.pages.entry(col_id) { + Entry::Occupied(mut occ) => { + let cp: &mut CachedPage = occ.get_mut(); + if is_dict { + cp.dict = Some((offset, page)); + } else { + cp.data.push_back((offset, page)); + // Keep only 3 data pages in the cache + if cp.data.len() > 3 { + cp.data.pop_front(); + } + } + } + + Entry::Vacant(vac) => { + let mut data = VecDeque::new(); + let dict = if is_dict { + Some((offset, page)) + } else { + data.push_back((offset, page.clone())); + None + }; + let cp = CachedPage { dict, data }; + vac.insert(cp); + } + } + } +} + +/// A simple cache to avoid double-decompressing pages with filter pushdown. +/// In filter pushdown, we first decompress a page, apply the filter, and then decompress the page again. +/// This double decompression is expensive, so we cache the decompressed page. +/// +/// This implementation contains subtle dynamics that can be hard to understand. +/// +/// ## Which columns to cache +/// +/// Let's consider this example: SELECT B, C FROM table WHERE A = 42 and B = 37; +/// We have 3 columns, and the predicate is applied to column A and B, and projection is on B and C. +/// +/// For column A, we need to decompress it, apply the filter (A=42), and never have to decompress it again, as it's not in the projection. +/// For column B, we need to decompress it, apply the filter (B=37), and then decompress it again, as it's in the projection. +/// For column C, we don't have predicate, so we only decompress it once. +/// +/// A, C is only decompressed once, and B is decompressed twice (as it appears in both the predicate and the projection). +/// The PredicatePageCache will only cache B. +/// We use B's col_id (Parquet's leaf column index) to identify the cache entry. +/// +/// ## How many pages to cache +/// +/// Now we identified the columns to cache, next question is to determine the **minimal** number of pages to cache. +/// +/// Let's revisit our decoding pipeline: +/// Load batch 1 -> evaluate predicates -> filter 1 -> load & emit batch 1 +/// Load batch 2 -> evaluate predicates -> filter 2 -> load & emit batch 2 +/// ... +/// Load batch N -> evaluate predicates -> filter N -> load & emit batch N +/// +/// Assumption & observation: each page consists multiple batches. +/// Then our pipeline looks like this: +/// Load Page 1 +/// Load batch 1 -> evaluate predicates -> filter 1 -> load & emit batch 1 +/// Load batch 2 -> evaluate predicates -> filter 2 -> load & emit batch 2 +/// Load batch 3 -> evaluate predicates -> filter 3 -> load & emit batch 3 +/// Load Page 2 +/// Load batch 4 -> evaluate predicates -> filter 4 -> load & emit batch 4 +/// Load batch 5 -> evaluate predicates -> filter 5 -> load & emit batch 5 +/// ... +/// +/// This means that we only need to cache one page per column, +/// because the page that is used by the predicate is the same page, and is immediately used in loading the batch. +/// +/// The only exception is the dictionary page -- the first page of each column. +/// If we encountered a dict page, we will need to immediately read next page, and cache it. +/// +/// To summarize, the cache only contains 2 pages per column: one dict page and one data page. +/// This is a nice property as it means the caching memory consumption is negligible and constant to the number of columns. +/// +/// ## How to identify a page +/// We use the page offset (the offset to the Parquet file) to uniquely identify a page. +pub(crate) struct PredicatePageCache { + inner: Mutex, +} + +impl PredicatePageCache { + pub(crate) fn new(capacity: usize) -> Self { + Self { + inner: Mutex::new(PredicatePageCacheInner { + pages: HashMap::with_capacity(capacity), + }), + } + } + + fn get(&self) -> MutexGuard { + self.inner.lock().unwrap() + } +} + +pub(crate) struct CachedPageReader { + inner: SerializedPageReader, + cache: Arc, + col_id: usize, +} + +impl CachedPageReader { + pub(crate) fn new( + inner: SerializedPageReader, + cache: Arc, + col_id: usize, + ) -> Self { + Self { + inner, + cache, + col_id, + } + } +} + +impl Iterator for CachedPageReader { + type Item = Result; + + fn next(&mut self) -> Option { + self.get_next_page().transpose() + } +} + +impl PageReader for CachedPageReader { + fn get_next_page(&mut self) -> Result, ParquetError> { + let next_page_offset = self.inner.peek_next_page_offset()?; + + let Some(offset) = next_page_offset else { + return Ok(None); + }; + + let mut cache = self.cache.get(); + + let page = cache.get_page(self.col_id, offset); + if let Some(page) = page { + self.inner.skip_next_page()?; + Ok(Some(page)) + } else { + let inner_page = self.inner.get_next_page()?; + let Some(inner_page) = inner_page else { + return Ok(None); + }; + cache.insert_page(self.col_id, offset, inner_page.clone()); + Ok(Some(inner_page)) + } + } + + fn peek_next_page(&mut self) -> Result, ParquetError> { + self.inner.peek_next_page() + } + + fn skip_next_page(&mut self) -> Result<(), ParquetError> { + self.inner.skip_next_page() + } + + fn at_record_boundary(&mut self) -> Result { + self.inner.at_record_boundary() + } +} + +// Helper implementation for testing +#[cfg(test)] +impl Page { + fn dummy_page(page_type: PageType, size: usize) -> Self { + use crate::basic::Encoding; + match page_type { + PageType::DATA_PAGE => Page::DataPage { + buf: vec![0; size].into(), + num_values: size as u32, + encoding: Encoding::PLAIN, + def_level_encoding: Encoding::PLAIN, + rep_level_encoding: Encoding::PLAIN, + statistics: None, + }, + PageType::DICTIONARY_PAGE => Page::DictionaryPage { + buf: vec![0; size].into(), + num_values: size as u32, + encoding: Encoding::PLAIN, + is_sorted: false, + }, + PageType::DATA_PAGE_V2 => Page::DataPageV2 { + buf: vec![0; size].into(), + num_values: size as u32, + encoding: Encoding::PLAIN, + def_levels_byte_len: 0, + rep_levels_byte_len: 0, + is_compressed: false, + statistics: None, + num_nulls: 0, + num_rows: 0, + }, + _ => unreachable!(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_take_next_selection_exact_match() { + let selection_vec = vec![ + RowSelector::skip(5), + RowSelector::select(3), + RowSelector::skip(2), + RowSelector::select(7), + ]; + let mut selection = Some(RowSelection::Ranges(selection_vec)); + + // Request exactly 10 rows (5 skip + 3 select + 2 skip) + let result = take_next_selection(&mut selection, 3).unwrap(); + assert_eq!( + result, + vec![ + RowSelector::skip(5), + RowSelector::select(3), + RowSelector::skip(2) + ] + .into() + ); + + // Remaining: select(7) + assert_eq!( + selection, + Some(RowSelection::Ranges(vec![RowSelector::select(7)])) + ); + } + + #[test] + fn test_take_next_selection_split_required() { + let mut selection = Some(RowSelection::Ranges(vec![ + RowSelector::select(10), + RowSelector::select(10), + ])); + + // Request 15 rows: should take 10 + 5, leave 5 + let result = take_next_selection(&mut selection, 15).unwrap(); + assert_eq!( + result, + RowSelection::Ranges(vec![RowSelector::select(10), RowSelector::select(5),]) + ); + + // Remaining: select(5) + assert_eq!( + selection, + Some(RowSelection::Ranges(vec![RowSelector::select(5)])) + ); + } + + #[test] + fn test_take_next_selection_empty_queue() { + let mut selection = None; + + // Empty selection + assert!(take_next_selection(&mut selection, 10).is_none()); + + // One item, smaller than to_select + selection = Some(RowSelection::Ranges(vec![RowSelector::select(5)])); + let result = take_next_selection(&mut selection, 10).unwrap(); + assert_eq!(result, RowSelection::Ranges(vec![RowSelector::select(5)])); + assert_eq!(selection, None); + } + + #[test] + fn test_predicate_page_cache_basic_operations() { + use super::*; + + // Mock PredicatePageCache with a capacity of 4 + // One dictionary page and 3 data pages + let cache = PredicatePageCache::new(4); + let page1 = Page::dummy_page(PageType::DATA_PAGE, 100); + let page2 = Page::dummy_page(PageType::DICTIONARY_PAGE, 200); + let page3 = Page::dummy_page(PageType::DATA_PAGE, 100); + let page4 = Page::dummy_page(PageType::DATA_PAGE, 100); + + // Insert and retrieve a data page + cache.get().insert_page(0, 1000, page1.clone()); + let retrieved = cache.get().get_page(0, 1000); + assert!(retrieved.is_some()); + assert_eq!(retrieved.unwrap().page_type(), PageType::DATA_PAGE); + + // Insert and retrieve a dictionary page for same column + cache.get().insert_page(0, 2000, page2.clone()); + let retrieved = cache.get().get_page(0, 2000); + assert!(retrieved.is_some()); + assert_eq!(retrieved.unwrap().page_type(), PageType::DICTIONARY_PAGE); + + // Insert and retrieve a data page for same column + cache.get().insert_page(0, 3000, page3.clone()); + let retrieved = cache.get().get_page(0, 3000); + assert!(retrieved.is_some()); + assert_eq!(retrieved.unwrap().page_type(), PageType::DATA_PAGE); + + // Insert and retrieve another data page for same column + cache.get().insert_page(0, 4000, page4.clone()); + let retrieved = cache.get().get_page(0, 4000); + assert!(retrieved.is_some()); + assert_eq!(retrieved.unwrap().page_type(), PageType::DATA_PAGE); + + // Both pages should still be accessible + assert!(cache.get().get_page(0, 1000).is_some()); + assert!(cache.get().get_page(0, 2000).is_some()); + assert!(cache.get().get_page(0, 3000).is_some()); + assert!(cache.get().get_page(0, 4000).is_some()); + } + + #[test] + fn test_predicate_page_cache_replacement() { + use super::*; + + let cache = PredicatePageCache::new(4); + let data_page1 = Page::dummy_page(PageType::DATA_PAGE, 100); + let data_page2 = Page::dummy_page(PageType::DATA_PAGE_V2, 200); + let data_page3 = Page::dummy_page(PageType::DATA_PAGE, 300); + let data_page4 = Page::dummy_page(PageType::DATA_PAGE, 300); + + // Insert first data page + cache.get().insert_page(0, 1000, data_page1.clone()); + assert!(cache.get().get_page(0, 1000).is_some()); + cache.get().insert_page(0, 2000, data_page2.clone()); + assert!(cache.get().get_page(0, 2000).is_some()); + cache.get().insert_page(0, 3000, data_page3.clone()); + assert!(cache.get().get_page(0, 3000).is_some()); + + // Insert the 4th data page - should replace the first data page + cache.get().insert_page(0, 4000, data_page4.clone()); + assert!(cache.get().get_page(0, 4000).is_some()); + assert!(cache.get().get_page(0, 1000).is_none()); // First page should be gone + } + + #[test] + fn test_predicate_page_cache_multiple_columns() { + use super::*; + + let cache = PredicatePageCache::new(2); + let page1 = Page::dummy_page(PageType::DATA_PAGE, 100); + let page2 = Page::dummy_page(PageType::DATA_PAGE_V2, 200); + + // Insert pages for different columns + cache.get().insert_page(0, 1000, page1.clone()); + cache.get().insert_page(1, 1000, page2.clone()); + + // Both pages should be accessible + assert!(cache.get().get_page(0, 1000).is_some()); + assert!(cache.get().get_page(1, 1000).is_some()); + + // Non-existent column should return None + assert!(cache.get().get_page(2, 1000).is_none()); + } +} diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 45df68821ca..a0c8de6bf64 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -29,6 +29,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use arrow_reader::{CachedPageReader, FilteredParquetRecordBatchReader, PredicatePageCache}; use bytes::{Buf, Bytes}; use futures::future::{BoxFuture, FutureExt}; use futures::ready; @@ -40,8 +41,7 @@ use arrow_schema::{DataType, Fields, Schema, SchemaRef}; use crate::arrow::array_reader::{build_array_reader, RowGroups}; use crate::arrow::arrow_reader::{ - apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderMetadata, - ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection, + ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, RowFilter, RowSelection, }; use crate::arrow::ProjectionMask; @@ -55,6 +55,7 @@ use crate::file::page_index::offset_index::OffsetIndexMetaData; use crate::file::reader::{ChunkReader, Length, SerializedPageReader}; use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash}; +mod arrow_reader; mod metadata; pub use metadata::*; @@ -65,6 +66,8 @@ use crate::arrow::schema::ParquetField; #[cfg(feature = "object_store")] pub use store::*; +use super::arrow_reader::RowSelector; + /// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files /// /// Notes: @@ -535,10 +538,10 @@ impl ParquetRecordBatchStreamBuilder { } } -type ReadResult = Result<(ReaderFactory, Option)>; +type ReadResult = Result<(ReaderFactory, Option)>; /// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create -/// [`ParquetRecordBatchReader`] +/// [`FilteredParquetRecordBatchReader`] struct ReaderFactory { metadata: Arc, @@ -563,69 +566,79 @@ where async fn read_row_group( mut self, row_group_idx: usize, - mut selection: Option, + selection: Option, projection: ProjectionMask, batch_size: usize, ) -> ReadResult { - // TODO: calling build_array multiple times is wasteful - - let meta = self.metadata.row_group(row_group_idx); let offset_index = self .metadata .offset_index() // filter out empty offset indexes (old versions specified Some(vec![]) when no present) - .filter(|index| !index.is_empty()) + .filter(|index| index.first().map(|v| !v.is_empty()).unwrap_or(false)) .map(|x| x[row_group_idx].as_slice()); - let mut row_group = InMemoryRowGroup { - // schema: meta.schema_descr_ptr(), - row_count: meta.num_rows() as usize, - column_chunks: vec![None; meta.columns().len()], - offset_index, + let mut predicate_projection: Option = None; + + if let Some(filter) = self.filter.as_mut() { + for predicate in filter.predicates.iter_mut() { + let p_projection = predicate.projection(); + if let Some(ref mut p) = predicate_projection { + p.union(p_projection); + } else { + predicate_projection = Some(p_projection.clone()); + } + } + } + let projection_to_cache = predicate_projection.map(|mut p| { + p.intersect(&projection); + p + }); + + let mut row_group = InMemoryRowGroup::new( row_group_idx, - metadata: self.metadata.as_ref(), - }; + self.metadata.as_ref(), + offset_index, + projection_to_cache, + ); + let mut selection = + selection.unwrap_or_else(|| vec![RowSelector::select(row_group.row_count)].into()); + + let mut filter_readers = Vec::new(); if let Some(filter) = self.filter.as_mut() { for predicate in filter.predicates.iter_mut() { - if !selects_any(selection.as_ref()) { + if !selection.selects_any() { return Ok((self, None)); } - let predicate_projection = predicate.projection(); + let p_projection = predicate.projection(); row_group - .fetch(&mut self.input, predicate_projection, selection.as_ref()) + .fetch(&mut self.input, p_projection, Some(&selection)) .await?; let array_reader = - build_array_reader(self.fields.as_deref(), predicate_projection, &row_group)?; - - selection = Some(evaluate_predicate( - batch_size, - array_reader, - selection, - predicate.as_mut(), - )?); + build_array_reader(self.fields.as_deref(), p_projection, &row_group)?; + filter_readers.push(array_reader); } } // Compute the number of rows in the selection before applying limit and offset - let rows_before = selection - .as_ref() - .map(|s| s.row_count()) - .unwrap_or(row_group.row_count); + let rows_before = selection.row_count(); if rows_before == 0 { return Ok((self, None)); } - selection = apply_range(selection, row_group.row_count, self.offset, self.limit); + if let Some(offset) = self.offset { + selection = selection.offset(offset); + } + + if let Some(limit) = self.limit { + selection = selection.limit(limit); + } // Compute the number of rows in the selection after applying limit and offset - let rows_after = selection - .as_ref() - .map(|s| s.row_count()) - .unwrap_or(row_group.row_count); + let rows_after = selection.row_count(); // Update offset if necessary if let Some(offset) = &mut self.offset { @@ -643,13 +656,16 @@ where } row_group - .fetch(&mut self.input, &projection, selection.as_ref()) + .fetch(&mut self.input, &projection, Some(&selection)) .await?; - let reader = ParquetRecordBatchReader::new( + let array_reader = build_array_reader(self.fields.as_deref(), &projection, &row_group)?; + let reader = FilteredParquetRecordBatchReader::new( batch_size, - build_array_reader(self.fields.as_deref(), &projection, &row_group)?, - selection, + array_reader, + Some(selection), + filter_readers, + self.filter.take(), ); Ok((self, Some(reader))) @@ -660,7 +676,7 @@ enum StreamState { /// At the start of a new row group, or the end of the parquet stream Init, /// Decoding a batch - Decoding(ParquetRecordBatchReader), + Decoding(FilteredParquetRecordBatchReader), /// Reading data from input Reading(BoxFuture<'static, ReadResult>), /// Error @@ -753,7 +769,7 @@ where /// - `Ok(None)` if the stream has ended. /// - `Err(error)` if the stream has errored. All subsequent calls will return `Ok(None)`. /// - `Ok(Some(reader))` which holds all the data for the row group. - pub async fn next_row_group(&mut self) -> Result> { + pub async fn next_row_group(&mut self) -> Result> { loop { match &mut self.state { StreamState::Decoding(_) | StreamState::Reading(_) => { @@ -816,7 +832,14 @@ where self.state = StreamState::Error; return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string())))); } - None => self.state = StreamState::Init, + None => { + // this is ugly, but works for now. + let filter = batch_reader.take_filter(); + let reader_factory = + self.reader_factory.as_mut().expect("have reader factory"); + reader_factory.filter = filter; + self.state = StreamState::Init + } }, StreamState::Init => { let row_group_idx = match self.row_groups.pop_front() { @@ -869,8 +892,39 @@ struct InMemoryRowGroup<'a> { row_count: usize, row_group_idx: usize, metadata: &'a ParquetMetaData, + cache: Arc, + projection_to_cache: Option, } +impl<'a> InMemoryRowGroup<'a> { + fn new( + row_group_idx: usize, + metadata: &'a ParquetMetaData, + offset_index: Option<&'a [OffsetIndexMetaData]>, + projection_to_cache: Option, + ) -> Self { + let rg_metadata = metadata.row_group(row_group_idx); + let to_cache_column_cnt = projection_to_cache + .as_ref() + .map(|p| { + if let Some(mask) = &p.mask { + mask.iter().filter(|&&b| b).count() + } else { + rg_metadata.columns().len() + } + }) + .unwrap_or(0); + Self { + offset_index, + column_chunks: vec![None; rg_metadata.columns().len()], + row_count: rg_metadata.num_rows() as usize, + metadata, + row_group_idx, + cache: Arc::new(PredicatePageCache::new(to_cache_column_cnt)), + projection_to_cache, + } + } +} impl InMemoryRowGroup<'_> { /// Fetches the necessary column data into memory async fn fetch( @@ -967,6 +1021,25 @@ impl InMemoryRowGroup<'_> { Ok(()) } + + /// Returns a CachedPageReader if the specified column is included in the projection to cache + fn maybe_cached_reader( + &self, + column_idx: usize, + page_reader: SerializedPageReader, + ) -> Box { + let Some(projection_to_cache) = &self.projection_to_cache else { + return Box::new(page_reader); + }; + if !projection_to_cache.leaf_included(column_idx) { + return Box::new(page_reader); + } + Box::new(CachedPageReader::new( + page_reader, + self.cache.clone(), + column_idx, + )) + } } impl RowGroups for InMemoryRowGroup<'_> { @@ -1000,7 +1073,7 @@ impl RowGroups for InMemoryRowGroup<'_> { column_chunk_metadata, )?; - let page_reader: Box = Box::new(page_reader); + let page_reader = self.maybe_cached_reader(i, page_reader); Ok(Box::new(ColumnChunkIterator { reader: Some(Ok(page_reader)), diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index ea99530d672..94b00335830 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -689,8 +689,8 @@ impl SerializedPageReader { /// /// This is used when we need to read parquet with row-filter, and we don't want to decompress the page twice. /// This function allows us to check if the next page is being cached or read previously. - #[cfg(test)] - fn peek_next_page_offset(&mut self) -> Result> { + #[cfg(feature = "async")] + pub(crate) fn peek_next_page_offset(&mut self) -> Result> { match &mut self.state { SerializedPageReaderState::Values { offset, @@ -998,6 +998,7 @@ impl PageReader for SerializedPageReader { } SerializedPageReaderState::Pages { page_locations, + dictionary_page, .. } => { @@ -1039,7 +1040,6 @@ fn page_crypto_context( #[cfg(test)] mod tests { - use std::collections::HashSet; use bytes::Buf; @@ -1538,6 +1538,7 @@ mod tests { assert_eq!(page_count, 1); } + #[cfg(feature = "async")] fn get_serialized_page_reader( file_reader: &SerializedFileReader, row_group: usize, @@ -1574,12 +1575,13 @@ mod tests { ) } + #[cfg(feature = "async")] #[test] fn test_peek_next_page_offset_matches_actual() -> Result<()> { let test_file = get_test_file("alltypes_plain.parquet"); let reader = SerializedFileReader::new(test_file)?; - let mut offset_set = HashSet::new(); + let mut offset_set = std::collections::HashSet::new(); let num_row_groups = reader.metadata.num_row_groups(); for row_group in 0..num_row_groups { let num_columns = reader.metadata.row_group(row_group).num_columns();