Skip to content

Commit 9df957b

Browse files
alamb and NGA-TRAN authored
API from ParquetExec to ParquetExecBuilder (#12799)
* API to go from `ParquetExec` to `ParquetExecBuilder`
* fix potential regression
* Apply suggestions from code review

  Co-authored-by: Nga Tran <[email protected]>

* add note about fields being re-created

---------

Co-authored-by: Nga Tran <[email protected]>
1 parent 48d395a commit 9df957b

File tree

1 file changed

+89
-3
lines changed
  • datafusion/core/src/datasource/physical_plan/parquet

1 file changed

+89
-3
lines changed

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

Lines changed: 89 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,33 @@ pub use writer::plan_to_parquet;
166166
/// [`RowFilter`]: parquet::arrow::arrow_reader::RowFilter
167167
/// [Parquet PageIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
168168
///
169+
/// # Example: rewriting `ParquetExec`
170+
///
171+
/// You can modify a `ParquetExec` using [`ParquetExecBuilder`], for example
172+
/// to change files or add a predicate.
173+
///
174+
/// ```no_run
175+
/// # use std::sync::Arc;
176+
/// # use arrow::datatypes::Schema;
177+
/// # use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
178+
/// # use datafusion::datasource::listing::PartitionedFile;
179+
/// # fn parquet_exec() -> ParquetExec { unimplemented!() }
180+
/// // Split a single ParquetExec into multiple ParquetExecs, one for each file
181+
/// let exec = parquet_exec();
182+
/// let existing_file_groups = &exec.base_config().file_groups;
183+
/// let new_execs = existing_file_groups
184+
/// .iter()
185+
/// .map(|file_group| {
186+
/// // create a new exec by copying the existing exec into a builder
187+
/// let new_exec = exec.clone()
188+
/// .into_builder()
189+
/// .with_file_groups(vec![file_group.clone()])
190+
/// .build();
191+
/// new_exec
192+
/// })
193+
/// .collect::<Vec<_>>();
194+
/// ```
195+
///
169196
/// # Implementing External Indexes
170197
///
171198
/// It is possible to restrict the row groups and selections within those row
@@ -257,6 +284,12 @@ pub struct ParquetExec {
257284
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
258285
}
259286

287+
/// Converts a [`ParquetExec`] into a [`ParquetExecBuilder`] by delegating to
/// `ParquetExec::into_builder`.
impl From<ParquetExec> for ParquetExecBuilder {
288+
fn from(exec: ParquetExec) -> Self {
289+
// Consumes `exec`; the conversion itself is implemented by `into_builder`.
exec.into_builder()
290+
}
291+
}
292+
260293
/// [`ParquetExecBuilder`], builder for [`ParquetExec`].
261294
///
262295
/// See example on [`ParquetExec`].
@@ -291,6 +324,12 @@ impl ParquetExecBuilder {
291324
}
292325
}
293326

327+
/// Replace the list of file groups to read.
///
/// Overwrites `file_scan_config.file_groups` with `file_groups` and returns
/// the updated builder so calls can be chained.
328+
pub fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) -> Self {
329+
self.file_scan_config.file_groups = file_groups;
330+
self
331+
}
332+
294333
/// Set the filter predicate when reading.
295334
///
296335
/// See the "Predicate Pushdown" section of the [`ParquetExec`] documentation
@@ -459,6 +498,34 @@ impl ParquetExec {
459498
ParquetExecBuilder::new(file_scan_config)
460499
}
461500

501+
/// Convert this `ParquetExec` into a [`ParquetExecBuilder`] for modification.
///
/// Consumes `self`. The fields `base_config`, `predicate`,
/// `metadata_size_hint`, `parquet_file_reader_factory`,
/// `table_parquet_options` and `schema_adapter_factory` are moved into the
/// builder; `projected_statistics`, `metrics`, `pruning_predicate`,
/// `page_pruning_predicate` and `cache` are discarded.
502+
pub fn into_builder(self) -> ParquetExecBuilder {
503+
// list out fields so it is clear what is being dropped
504+
// (note the fields which are dropped are re-created as part of calling
505+
// `build` on the builder)
506+
let Self {
507+
base_config,
508+
projected_statistics: _,
509+
metrics: _,
510+
predicate,
511+
pruning_predicate: _,
512+
page_pruning_predicate: _,
513+
metadata_size_hint,
514+
parquet_file_reader_factory,
515+
cache: _,
516+
table_parquet_options,
517+
schema_adapter_factory,
518+
} = self;
// `base_config` is stored under the builder's `file_scan_config` field;
// all other retained fields keep their names.
519+
ParquetExecBuilder {
520+
file_scan_config: base_config,
521+
predicate,
522+
metadata_size_hint,
523+
table_parquet_options,
524+
parquet_file_reader_factory,
525+
schema_adapter_factory,
526+
}
527+
}
528+
462529
/// [`FileScanConfig`] that controls this scan (such as which files to read)
463530
pub fn base_config(&self) -> &FileScanConfig {
464531
&self.base_config
@@ -479,9 +546,15 @@ impl ParquetExec {
479546
self.pruning_predicate.as_ref()
480547
}
481548

549+
/// Return a reference to the optional [`ParquetFileReaderFactory`] held by
/// this `ParquetExec`, or `None` if no factory is set.
550+
pub fn parquet_file_reader_factory(
551+
&self,
552+
) -> Option<&Arc<dyn ParquetFileReaderFactory>> {
553+
self.parquet_file_reader_factory.as_ref()
554+
}
555+
482556
/// Optional user defined parquet file reader factory.
483557
///
484-
/// See documentation on [`ParquetExecBuilder::with_parquet_file_reader_factory`]
485558
pub fn with_parquet_file_reader_factory(
486559
mut self,
487560
parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
@@ -490,6 +563,11 @@ impl ParquetExec {
490563
self
491564
}
492565

566+
/// Return a reference to the optional [`SchemaAdapterFactory`] held by this
/// `ParquetExec`, or `None` if no factory is set.
567+
pub fn schema_adapter_factory(&self) -> Option<&Arc<dyn SchemaAdapterFactory>> {
568+
self.schema_adapter_factory.as_ref()
569+
}
570+
493571
/// Optional schema adapter factory.
494572
///
495573
/// See documentation on [`ParquetExecBuilder::with_schema_adapter_factory`]
@@ -586,7 +664,14 @@ impl ParquetExec {
586664
)
587665
}
588666

589-
fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) -> Self {
667+
/// Updates the file groups to read and recalculates the output partitioning
668+
///
669+
/// Note this function does not update statistics or other properties
670+
/// that depend on the file groups.
671+
fn with_file_groups_and_update_partitioning(
672+
mut self,
673+
file_groups: Vec<Vec<PartitionedFile>>,
674+
) -> Self {
590675
self.base_config.file_groups = file_groups;
591676
// Changing file groups may invalidate output partitioning. Update it also
592677
let output_partitioning = Self::output_partitioning_helper(&self.base_config);
@@ -679,7 +764,8 @@ impl ExecutionPlan for ParquetExec {
679764

680765
let mut new_plan = self.clone();
681766
if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
682-
new_plan = new_plan.with_file_groups(repartitioned_file_groups);
767+
new_plan = new_plan
768+
.with_file_groups_and_update_partitioning(repartitioned_file_groups);
683769
}
684770
Ok(Some(Arc::new(new_plan)))
685771
}

0 commit comments

Comments
 (0)