Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 39 additions & 4 deletions datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ impl PreparedAccessPlan {

impl FileOpener for ParquetOpener {
fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
// -----------------------------------
// Step: prepare configurations, etc.
// -----------------------------------
let file_range = partitioned_file.range.clone();
let extensions = partitioned_file.extensions.clone();
let file_location = partitioned_file.object_meta.location.clone();
Expand Down Expand Up @@ -280,6 +283,10 @@ impl FileOpener for ParquetOpener {
.get_file_decryption_properties(&file_location)
.await?;

// ---------------------------------------------
// Step: try to prune the current file partition
// ---------------------------------------------

// Prune this file using the file level statistics and partition values.
// Since dynamic filters may have been updated since planning it is possible that we are able
// to prune files now that we couldn't prune at planning time.
Expand Down Expand Up @@ -328,6 +335,10 @@ impl FileOpener for ParquetOpener {

file_metrics.files_ranges_pruned_statistics.add_matched(1);

// --------------------------------------------------------
// Step: fetch Parquet metadata (and optionally page index)
// --------------------------------------------------------

// Don't load the page index yet. Since it is not stored inline in
// the footer, loading the page index if it is not needed will do
// unnecessary I/O. We decide later if it is needed to evaluate the
Expand Down Expand Up @@ -428,14 +439,21 @@ impl FileOpener for ParquetOpener {

metadata_timer.stop();

// ---------------------------------------------------------
// Step: construct builder for the final RecordBatch stream
// ---------------------------------------------------------

let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
async_file_reader,
reader_metadata,
);

let indices = projection.column_indices();

let mask = ProjectionMask::roots(builder.parquet_schema(), indices);
// ---------------------------------------------------------------------
// Step: optionally add row filter to the builder
//
// Row filter is used for late materialization in parquet decoding, see
// `row_filter` for details.
// ---------------------------------------------------------------------

// Filter pushdown: evaluate predicates during scan
if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
Expand Down Expand Up @@ -464,6 +482,10 @@ impl FileOpener for ParquetOpener {
builder.with_row_selection_policy(RowSelectionPolicy::Selectors);
}

// ------------------------------------------------------------
// Step: prune row groups by range, predicate and bloom filter
// ------------------------------------------------------------

// Determine which row groups to actually read. The idea is to skip
// as many row groups as possible based on the metadata and query
let file_metadata = Arc::clone(builder.metadata());
Expand Down Expand Up @@ -525,9 +547,13 @@ impl FileOpener for ParquetOpener {

let mut access_plan = row_groups.build();

// --------------------------------------------------------
// Step: prune pages from the kept row groups
//
// page index pruning: if all data on individual pages can
// be ruled out using page metadata, rows from other columns
// within that range can be skipped as well
// --------------------------------------------------------
if enable_page_index
&& !access_plan.is_empty()
&& let Some(p) = page_pruning_predicate
Expand All @@ -545,7 +571,10 @@ impl FileOpener for ParquetOpener {
let mut prepared_plan =
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)?;

// If reverse scanning is enabled, reverse the prepared plan
// ----------------------------------------------------------
// Step: potentially reverse the access plan for performance.
// See `ParquetSource::try_reverse_output` for the rationale.
// ----------------------------------------------------------
if reverse_row_groups {
prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?;
}
Expand All @@ -564,6 +593,9 @@ impl FileOpener for ParquetOpener {
// metrics from the arrow reader itself
let arrow_reader_metrics = ArrowReaderMetrics::enabled();

let indices = projection.column_indices();
let mask = ProjectionMask::roots(builder.parquet_schema(), indices);

let stream = builder
.with_projection(mask)
.with_batch_size(batch_size)
Expand Down Expand Up @@ -621,6 +653,9 @@ impl FileOpener for ParquetOpener {
})
});

// ----------------------------------------------------------------------
// Step: wrap the stream so a dynamic filter can stop the file scan early
// ----------------------------------------------------------------------
if let Some(file_pruner) = file_pruner {
Ok(EarlyStoppingStream::new(
stream,
Expand Down