Reuse last projection layer when renaming columns #14684

Closed
wants to merge 6 commits into from
106 changes: 75 additions & 31 deletions datafusion/core/src/dataframe/mod.rs
@@ -61,7 +61,7 @@ use datafusion_expr::{
expr::{Alias, ScalarFunction},
is_null, lit,
utils::COUNT_STAR_EXPANSION,
- SortExpr, TableProviderFilterPushDown, UNNAMED_TABLE,
+ Projection, SortExpr, TableProviderFilterPushDown, UNNAMED_TABLE,
};
use datafusion_functions::core::coalesce;
use datafusion_functions_aggregate::expr_fn::{
@@ -1963,7 +1963,7 @@ impl DataFrame {
/// # }
/// ```
pub fn with_column_renamed(
- self,
+ mut self,
old_name: impl Into<String>,
new_name: &str,
) -> Result<DataFrame> {
@@ -1972,41 +1972,85 @@ impl DataFrame {
.config_options()
.sql_parser
.enable_ident_normalization;

let old_column: Column = if ident_opts {
Column::from_qualified_name(old_name)
} else {
Column::from_qualified_name_ignore_case(old_name)
};

let (qualifier_rename, field_rename) =
match self.plan.schema().qualified_field_from_column(&old_column) {
Ok(qualifier_and_field) => qualifier_and_field,
// no-op if field not found
Err(DataFusionError::SchemaError(
SchemaError::FieldNotFound { .. },
_,
)) => return Ok(self),
Err(err) => return Err(err),
};
let projection = self
.plan
.schema()
.iter()
.map(|(qualifier, field)| {
if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
(
col(Column::from((qualifier, field)))
.alias_qualified(qualifier.cloned(), new_name),
false,
)
} else {
(col(Column::from((qualifier, field))), false)
}
})
.collect::<Vec<_>>();
let project_plan = LogicalPlanBuilder::from(self.plan)
.project_with_validation(projection)?
.build()?;
let project_plan = if let LogicalPlan::Projection(Projection {
Contributor

To be clear, the projections are flattened as part of the planning process, right? So this PR is mostly about making logical EXPLAIN plans look nicer after with_column_renamed?

If so, it seems like a lot of complexity for (just) that feature. Maybe we could switch to showing optimized EXPLAIN plans, or something similar, for people who want to see a simpler version.

Contributor

I think it may have been related to trying to improve planning performance by reducing the number of projections up front so planning doesn't need to do it. Improving the logical plan output is definitely nice though :)

Contributor Author

@blaginin Jun 2, 2025

It is indeed removed at the optimisation step! The idea behind that PR was that sometimes you might want to call functions involving expr tree traversal before performing optimisations. For example:

  1. Create a df
  2. Rename a lot of columns
  3. Call something like replace_params_with_values ← then, it'll iterate over a lot of projection layers

However, after revisiting the PR half a year later 😁, I feel like:

  • this isn't actually a problem (spent 30 mins and couldn't easily reproduce the problem)
  • it's very niche
  • it does make the code more complicated

So I'll close the PR for now, but will keep an eye on related issues and resubmit if it turns out to be a real problem.
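The scenario described above (many renames, then a traversal before optimisation) can be illustrated with a toy model. This is a minimal sketch and not DataFusion code: `Plan`, `Expr`, `rename_stacking`, `rename_reusing`, and `projection_layers` are hypothetical names invented for the illustration, and columns are plain strings without qualifiers.

```rust
// Toy logical plan: NOT DataFusion's `LogicalPlan`, just enough structure
// to count projection layers.
#[derive(Debug, Clone, PartialEq)]
struct Expr {
    source: String, // column read from the input
    name: String,   // output name (alias)
}

#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { columns: Vec<String> },
    Projection { exprs: Vec<Expr>, input: Box<Plan> },
}

// Output column names of a plan node.
fn output_names(plan: &Plan) -> Vec<String> {
    match plan {
        Plan::Scan { columns } => columns.clone(),
        Plan::Projection { exprs, .. } => exprs.iter().map(|e| e.name.clone()).collect(),
    }
}

// Always wraps the plan in a new projection (one layer per rename).
fn rename_stacking(plan: Plan, old: &str, new: &str) -> Plan {
    let names = output_names(&plan);
    if !names.iter().any(|n| n == old) {
        return plan; // no-op if the column is not found
    }
    let exprs: Vec<Expr> = names
        .into_iter()
        .map(|n| {
            let name = if n == old { new.to_string() } else { n.clone() };
            Expr { source: n, name }
        })
        .collect();
    Plan::Projection { exprs, input: Box::new(plan) }
}

// Rewrites the alias inside an existing top-level projection; falls back
// to stacking when the top node is not a projection.
fn rename_reusing(plan: Plan, old: &str, new: &str) -> Plan {
    match plan {
        Plan::Projection { exprs, input } => {
            let exprs: Vec<Expr> = exprs
                .into_iter()
                .map(|mut e| {
                    if e.name == old {
                        e.name = new.to_string();
                    }
                    e
                })
                .collect();
            Plan::Projection { exprs, input }
        }
        other => rename_stacking(other, old, new),
    }
}

// Number of projection nodes a top-down traversal has to walk through.
fn projection_layers(plan: &Plan) -> usize {
    match plan {
        Plan::Scan { .. } => 0,
        Plan::Projection { input, .. } => 1 + projection_layers(input),
    }
}
```

In this model, renaming N columns with `rename_stacking` leaves N projection layers for any pre-optimisation traversal to visit, while `rename_reusing` leaves one; both produce the same output column names.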

expr,
input,
schema,
..
}) = self.plan
{
// special case: we already have a projection on top, so we can reuse it rather than creating a new one
let (qualifier_rename, field_rename) =
match schema.qualified_field_from_column(&old_column) {
Ok(qualifier_and_field) => qualifier_and_field,
// no-op if field not found
Err(DataFusionError::SchemaError(
SchemaError::FieldNotFound { .. },
_,
)) => {
self.plan = LogicalPlan::Projection(
Contributor

@Omega359 Feb 16, 2025

If the field is not found why are we not just returning Ok(self)? I am unsure why we would need to add a projection for the plan in this case

Contributor Author

We need to do that because self.plan was destructured, so I need to put the plan back before returning self. try_new_with_schema is quite cheap, and this shouldn't happen often anyway.
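The ownership point in this reply can be shown in isolation. The following is a minimal sketch with hypothetical types (`Frame` and `Plan` are stand-ins, not DataFusion's `DataFrame` and `LogicalPlan`): once the `if let` pattern moves fields out of `self.plan`, `self` cannot be returned until `self.plan` has been assigned again, even on the no-op path.

```rust
// Hypothetical stand-ins for the real types; only the ownership shape matters.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(String),
    Projection { exprs: Vec<String>, input: Box<Plan> },
}

#[derive(Debug)]
struct Frame {
    session: String,
    plan: Plan,
}

impl Frame {
    fn rename(mut self, old: &str, new: &str) -> Frame {
        // Binding `exprs` and `input` by value moves them out of `self.plan`,
        // leaving `self` partially moved inside this block.
        if let Plan::Projection { exprs, input } = self.plan {
            if !exprs.iter().any(|e| e == old) {
                // No-op path: `return self` alone would not compile here,
                // so the plan is rebuilt from the moved-out parts first.
                self.plan = Plan::Projection { exprs, input };
                return self;
            }
            let exprs: Vec<String> = exprs
                .into_iter()
                .map(|e| if e == old { new.to_string() } else { e })
                .collect();
            // Rename path: reuse the same projection node with updated names.
            self.plan = Plan::Projection { exprs, input };
        }
        // If the pattern did not match, nothing was moved and `self` is intact.
        self
    }
}
```

Matching on a reference (`&self.plan`) would avoid the move, at the cost of cloning the expressions and input to build the new projection; the sketch does not claim which trade-off suits the real code better.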

Projection::try_new_with_schema(expr, input, schema)?,
);
return Ok(self);
}
Err(err) => return Err(err),
};

let expr: Vec<_> = expr
.into_iter()
.map(|e| {
let (qualifier, field) = e.qualified_name();

if qualifier.as_ref().eq(&qualifier_rename)
&& field.as_str() == field_rename.name()
{
e.alias_qualified(qualifier, new_name.to_string())
} else {
e
}
})
.collect();
LogicalPlan::Projection(Projection::try_new(expr, input)?)
} else {
Contributor

There seems to be a lot of duplication here. I wonder if there is a way we could reduce that? 🤔

Contributor Author

That's very fair! I also dislike computing qualifier_rename and field_rename twice, but it happens because they reference the plan and must come after it is destructured (hence they're needed once in each branch).

let (qualifier_rename, field_rename) =
match self.plan.schema().qualified_field_from_column(&old_column) {
Ok(qualifier_and_field) => qualifier_and_field,
// no-op if field not found
Err(DataFusionError::SchemaError(
SchemaError::FieldNotFound { .. },
_,
)) => return Ok(self),
Err(err) => return Err(err),
};

let projection = self
.plan
.schema()
.iter()
.map(|(qualifier, field)| {
if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
(
col(Column::from((qualifier, field)))
.alias_qualified(qualifier.cloned(), new_name),
false,
)
} else {
(col(Column::from((qualifier, field))), false)
}
})
.collect::<Vec<_>>();

LogicalPlanBuilder::from(self.plan)
.project_with_validation(projection)?
.build()?
};
Ok(DataFrame {
session_state: self.session_state,
plan: project_plan,
12 changes: 12 additions & 0 deletions datafusion/core/tests/dataframe/mod.rs
@@ -1648,6 +1648,18 @@ async fn with_column_renamed() -> Result<()> {
// no-op for missing column
.with_column_renamed("c4", "boom")?;

// one projection is reused for all renames
assert_snapshot!(
df_sum_renamed.logical_plan(),
@r#"
Projection: aggregate_test_100.c1 AS one, aggregate_test_100.c2 AS two, aggregate_test_100.c3, aggregate_test_100.c2 + aggregate_test_100.c3 AS sum AS total
Limit: skip=0, fetch=1
Sort: aggregate_test_100.c1 ASC NULLS FIRST, aggregate_test_100.c2 ASC NULLS FIRST, aggregate_test_100.c3 ASC NULLS FIRST
Filter: aggregate_test_100.c2 = Int32(3) AND aggregate_test_100.c1 = Utf8("a")
Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3
TableScan: aggregate_test_100
"#);

let references: Vec<_> = df_sum_renamed
.schema()
.iter()