Skip to content

Commit 4b0cfc1

Browse files
committed
chore: use builder API to create FileScanConfig
Signed-off-by: Andrew Lamb <[email protected]>
1 parent ddad194 commit 4b0cfc1

File tree

2 files changed

+34
-56
lines changed

2 files changed

+34
-56
lines changed

crates/core/src/delta_datafusion/mod.rs

Lines changed: 20 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -51,8 +51,8 @@ use datafusion::physical_optimizer::pruning::PruningPredicate;
5151
use datafusion_common::scalar::ScalarValue;
5252
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
5353
use datafusion_common::{
54-
config::ConfigOptions, Column, Constraints, DFSchema, DataFusionError,
55-
Result as DataFusionResult, TableReference, ToDFSchema,
54+
config::ConfigOptions, Column, DFSchema, DataFusionError, Result as DataFusionResult,
55+
TableReference, ToDFSchema,
5656
};
5757
use datafusion_expr::execution_props::ExecutionProps;
5858
use datafusion_expr::logical_plan::CreateExternalTable;
@@ -648,25 +648,24 @@ impl<'a> DeltaScanBuilder<'a> {
648648
..Default::default()
649649
};
650650

651-
let mut exec_plan_builder = ParquetExecBuilder::new(FileScanConfig {
652-
object_store_url: self.log_store.object_store_url(),
653-
file_schema,
654-
// If all files were filtered out, we still need to emit at least one partition to
655-
// pass datafusion sanity checks.
656-
//
657-
// See https://github.com/apache/datafusion/issues/11322
658-
file_groups: if file_groups.is_empty() {
659-
vec![vec![]]
660-
} else {
661-
file_groups.into_values().collect()
662-
},
663-
constraints: Constraints::default(),
664-
statistics: stats,
665-
projection: self.projection.cloned(),
666-
limit: self.limit,
667-
table_partition_cols,
668-
output_ordering: vec![],
669-
})
651+
let mut exec_plan_builder = ParquetExecBuilder::new(
652+
FileScanConfig::new(self.log_store.object_store_url(), file_schema)
653+
.with_file_groups(
654+
// If all files were filtered out, we still need to emit at least one partition to
655+
// pass datafusion sanity checks.
656+
//
657+
// See https://github.com/apache/datafusion/issues/11322
658+
if file_groups.is_empty() {
659+
vec![vec![]]
660+
} else {
661+
file_groups.into_values().collect()
662+
},
663+
)
664+
.with_statistics(stats)
665+
.with_projection(self.projection.cloned())
666+
.with_limit(self.limit)
667+
.with_table_partition_cols(table_partition_cols),
668+
)
670669
.with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}))
671670
.with_table_parquet_options(parquet_options);
672671

crates/core/src/operations/load_cdf.rs

Lines changed: 14 additions & 35 deletions
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,7 @@ use datafusion::datasource::file_format::FileFormat;
2121
use datafusion::datasource::physical_plan::FileScanConfig;
2222
use datafusion::execution::SessionState;
2323
use datafusion::prelude::SessionContext;
24-
use datafusion_common::{Constraints, ScalarValue, Statistics};
24+
use datafusion_common::ScalarValue;
2525
use datafusion_physical_expr::{expressions, PhysicalExpr};
2626
use datafusion_physical_plan::projection::ProjectionExec;
2727
use datafusion_physical_plan::union::UnionExec;
@@ -378,53 +378,32 @@ impl CdfLoadBuilder {
378378
let cdc_scan = ParquetFormat::new()
379379
.create_physical_plan(
380380
session_sate,
381-
FileScanConfig {
382-
object_store_url: self.log_store.object_store_url(),
383-
file_schema: cdc_file_schema.clone(),
384-
file_groups: cdc_file_groups.into_values().collect(),
385-
constraints: Constraints::default(),
386-
statistics: Statistics::new_unknown(&cdc_file_schema),
387-
projection: None,
388-
limit: None,
389-
table_partition_cols: cdc_partition_cols,
390-
output_ordering: vec![],
391-
},
381+
FileScanConfig::new(self.log_store.object_store_url(), cdc_file_schema)
382+
.with_file_groups(cdc_file_groups.into_values().collect())
383+
.with_table_partition_cols(cdc_partition_cols),
392384
filters,
393385
)
394386
.await?;
395387

396388
let add_scan = ParquetFormat::new()
397389
.create_physical_plan(
398390
session_sate,
399-
FileScanConfig {
400-
object_store_url: self.log_store.object_store_url(),
401-
file_schema: add_remove_file_schema.clone(),
402-
file_groups: add_file_groups.into_values().collect(),
403-
constraints: Constraints::default(),
404-
statistics: Statistics::new_unknown(&add_remove_file_schema.clone()),
405-
projection: None,
406-
limit: None,
407-
table_partition_cols: add_remove_partition_cols.clone(),
408-
output_ordering: vec![],
409-
},
391+
FileScanConfig::new(
392+
self.log_store.object_store_url(),
393+
add_remove_file_schema.clone(),
394+
)
395+
.with_file_groups(add_file_groups.into_values().collect())
396+
.with_table_partition_cols(add_remove_partition_cols.clone()),
410397
filters,
411398
)
412399
.await?;
413400

414401
let remove_scan = ParquetFormat::new()
415402
.create_physical_plan(
416403
session_sate,
417-
FileScanConfig {
418-
object_store_url: self.log_store.object_store_url(),
419-
file_schema: add_remove_file_schema.clone(),
420-
file_groups: remove_file_groups.into_values().collect(),
421-
constraints: Constraints::default(),
422-
statistics: Statistics::new_unknown(&add_remove_file_schema),
423-
projection: None,
424-
limit: None,
425-
table_partition_cols: add_remove_partition_cols,
426-
output_ordering: vec![],
427-
},
404+
FileScanConfig::new(self.log_store.object_store_url(), add_remove_file_schema)
405+
.with_file_groups(remove_file_groups.into_values().collect())
406+
.with_table_partition_cols(add_remove_partition_cols),
428407
filters,
429408
)
430409
.await?;
@@ -435,7 +414,7 @@ impl CdfLoadBuilder {
435414
Arc::new(UnionExec::new(vec![cdc_scan, add_scan, remove_scan]));
436415

437416
// We project the union in the order of the input_schema + cdc cols at the end
438-
// This is to ensure the DeltaCdfTableProvider uses the correct schema consturction.
417+
// This is to ensure the DeltaCdfTableProvider uses the correct schema construction.
439418
let mut fields = schema.fields().to_vec();
440419
for f in ADD_PARTITION_SCHEMA.clone() {
441420
fields.push(f.into());

0 commit comments

Comments
 (0)