diff --git a/datafusion/core/src/datasource/data_source.rs b/datafusion/core/src/datasource/data_source.rs index d31b68019e30..fcb31194eab1 100644 --- a/datafusion/core/src/datasource/data_source.rs +++ b/datafusion/core/src/datasource/data_source.rs @@ -26,6 +26,8 @@ use crate::datasource::physical_plan::{FileOpener, FileScanConfig}; use arrow::datatypes::SchemaRef; use datafusion_common::Statistics; +use datafusion_datasource::file_groups::FileGroupPartitioner; +use datafusion_physical_expr::LexOrdering; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::DisplayFormatType; @@ -62,9 +64,33 @@ pub trait FileSource: Send + Sync { fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) -> fmt::Result { Ok(()) } - /// Return true if the file format supports repartition + + /// If supported by the [`FileSource`], redistribute files across partitions according to their size. + /// Allows custom file formats to implement their own repartitioning logic. /// - /// If this returns true, the DataSourceExec may repartition the data - /// by breaking up the input files into multiple smaller groups. - fn supports_repartition(&self, config: &FileScanConfig) -> bool; + /// Provides a default repartitioning behavior, see comments on [`FileGroupPartitioner`] for more detail. + fn repartitioned( + &self, + target_partitions: usize, + repartition_file_min_size: usize, + output_ordering: Option, + config: &FileScanConfig, + ) -> datafusion_common::Result> { + if config.file_compression_type.is_compressed() || config.new_lines_in_values { + return Ok(None); + } + + let repartitioned_file_groups_option = FileGroupPartitioner::new() + .with_target_partitions(target_partitions) + .with_repartition_file_min_size(repartition_file_min_size) + .with_preserve_order_within_groups(output_ordering.is_some()) + .repartition_file_groups(&config.file_groups); + + if let Some(repartitioned_file_groups) = repartitioned_file_groups_option { + let mut source = config.clone(); + source.file_groups = repartitioned_file_groups; + return Ok(Some(source)); + } + Ok(None) + } } diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index 4a7cdc192cd3..84c99b31955c 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -256,10 +256,6 @@ impl FileSource for ArrowSource { fn file_type(&self) -> &str { "arrow" } - - fn supports_repartition(&self, config: &FileScanConfig) -> bool { - !(config.file_compression_type.is_compressed() || config.new_lines_in_values) - } } /// The struct arrow that implements `[FileOpener]` trait diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs index b0a1d8c8c9e2..ac65ec2eec90 100644 --- a/datafusion/core/src/datasource/physical_plan/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -255,10 +255,15 @@ impl FileSource for AvroSource { fn file_type(&self) -> &str { "avro" } - fn supports_repartition(&self, config: &FileScanConfig) -> bool { - !(config.file_compression_type.is_compressed() - || config.new_lines_in_values - || self.as_any().downcast_ref::().is_some()) + + fn repartitioned( + &self, + _target_partitions: usize, + _repartition_file_min_size: usize, + _output_ordering: Option, + _config: &FileScanConfig, + ) -> Result> { + Ok(None) } } diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index c0952229b5e0..b7652d59663b 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -618,9 +618,6 @@ impl FileSource for CsvSource { fn fmt_extra(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { write!(f, ", has_header={}", self.has_header) } - fn supports_repartition(&self, config: &FileScanConfig) -> bool { - !(config.file_compression_type.is_compressed() || config.new_lines_in_values) - } } impl FileOpener for CsvOpener { diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 123ecc2f9582..8be66250bdba 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -19,8 +19,8 @@ //! file sources. use super::{ - get_projected_output_ordering, statistics::MinMaxStatistics, FileGroupPartitioner, - FileGroupsDisplay, FileStream, + get_projected_output_ordering, statistics::MinMaxStatistics, FileGroupsDisplay, + FileStream, }; use crate::datasource::file_format::file_compression_type::FileCompressionType; use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl}; @@ -203,30 +203,21 @@ impl DataSource for FileScanConfig { self.fmt_file_source(t, f) } - /// Redistribute files across partitions according to their size - /// See comments on [`FileGroupPartitioner`] for more detail. + /// If supported by the underlying [`FileSource`], redistribute files across partitions according to their size. fn repartitioned( &self, target_partitions: usize, repartition_file_min_size: usize, output_ordering: Option, ) -> Result>> { - if !self.source.supports_repartition(self) { - return Ok(None); - } - - let repartitioned_file_groups_option = FileGroupPartitioner::new() - .with_target_partitions(target_partitions) - .with_repartition_file_min_size(repartition_file_min_size) - .with_preserve_order_within_groups(output_ordering.is_some()) - .repartition_file_groups(&self.file_groups); - - if let Some(repartitioned_file_groups) = repartitioned_file_groups_option { - let mut source = self.clone(); - source.file_groups = repartitioned_file_groups; - return Ok(Some(Arc::new(source))); - } - Ok(None) + let source = self.source.repartitioned( + target_partitions, + repartition_file_min_size, + output_ordering, + self, + )?; + + Ok(source.map(|s| Arc::new(s) as _)) } fn output_partitioning(&self) -> Partitioning { diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 590b1cb88dcd..f389dd37c40d 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -313,10 +313,6 @@ impl FileSource for JsonSource { fn file_type(&self) -> &str { "json" } - - fn supports_repartition(&self, config: &FileScanConfig) -> bool { - !(config.file_compression_type.is_compressed() || config.new_lines_in_values) - } } impl FileOpener for JsonOpener { diff --git a/datafusion/core/src/datasource/physical_plan/parquet/source.rs b/datafusion/core/src/datasource/physical_plan/parquet/source.rs index 21881112075d..4d4235f2d2bb 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/source.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/source.rs @@ -586,7 +586,4 @@ impl FileSource for ParquetSource { } } } - fn supports_repartition(&self, _config: &FileScanConfig) -> bool { - true - } }