diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 90e2592183d6..ea068acb29eb 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -21,10 +21,8 @@ use arrow_array::cast::AsArray; use arrow_array::Array; use arrow_array::{RecordBatch, RecordBatchReader}; use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef}; -use arrow_select::filter::prep_null_mask_filter; pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; pub use selection::{RowSelection, RowSelector}; -use std::collections::VecDeque; use std::sync::Arc; pub use crate::arrow::array_reader::RowGroups; @@ -39,7 +37,10 @@ use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; use crate::file::reader::{ChunkReader, SerializedPageReader}; use crate::schema::types::SchemaDescriptor; +pub(crate) use read_plan::{ReadPlan, ReadPlanBuilder}; + mod filter; +mod read_plan; mod selection; pub mod statistics; @@ -679,38 +680,32 @@ impl ParquetRecordBatchReaderBuilder { }; let mut filter = self.filter; - let mut selection = self.selection; + let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(self.selection); + // Update selection based on any filters if let Some(filter) = filter.as_mut() { for predicate in filter.predicates.iter_mut() { - if !selects_any(selection.as_ref()) { + // break early if we have ruled out all rows + if !plan_builder.selects_any() { break; } let array_reader = build_array_reader(self.fields.as_deref(), predicate.projection(), &reader)?; - selection = Some(evaluate_predicate( - batch_size, - array_reader, - selection, - predicate.as_mut(), - )?); + plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?; } } let array_reader = build_array_reader(self.fields.as_deref(), &self.projection, &reader)?; + let read_plan = plan_builder + .limited(reader.num_rows()) + .with_offset(self.offset) + .with_limit(self.limit) + .build_limited() + .build(); - // If selection is empty, truncate - if !selects_any(selection.as_ref()) { - selection = Some(RowSelection::from(vec![])); - } - - Ok(ParquetRecordBatchReader::new( - batch_size, - array_reader, - apply_range(selection, reader.num_rows(), self.offset, self.limit), - )) + Ok(ParquetRecordBatchReader::new(array_reader, read_plan)) } } @@ -789,11 +784,9 @@ impl PageIterator for ReaderPageIterator {} /// An `Iterator>` that yields [`RecordBatch`] /// read from a parquet data source pub struct ParquetRecordBatchReader { - batch_size: usize, array_reader: Box, schema: SchemaRef, - /// Row ranges to be selected from the data source - selection: Option>, + read_plan: ReadPlan, } impl Iterator for ParquetRecordBatchReader { @@ -814,9 +807,10 @@ impl ParquetRecordBatchReader { /// simplify error handling with `?` fn next_inner(&mut self) -> Result> { let mut read_records = 0; - match self.selection.as_mut() { + let batch_size = self.batch_size(); + match self.read_plan.selection_mut() { Some(selection) => { - while read_records < self.batch_size && !selection.is_empty() { + while read_records < batch_size && !selection.is_empty() { let front = selection.pop_front().unwrap(); if front.skip { let skipped = self.array_reader.skip_records(front.row_count)?; @@ -838,7 +832,7 @@ impl ParquetRecordBatchReader { } // try to read record - let need_read = self.batch_size - read_records; + let need_read = batch_size - read_records; let to_read = match front.row_count.checked_sub(need_read) { Some(remaining) if remaining != 0 => { // if page row count less than batch_size we must set batch size to page row count. @@ -855,7 +849,7 @@ impl ParquetRecordBatchReader { } } None => { - self.array_reader.read_records(self.batch_size)?; + self.array_reader.read_records(batch_size)?; } }; @@ -905,116 +899,37 @@ impl ParquetRecordBatchReader { let array_reader = build_array_reader(levels.levels.as_ref(), &ProjectionMask::all(), row_groups)?; + let read_plan = ReadPlanBuilder::new(batch_size) + .with_selection(selection) + .build(); + Ok(Self { - batch_size, array_reader, schema: Arc::new(Schema::new(levels.fields.clone())), - selection: selection.map(|s| s.trim().into()), + read_plan, }) } /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None` /// all rows will be returned - pub(crate) fn new( - batch_size: usize, - array_reader: Box, - selection: Option, - ) -> Self { + pub(crate) fn new(array_reader: Box, read_plan: ReadPlan) -> Self { let schema = match array_reader.get_data_type() { ArrowType::Struct(ref fields) => Schema::new(fields.clone()), _ => unreachable!("Struct array reader's data type is not struct!"), }; Self { - batch_size, array_reader, schema: Arc::new(schema), - selection: selection.map(|s| s.trim().into()), + read_plan, } } -} -/// Returns `true` if `selection` is `None` or selects some rows -pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool { - selection.map(|x| x.selects_any()).unwrap_or(true) -} - -/// Applies an optional offset and limit to an optional [`RowSelection`] -pub(crate) fn apply_range( - mut selection: Option, - row_count: usize, - offset: Option, - limit: Option, -) -> Option { - // If an offset is defined, apply it to the `selection` - if let Some(offset) = offset { - selection = Some(match row_count.checked_sub(offset) { - None => RowSelection::from(vec![]), - Some(remaining) => selection - .map(|selection| selection.offset(offset)) - .unwrap_or_else(|| { - RowSelection::from(vec![ - RowSelector::skip(offset), - RowSelector::select(remaining), - ]) - }), - }); - } - - // If a limit is defined, apply it to the final `selection` - if let Some(limit) = limit { - selection = Some( - selection - .map(|selection| selection.limit(limit)) - .unwrap_or_else(|| { - RowSelection::from(vec![RowSelector::select(limit.min(row_count))]) - }), - ); - } - selection -} - -/// Evaluates an [`ArrowPredicate`], returning a [`RowSelection`] indicating -/// which rows to return. -/// -/// `input_selection`: Optional pre-existing selection. If `Some`, then the -/// final [`RowSelection`] will be the conjunction of it and the rows selected -/// by `predicate`. -/// -/// Note: A pre-existing selection may come from evaluating a previous predicate -/// or if the [`ParquetRecordBatchReader`] specified an explicit -/// [`RowSelection`] in addition to one or more predicates. -pub(crate) fn evaluate_predicate( - batch_size: usize, - array_reader: Box, - input_selection: Option, - predicate: &mut dyn ArrowPredicate, -) -> Result { - let reader = ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone()); - let mut filters = vec![]; - for maybe_batch in reader { - let maybe_batch = maybe_batch?; - let input_rows = maybe_batch.num_rows(); - let filter = predicate.evaluate(maybe_batch)?; - // Since user supplied predicate, check error here to catch bugs quickly - if filter.len() != input_rows { - return Err(arrow_err!( - "ArrowPredicate predicate returned {} rows, expected {input_rows}", - filter.len() - )); - } - match filter.null_count() { - 0 => filters.push(filter), - _ => filters.push(prep_null_mask_filter(&filter)), - }; + #[inline(always)] + pub(crate) fn batch_size(&self) -> usize { + self.read_plan.batch_size() } - - let raw = RowSelection::from_filters(&filters); - Ok(match input_selection { - Some(selection) => selection.and_then(&raw), - None => raw, - }) } #[cfg(test)] @@ -3993,7 +3908,7 @@ mod tests { .build() .unwrap(); assert_ne!(1024, num_rows); - assert_eq!(reader.batch_size, num_rows as usize); + assert_eq!(reader.read_plan.batch_size(), num_rows as usize); } #[test] diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs new file mode 100644 index 000000000000..cf5d83385038 --- /dev/null +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -0,0 +1,249 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`ReadPlan`] and [`ReadPlanBuilder`] for determining which rows to read +//! from a Parquet file + +use crate::arrow::array_reader::ArrayReader; +use crate::arrow::arrow_reader::{ + ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelector, +}; +use crate::errors::{ParquetError, Result}; +use arrow_array::Array; +use arrow_select::filter::prep_null_mask_filter; +use std::collections::VecDeque; + +/// A builder for [`ReadPlan`] +#[derive(Clone)] +pub(crate) struct ReadPlanBuilder { + batch_size: usize, + /// Current to apply, includes all filters + selection: Option, +} + +impl ReadPlanBuilder { + /// Create a `ReadPlanBuilder` with the given batch size + pub(crate) fn new(batch_size: usize) -> Self { + Self { + batch_size, + selection: None, + } + } + + /// Set the current selection to the given value + pub(crate) fn with_selection(mut self, selection: Option) -> Self { + self.selection = selection; + self + } + + /// Returns the current selection, if any + pub(crate) fn selection(&self) -> Option<&RowSelection> { + self.selection.as_ref() + } + + /// Specifies the number of rows in the row group, before filtering is applied. + /// + /// Returns a [`LimitedReadPlanBuilder`] that can apply + /// offset and limit. + /// + /// Call [`LimitedReadPlanBuilder::build_limited`] to apply the limits to this + /// selection. + pub(crate) fn limited(self, row_count: usize) -> LimitedReadPlanBuilder { + LimitedReadPlanBuilder::new(self, row_count) + } + + /// Returns true if the current plan selects any rows + pub(crate) fn selects_any(&self) -> bool { + self.selection + .as_ref() + .map(|s| s.selects_any()) + .unwrap_or(true) + } + + /// Returns the number of rows selected, or `None` if all rows are selected. + pub(crate) fn num_rows_selected(&self) -> Option { + self.selection.as_ref().map(|s| s.row_count()) + } + + /// Evaluates an [`ArrowPredicate`], updating this plan's `selection` + /// + /// If the current `selection` is `Some`, the resulting [`RowSelection`] + /// will be the conjunction of the existing selection and the rows selected + /// by `predicate`. + /// + /// Note: pre-existing selections may come from evaluating a previous predicate + /// or if the [`ParquetRecordBatchReader`] specified an explicit + /// [`RowSelection`] in addition to one or more predicates. + pub(crate) fn with_predicate( + mut self, + array_reader: Box, + predicate: &mut dyn ArrowPredicate, + ) -> Result { + let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build()); + let mut filters = vec![]; + for maybe_batch in reader { + let maybe_batch = maybe_batch?; + let input_rows = maybe_batch.num_rows(); + let filter = predicate.evaluate(maybe_batch)?; + // Since user supplied predicate, check error here to catch bugs quickly + if filter.len() != input_rows { + return Err(arrow_err!( + "ArrowPredicate predicate returned {} rows, expected {input_rows}", + filter.len() + )); + } + match filter.null_count() { + 0 => filters.push(filter), + _ => filters.push(prep_null_mask_filter(&filter)), + }; + } + + let raw = RowSelection::from_filters(&filters); + self.selection = match self.selection.take() { + Some(selection) => Some(selection.and_then(&raw)), + None => Some(raw), + }; + Ok(self) + } + + /// Create a final `ReadPlan` the read plan for the scan + pub(crate) fn build(mut self) -> ReadPlan { + // If selection is empty, truncate + if !self.selects_any() { + self.selection = Some(RowSelection::from(vec![])); + } + let Self { + batch_size, + selection, + } = self; + + let selection = selection.map(|s| s.trim().into()); + + ReadPlan { + batch_size, + selection, + } + } +} + +/// Builder for [`ReadPlan`] that applies a limit and offset to the read plan +/// +/// See [`ReadPlanBuilder::limited`] to create this builder. +pub(crate) struct LimitedReadPlanBuilder { + /// The underlying builder + inner: ReadPlanBuilder, + /// Total number of rows in the row group before the selection, limit or + /// offset are applied + row_count: usize, + /// The offset to apply, if any + offset: Option, + /// The limit to apply, if any + limit: Option, +} + +impl LimitedReadPlanBuilder { + /// Create a new `LimitedReadPlanBuilder` from the existing builder and number of rows + fn new(inner: ReadPlanBuilder, row_count: usize) -> Self { + Self { + inner, + row_count, + offset: None, + limit: None, + } + } + + /// Set the offset to apply to the read plan + pub(crate) fn with_offset(mut self, offset: Option) -> Self { + self.offset = offset; + self + } + + /// Set the limit to apply to the read plan + pub(crate) fn with_limit(mut self, limit: Option) -> Self { + self.limit = limit; + self + } + + /// Apply offset and limit, updating the selection on the underlying builder + /// and returning it. + pub(crate) fn build_limited(self) -> ReadPlanBuilder { + let Self { + mut inner, + row_count, + offset, + limit, + } = self; + + // If the selection is empty, truncate + if !inner.selects_any() { + inner.selection = Some(RowSelection::from(vec![])); + } + + // If an offset is defined, apply it to the `selection` + if let Some(offset) = offset { + inner.selection = Some(match row_count.checked_sub(offset) { + None => RowSelection::from(vec![]), + Some(remaining) => inner + .selection + .map(|selection| selection.offset(offset)) + .unwrap_or_else(|| { + RowSelection::from(vec![ + RowSelector::skip(offset), + RowSelector::select(remaining), + ]) + }), + }); + } + + // If a limit is defined, apply it to the final `selection` + if let Some(limit) = limit { + inner.selection = Some( + inner + .selection + .map(|selection| selection.limit(limit)) + .unwrap_or_else(|| { + RowSelection::from(vec![RowSelector::select(limit.min(row_count))]) + }), + ); + } + + inner + } +} + +/// A plan reading specific rows from a Parquet Row Group. +/// +/// See [`ReadPlanBuilder`] to create `ReadPlan`s +pub(crate) struct ReadPlan { + /// The number of rows to read in each batch + batch_size: usize, + /// Row ranges to be selected from the data source + selection: Option>, +} + +impl ReadPlan { + /// Returns a mutable reference to the selection, if any + pub(crate) fn selection_mut(&mut self) -> Option<&mut VecDeque> { + self.selection.as_mut() + } + + /// Return the number of rows to read in each output batch + #[inline(always)] + pub fn batch_size(&self) -> usize { + self.batch_size + } +} diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 9466fb9a35a0..0c38d36a5b5b 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -40,8 +40,8 @@ use arrow_schema::{DataType, Fields, Schema, SchemaRef}; use crate::arrow::array_reader::{build_array_reader, RowGroups}; use crate::arrow::arrow_reader::{ - apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderMetadata, - ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection, + ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader, + RowFilter, RowSelection, }; use crate::arrow::ProjectionMask; @@ -61,6 +61,7 @@ pub use metadata::*; #[cfg(feature = "object_store")] mod store; +use crate::arrow::arrow_reader::ReadPlanBuilder; use crate::arrow::schema::ParquetField; #[cfg(feature = "object_store")] pub use store::*; @@ -535,6 +536,10 @@ impl ParquetRecordBatchStreamBuilder { } } +/// Returns a [`ReaderFactory`] and an optional [`ParquetRecordBatchReader`] for the next row group +/// +/// Note: If all rows are filtered out in the row group (e.g by filters, limit or +/// offset), returns `None` for the reader. type ReadResult = Result<(ReaderFactory, Option)>; /// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create @@ -542,14 +547,18 @@ type ReadResult = Result<(ReaderFactory, Option) struct ReaderFactory { metadata: Arc, + /// Top level parquet schema fields: Option>, input: T, + /// Optional filter filter: Option, + /// Limit to apply to remaining row groups. limit: Option, + /// Offset to apply to the next offset: Option, } @@ -559,11 +568,13 @@ where { /// Reads the next row group with the provided `selection`, `projection` and `batch_size` /// + /// Updates the `limit` and `offset` of the reader factory + /// /// Note: this captures self so that the resulting future has a static lifetime async fn read_row_group( mut self, row_group_idx: usize, - mut selection: Option, + selection: Option, projection: ProjectionMask, batch_size: usize, ) -> ReadResult { @@ -586,49 +597,50 @@ where metadata: self.metadata.as_ref(), }; + let filter = self.filter.as_mut(); + let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection); + // Update selection based on any filters - if let Some(filter) = self.filter.as_mut() { + if let Some(filter) = filter { for predicate in filter.predicates.iter_mut() { - if !selects_any(selection.as_ref()) { - return Ok((self, None)); + if !plan_builder.selects_any() { + return Ok((self, None)); // ruled out entire row group } - let predicate_projection = predicate.projection(); + // (pre) Fetch only the columns that are selected by the predicate + let selection = plan_builder.selection(); row_group - .fetch(&mut self.input, predicate_projection, selection.as_ref()) + .fetch(&mut self.input, predicate.projection(), selection) .await?; let array_reader = - build_array_reader(self.fields.as_deref(), predicate_projection, &row_group)?; - - selection = Some(evaluate_predicate( - batch_size, - array_reader, - selection, - predicate.as_mut(), - )?); + build_array_reader(self.fields.as_deref(), predicate.projection(), &row_group)?; + + plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?; } } // Compute the number of rows in the selection before applying limit and offset - let rows_before = selection - .as_ref() - .map(|s| s.row_count()) + let rows_before = plan_builder + .num_rows_selected() .unwrap_or(row_group.row_count); if rows_before == 0 { - return Ok((self, None)); + return Ok((self, None)); // ruled out entire row group } - selection = apply_range(selection, row_group.row_count, self.offset, self.limit); + // Apply any limit and offset + let plan_builder = plan_builder + .limited(row_group.row_count) + .with_offset(self.offset) + .with_limit(self.limit) + .build_limited(); - // Compute the number of rows in the selection after applying limit and offset - let rows_after = selection - .as_ref() - .map(|s| s.row_count()) + let rows_after = plan_builder + .num_rows_selected() .unwrap_or(row_group.row_count); - // Update offset if necessary + // Update running offset and limit for after the current row group is read if let Some(offset) = &mut self.offset { // Reduction is either because of offset or limit, as limit is applied // after offset has been "exhausted" can just use saturating sub here @@ -636,22 +648,21 @@ where } if rows_after == 0 { - return Ok((self, None)); + return Ok((self, None)); // ruled out entire row group } if let Some(limit) = &mut self.limit { *limit -= rows_after; } - + // fetch the pages needed for decoding row_group - .fetch(&mut self.input, &projection, selection.as_ref()) + .fetch(&mut self.input, &projection, plan_builder.selection()) .await?; - let reader = ParquetRecordBatchReader::new( - batch_size, - build_array_reader(self.fields.as_deref(), &projection, &row_group)?, - selection, - ); + let plan = plan_builder.build(); + + let array_reader = build_array_reader(self.fields.as_deref(), &projection, &row_group)?; + let reader = ParquetRecordBatchReader::new(array_reader, plan); Ok((self, Some(reader))) }