Skip to content

Allow FileSource-specific repartitioning #14754

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions datafusion/core/src/datasource/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use crate::datasource::physical_plan::{FileOpener, FileScanConfig};

use arrow::datatypes::SchemaRef;
use datafusion_common::Statistics;
use datafusion_datasource::file_groups::FileGroupPartitioner;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::DisplayFormatType;

Expand Down Expand Up @@ -67,4 +69,33 @@ pub trait FileSource: Send + Sync {
/// If this returns true, the DataSourceExec may repartition the data
/// by breaking up the input files into multiple smaller groups.
fn supports_repartition(&self, config: &FileScanConfig) -> bool;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we could remove supports_repartition as well:

  1. This API hasn't yet been released so it wouldn't be a breaking change
  2. It would ensure that all places in the code that need to check repartition use a single API (and thus are consistently done)

Copy link
Contributor Author

@AdamGS AdamGS Feb 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah absolutely, give me a second to pull it up and take a look

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add the PR here: #14123


/// If supported by the [`FileSource`], redistribute files across partitions according to their size.
/// Allows custom file formats to implement their own repartitioning logic.
///
/// Provides a default repartitioning behavior, see comments on [`FileGroupPartitioner`] for more detail.
fn repartitioned(
&self,
target_partitions: usize,
repartition_file_min_size: usize,
output_ordering: Option<LexOrdering>,
config: &FileScanConfig,
) -> datafusion_common::Result<Option<FileScanConfig>> {
if !self.supports_repartition(config) {
return Ok(None);
}

let repartitioned_file_groups_option = FileGroupPartitioner::new()
.with_target_partitions(target_partitions)
.with_repartition_file_min_size(repartition_file_min_size)
.with_preserve_order_within_groups(output_ordering.is_some())
.repartition_file_groups(&config.file_groups);

if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
let mut source = config.clone();
source.file_groups = repartitioned_file_groups;
return Ok(Some(source));
}
Ok(None)
}
}
31 changes: 11 additions & 20 deletions datafusion/core/src/datasource/physical_plan/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
//! file sources.

use super::{
get_projected_output_ordering, statistics::MinMaxStatistics, FileGroupPartitioner,
FileGroupsDisplay, FileStream,
get_projected_output_ordering, statistics::MinMaxStatistics, FileGroupsDisplay,
FileStream,
};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl};
Expand Down Expand Up @@ -203,30 +203,21 @@ impl DataSource for FileScanConfig {
self.fmt_file_source(t, f)
}

/// Redistribute files across partitions according to their size
/// See comments on [`FileGroupPartitioner`] for more detail.
/// If supported by the underlying [`FileSource`], redistribute files across partitions according to their size.
fn repartitioned(
&self,
target_partitions: usize,
repartition_file_min_size: usize,
output_ordering: Option<LexOrdering>,
) -> Result<Option<Arc<dyn DataSource>>> {
if !self.source.supports_repartition(self) {
return Ok(None);
}

let repartitioned_file_groups_option = FileGroupPartitioner::new()
.with_target_partitions(target_partitions)
.with_repartition_file_min_size(repartition_file_min_size)
.with_preserve_order_within_groups(output_ordering.is_some())
.repartition_file_groups(&self.file_groups);

if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
let mut source = self.clone();
source.file_groups = repartitioned_file_groups;
return Ok(Some(Arc::new(source)));
}
Ok(None)
let source = self.source.repartitioned(
target_partitions,
repartition_file_min_size,
output_ordering,
self,
)?;

Ok(source.map(|s| Arc::new(s) as _))
}

fn output_partitioning(&self) -> Partitioning {
Expand Down
Loading