diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 83bdf79c8fcc0..570f9b4412840 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -180,6 +180,9 @@ impl PreparedAccessPlan { impl FileOpener for ParquetOpener { fn open(&self, partitioned_file: PartitionedFile) -> Result { + // ----------------------------------- + // Step: prepare configurations, etc. + // ----------------------------------- let file_range = partitioned_file.range.clone(); let extensions = partitioned_file.extensions.clone(); let file_location = partitioned_file.object_meta.location.clone(); @@ -280,6 +283,10 @@ impl FileOpener for ParquetOpener { .get_file_decryption_properties(&file_location) .await?; + // --------------------------------------------- + // Step: try to prune the current file partition + // --------------------------------------------- + // Prune this file using the file level statistics and partition values. // Since dynamic filters may have been updated since planning it is possible that we are able // to prune files now that we couldn't prune at planning time. @@ -328,6 +335,10 @@ impl FileOpener for ParquetOpener { file_metrics.files_ranges_pruned_statistics.add_matched(1); + // -------------------------------------------------------- + // Step: fetch Parquet metadata (and optionally page index) + // -------------------------------------------------------- + // Don't load the page index yet. Since it is not stored inline in // the footer, loading the page index if it is not needed will do // unnecessary I/O. We decide later if it is needed to evaluate the @@ -428,14 +439,21 @@ impl FileOpener for ParquetOpener { metadata_timer.stop(); + // --------------------------------------------------------- + // Step: construct builder for the final RecordBatch stream + // --------------------------------------------------------- + let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata( async_file_reader, reader_metadata, ); - let indices = projection.column_indices(); - - let mask = ProjectionMask::roots(builder.parquet_schema(), indices); + // --------------------------------------------------------------------- + // Step: optionally add row filter to the builder + // + // Row filter is used for late materialization in parquet decoding, see + // `row_filter` for details. + // --------------------------------------------------------------------- // Filter pushdown: evaluate predicates during scan if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() { @@ -464,6 +482,10 @@ impl FileOpener for ParquetOpener { builder.with_row_selection_policy(RowSelectionPolicy::Selectors); } + // ------------------------------------------------------------ + // Step: prune row groups by range, predicate and bloom filter + // ------------------------------------------------------------ + // Determine which row groups to actually read. The idea is to skip // as many row groups as possible based on the metadata and query let file_metadata = Arc::clone(builder.metadata()); @@ -525,9 +547,13 @@ impl FileOpener for ParquetOpener { let mut access_plan = row_groups.build(); + // -------------------------------------------------------- + // Step: prune pages from the kept row groups + // // page index pruning: if all data on individual pages can // be ruled using page metadata, rows from other columns // with that range can be skipped as well + // -------------------------------------------------------- if enable_page_index && !access_plan.is_empty() && let Some(p) = page_pruning_predicate @@ -545,7 +571,10 @@ impl FileOpener for ParquetOpener { let mut prepared_plan = PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)?; - // If reverse scanning is enabled, reverse the prepared plan + // ---------------------------------------------------------- + // Step: potentially reverse the access plan for performance. + // See `ParquetSource::try_reverse_output` for the rationale. + // ---------------------------------------------------------- if reverse_row_groups { prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?; } @@ -564,6 +593,9 @@ impl FileOpener for ParquetOpener { // metrics from the arrow reader itself let arrow_reader_metrics = ArrowReaderMetrics::enabled(); + let indices = projection.column_indices(); + let mask = ProjectionMask::roots(builder.parquet_schema(), indices); + let stream = builder .with_projection(mask) .with_batch_size(batch_size) @@ -621,6 +653,9 @@ impl FileOpener for ParquetOpener { }) }); + // ---------------------------------------------------------------------- + // Step: wrap the stream so a dynamic filter can stop the file scan early + // ---------------------------------------------------------------------- if let Some(file_pruner) = file_pruner { Ok(EarlyStoppingStream::new( stream,