55 changes: 55 additions & 0 deletions datafusion/datasource/src/file.rs
@@ -29,9 +29,14 @@ use crate::schema_adapter::SchemaAdapterFactory;
use arrow::datatypes::SchemaRef;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{not_impl_err, Result, Statistics};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::projection::{
all_alias_free_columns, new_projections_for_columns,
};
use datafusion_physical_plan::DisplayFormatType;

use object_store::ObjectStore;
@@ -129,6 +134,47 @@ pub trait FileSource: Send + Sync {
))
}

[Contributor] A docstring would be great

fn try_pushdown_projections(
&self,
projection_exprs: &ProjectionExprs,
file_schema: &SchemaRef,
current_projection: Option<&[usize]>,
) -> Result<ProjectionPushdownResult> {
let projection_slice: Vec<_> = projection_exprs.iter().cloned().collect();

// check if there are any partition columns in projection (columns beyond file schema)
let partitioned_columns_in_proj = projection_slice.iter().any(|proj_expr| {
proj_expr
.expr
.as_any()
.downcast_ref::<Column>()
.map(|expr| expr.index() >= file_schema.fields().len())
.unwrap_or(false)
});

// if there are any non-column or alias-carrier expressions, projection should not be removed
let no_aliases = all_alias_free_columns(&projection_slice);

if !no_aliases || partitioned_columns_in_proj {
return Ok(ProjectionPushdownResult::None);
}

let all_projections: Vec<usize> = (0..file_schema.fields().len()).collect();
let source_projection = current_projection.unwrap_or(&all_projections);

let new_projection_indices =
new_projections_for_columns(&projection_slice, source_projection);

// return a partial projection with the new projection indices.
// `new_file_source: None` means the file source itself is unchanged;
// the new projection is instead recorded on the `FileScanConfig`
Ok(ProjectionPushdownResult::Partial {
new_file_source: None,
remaining_projections: None,
new_projection_indices: Some(new_projection_indices),
})
}
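
A hypothetical walk-through of the index merge above (illustrative numbers, not from the PR): if the current scan already projects file columns [2, 4, 6] and the incoming projection selects columns #1 and #0 of that output, composing the two mappings yields merged indices [4, 2]:

    // Composing an incoming column-only projection with the current scan projection.
    let source_projection = vec![2usize, 4, 6]; // scan output -> file column indices
    let incoming = vec![1usize, 0]; // column indices referenced by the new projection
    let merged: Vec<usize> = incoming.iter().map(|&i| source_projection[i]).collect();
    assert_eq!(merged, vec![4, 2]); // what `new_projections_for_columns` computes here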

/// Set optional schema adapter factory.
///
/// [`SchemaAdapterFactory`] allows user to specify how fields from the
@@ -155,3 +201,12 @@ pub trait FileSource: Send + Sync {
None
}
}

[Contributor] docstrings please

pub enum ProjectionPushdownResult {
None,
Partial {
new_file_source: Option<Arc<dyn FileSource>>,
remaining_projections: Option<ProjectionExprs>,
new_projection_indices: Option<Vec<usize>>,
},
}

[Contributor] Hmm, something seems off here to me. In my mind this should be more like:

    pub struct ProjectionPushdown {
        new_file_source: Arc<dyn FileSource>,
        remaining_projections: Option<ProjectionExprs>,
    }

    pub type ProjectionPushdownResult = Option<ProjectionPushdown>;

I don't see how it could make sense to have a remaining projection if the source wasn't updated.

- File sources like Parquet will absorb the entire projection.
- File sources like CSV will push down indexes and create a remainder expression that handles anything more complex (aliases, operators, etc.). We can make helpers that those file sources call to handle splitting up the projection.
- If no projection can be handled (the default), this returns None.

[Contributor] How about we just return Option<ProjectionPushdown> directly (not wrap it in a typedef)?
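
A minimal sketch of the split the reviewer describes, under stated assumptions: `CsvSource::with_projection` and the `split_column_refs` helper are hypothetical illustrations, not part of this PR.

    // Hypothetical sketch, not code from this PR: a CSV-like source splits the
    // projection into bare column indexes (pushed into the scan) and a
    // remainder projection for anything more complex (aliases, operators).
    fn try_pushdown_projections_for_csv(
        source: &CsvSource,
        projection_exprs: &ProjectionExprs,
    ) -> Result<Option<ProjectionPushdown>> {
        // Assumed helper: partitions the exprs into pushable indexes + remainder.
        let (pushed_indexes, remainder) = split_column_refs(projection_exprs);
        if pushed_indexes.is_empty() {
            // Nothing pushable: leave the ProjectionExec in place.
            return Ok(None);
        }
        Ok(Some(ProjectionPushdown {
            // A Parquet-like source would absorb everything and return
            // remaining_projections: None here.
            new_file_source: Arc::new(source.with_projection(pushed_indexes)),
            remaining_projections: remainder,
        }))
    }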
77 changes: 38 additions & 39 deletions datafusion/datasource/src/file_scan_config.rs
@@ -18,6 +18,7 @@
//! [`FileScanConfig`] to configure scanning of possibly partitioned
//! file sources.

use crate::file::ProjectionPushdownResult;
use crate::file_groups::FileGroup;
#[allow(unused_imports)]
use crate::schema_adapter::SchemaAdapterFactory;
@@ -44,16 +45,14 @@ use datafusion_execution::{
object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Column};
use datafusion_physical_expr::expressions::BinaryExpr;
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::utils::reassign_expr_columns;
use datafusion_physical_expr::{split_conjunction, EquivalenceProperties, Partitioning};
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::projection::{
all_alias_free_columns, new_projections_for_columns, ProjectionExpr,
};
use datafusion_physical_plan::projection::ProjectionExpr;
use datafusion_physical_plan::{
display::{display_orderings, ProjectSchemaDisplay},
filter_pushdown::FilterPushdownPropagation,
@@ -679,42 +678,42 @@ impl DataSource for FileScanConfig {
fn try_swapping_with_projection(
&self,
projection: &[ProjectionExpr],
) -> Result<Option<Arc<dyn DataSource>>> {
// This process can be moved into CsvExec, but it would be an overlap of their responsibility.

// Must be all column references, with no table partition columns (which can not be projected)
let partitioned_columns_in_proj = projection.iter().any(|proj_expr| {
proj_expr
.expr
.as_any()
.downcast_ref::<Column>()
.map(|expr| expr.index() >= self.file_schema().fields().len())
.unwrap_or(false)
});
) -> Result<crate::source::ProjectionPushdownResult> {
let new_projection_exprs = ProjectionExprs::from(projection);

// get current projection indices if they exist
let current_projection = self
.projection_exprs
.as_ref()
.map(|p| p.ordered_column_indices());

// pass the new projections to the file source, along with the current projection
// the file source will merge them if possible
let res = self.file_source().try_pushdown_projections(
&new_projection_exprs,
self.file_schema(),
current_projection.as_deref(),
)?;

// If there is any non-column or alias-carrier expression, Projection should not be removed.
let no_aliases = all_alias_free_columns(projection);

Ok((no_aliases && !partitioned_columns_in_proj).then(|| {
let file_scan = self.clone();
let source = Arc::clone(&file_scan.file_source);
let new_projections = new_projections_for_columns(
projection,
&file_scan
.projection_exprs
.as_ref()
.map(|p| p.ordered_column_indices())
.unwrap_or_else(|| (0..self.file_schema().fields().len()).collect()),
);
match res {
ProjectionPushdownResult::None => Ok(None),
ProjectionPushdownResult::Partial {
new_file_source,
remaining_projections,
new_projection_indices,
} => {
let mut builder = FileScanConfigBuilder::from(self.clone());

if let Some(new_source) = new_file_source {
builder = builder.with_source(new_source);
}

builder = builder.with_projection_indices(new_projection_indices);

Arc::new(
FileScanConfigBuilder::from(file_scan)
// Assign projected statistics to source
.with_projection_indices(Some(new_projections))
.with_source(source)
.build(),
) as _
}))
let new_config = Arc::new(builder.build()) as Arc<dyn DataSource>;
Ok(Some((new_config, remaining_projections)))
}
}
}

fn try_pushdown_filters(
@@ -2300,7 +2299,7 @@ mod tests {

// Simulate projection being updated. Since the filter has already been pushed down,
// the new projection won't include the filtered column.
let data_source = config
let (data_source, _remaining_projections) = config
.try_swapping_with_projection(&[ProjectionExpr::new(
col("c3", &file_schema).unwrap(),
"c3".to_string(),
42 changes: 22 additions & 20 deletions datafusion/datasource/src/memory.rs
@@ -35,9 +35,7 @@ use datafusion_physical_expr::equivalence::project_orderings;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
use datafusion_physical_plan::memory::MemoryStream;
use datafusion_physical_plan::projection::{
all_alias_free_columns, new_projections_for_columns, ProjectionExpr,
};
use datafusion_physical_plan::projection::ProjectionExpr;
use datafusion_physical_plan::{
common, ColumnarValue, DisplayAs, DisplayFormatType, Partitioning, PhysicalExpr,
SendableRecordBatchStream, Statistics,
@@ -228,25 +226,29 @@ impl DataSource for MemorySourceConfig {
fn try_swapping_with_projection(
&self,
projection: &[ProjectionExpr],
) -> Result<Option<Arc<dyn DataSource>>> {
) -> Result<crate::source::ProjectionPushdownResult> {
use datafusion_physical_plan::projection::{
all_alias_free_columns, new_projections_for_columns,
};

// If there is any non-column or alias-carrier expression, Projection should not be removed.
// This process can be moved into MemoryExec, but it would be an overlap of their responsibility.
all_alias_free_columns(projection)
.then(|| {
let all_projections = (0..self.schema.fields().len()).collect();
let new_projections = new_projections_for_columns(
projection,
self.projection().as_ref().unwrap_or(&all_projections),
);
if !all_alias_free_columns(projection) {
return Ok(None);
}

MemorySourceConfig::try_new(
self.partitions(),
self.original_schema(),
Some(new_projections),
)
.map(|s| Arc::new(s) as Arc<dyn DataSource>)
})
.transpose()
let all_projections: Vec<usize> = (0..self.schema.fields().len()).collect();
let new_projections = new_projections_for_columns(
projection,
self.projection().as_ref().unwrap_or(&all_projections),
);

let new_source = MemorySourceConfig::try_new(
self.partitions(),
self.original_schema(),
Some(new_projections),
)?;

Ok(Some((Arc::new(new_source) as Arc<dyn DataSource>, None)))
}
}

18 changes: 15 additions & 3 deletions datafusion/datasource/src/source.rs
@@ -22,6 +22,7 @@ use std::fmt;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_plan::execution_plan::{
Boundedness, EmissionType, SchedulingType,
};
@@ -175,7 +176,7 @@ pub trait DataSource: Send + Sync + Debug {
fn try_swapping_with_projection(
&self,
_projection: &[ProjectionExpr],
) -> Result<Option<Arc<dyn DataSource>>>;
) -> Result<ProjectionPushdownResult>;
/// Try to push down filters into this DataSource.
/// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
///
@@ -191,6 +192,9 @@
}
}

pub type ProjectionPushdownResult =
Option<(Arc<dyn DataSource>, Option<ProjectionExprs>)>;

[Contributor] Can we please document this type (like what the two fields mean)? I actually think it would be even nicer if this was a real enum so we could document it inline. Perhaps like:

    /// Result of evaluating projection pushdown ....
    enum ProjectionPushdownResult {
      ...
    }

[Contributor Author] Makes sense to me. Can I get your thoughts on naming here? I made another ProjectionPushdownResult that is an enum. That type happens to live at the FileSource level.

[Contributor] Maybe if we go with https://github.com/apache/datafusion/pull/18309/files#r2467051055 we can have just 1 enum / structure?

[Contributor Author] Hm, I'm not sure if that is possible. One stores an Arc<dyn DataSource> while the other stores an Arc<dyn FileSource>.

[Contributor] You can make them generic.

[Contributor Author] I guess what I'm trying to say is that it would be nice to give these types a proper name/alias. PartialPushdownResult<FileSource> and PartialPushdownResult<DataSource> doesn't sound the best IMO.

[Contributor] That's what we do for filter pushdown; seems okay.
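
A minimal sketch of the generic shape floated in this thread (hypothetical names, not part of this PR), letting one definition serve both levels:

    // Hypothetical: a single generic result type shared by both traits.
    pub struct PartialPushdownResult<S: ?Sized> {
        new_source: Arc<S>,
        remaining_projections: Option<ProjectionExprs>,
    }

    // The two instantiations discussed above.
    pub type FileProjectionPushdown = Option<PartialPushdownResult<dyn FileSource>>;
    pub type DataSourceProjectionPushdown = Option<PartialPushdownResult<dyn DataSource>>;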

/// [`ExecutionPlan`] that reads one or more files
///
/// `DataSourceExec` implements common functionality such as applying
@@ -321,8 +325,16 @@ impl ExecutionPlan for DataSourceExec {
.data_source
.try_swapping_with_projection(projection.expr())?
{
Some(new_data_source) => {
Ok(Some(Arc::new(DataSourceExec::new(new_data_source))))
Some((new_data_source, remaining_projections)) => {
let new_exec = Arc::new(DataSourceExec::new(new_data_source));
if let Some(remaining_projections) = remaining_projections {
let new_projection_exec =
ProjectionExec::try_new(remaining_projections, new_exec)?;

return Ok(Some(Arc::new(new_projection_exec)));
}

Ok(Some(new_exec))
}
None => Ok(None),
}