Skip to content

Reuse last projection layer when renaming columns #14684

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 72 additions & 28 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use datafusion_common::{
SchemaError, UnnestOptions,
};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{case, is_null, lit, SortExpr};
use datafusion_expr::{case, is_null, lit, Projection, SortExpr};
use datafusion_expr::{
utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown, UNNAMED_TABLE,
};
Expand Down Expand Up @@ -1780,7 +1780,7 @@ impl DataFrame {
/// # }
/// ```
pub fn with_column_renamed(
self,
mut self,
old_name: impl Into<String>,
new_name: &str,
) -> Result<DataFrame> {
Expand All @@ -1789,38 +1789,82 @@ impl DataFrame {
.config_options()
.sql_parser
.enable_ident_normalization;

let old_column: Column = if ident_opts {
Column::from_qualified_name(old_name)
} else {
Column::from_qualified_name_ignore_case(old_name)
};

let (qualifier_rename, field_rename) =
match self.plan.schema().qualified_field_from_column(&old_column) {
Ok(qualifier_and_field) => qualifier_and_field,
// no-op if field not found
Err(DataFusionError::SchemaError(
SchemaError::FieldNotFound { .. },
_,
)) => return Ok(self),
Err(err) => return Err(err),
};
let projection = self
.plan
.schema()
.iter()
.map(|(qualifier, field)| {
if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
col(Column::from((qualifier, field)))
.alias_qualified(qualifier.cloned(), new_name)
} else {
col(Column::from((qualifier, field)))
}
})
.collect::<Vec<_>>();
let project_plan = LogicalPlanBuilder::from(self.plan)
.project(projection)?
.build()?;
let project_plan = if let LogicalPlan::Projection(Projection {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be clear, the projections are flattened as part of the planning process right? So this PR mostly is about making logical EXPLAIN plans look nicer with with_column_renamed?

If so it seems like a lot of complexity for (just) that feature. Maybe we can switch to showing optimized explain plans or something for people who want to see a simpler version

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it may have been related to trying to improve planning performance by reducing the number of projections up front so planning doesn't need to do it. Improving the logical plan output is definitely nice though :)

Copy link
Contributor Author

@blaginin blaginin Jun 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is indeed removed at the optimisation step! The idea behind that PR was that sometimes you might want to call functions involving expr tree traversal before performing optimisations. For example:

  1. Create a df
  2. Rename a lot of columns
  3. Call something like replace_params_with_values ← then, it'll iterate over a lot of projection layers

However, after revisiting the PR half a year later 😁, I feel like:

  • this isn't actually a problem (spent 30 mins and couldn't easily reproduce the problem)
  • it's very niche
  • it does make the code more complicated

So I'll close the PR for now, but will keep an eye on related issues and resubmit if it turns out to be a real problem.

expr,
input,
schema,
..
}) = self.plan
{
// special case: we already have a projection on top, so we can reuse it rather than creating a new one
let (qualifier_rename, field_rename) =
match schema.qualified_field_from_column(&old_column) {
Ok(qualifier_and_field) => qualifier_and_field,
// no-op if field not found
Err(DataFusionError::SchemaError(
SchemaError::FieldNotFound { .. },
_,
)) => {
self.plan = LogicalPlan::Projection(
Copy link
Contributor

@Omega359 Omega359 Feb 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the field is not found why are we not just returning Ok(self)? I am unsure why we would need to add a projection for the plan in this case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to do that because self was destructed and so I need to put plan back. try_new_with_schema is quite cheap and this shouldn't be happening often anyway

Projection::try_new_with_schema(expr, input, schema)?,
);
return Ok(self);
}
Err(err) => return Err(err),
};

let expr: Vec<_> = expr
.into_iter()
.map(|e| {
let (qualifier, field) = e.qualified_name();

if qualifier.as_ref().eq(&qualifier_rename)
&& field.as_str() == field_rename.name()
{
e.alias_qualified(qualifier, new_name.to_string())
} else {
e
}
})
.collect();
LogicalPlan::Projection(Projection::try_new(expr, input)?)
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There seems to be a lot of duplication here. I wonder if there is a way we could reduce that? 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's very fair! I also dislike repeating qualifier_rename, field_rename twice, but it's happening because they reference plan and must come after its destruction (hence needed twice)

let (qualifier_rename, field_rename) =
match self.plan.schema().qualified_field_from_column(&old_column) {
Ok(qualifier_and_field) => qualifier_and_field,
// no-op if field not found
Err(DataFusionError::SchemaError(
SchemaError::FieldNotFound { .. },
_,
)) => return Ok(self),
Err(err) => return Err(err),
};

let projection = self
.plan
.schema()
.iter()
.map(|(qualifier, field)| {
if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
col(Column::from((qualifier, field)))
.alias_qualified(qualifier.cloned(), new_name)
} else {
col(Column::from((qualifier, field)))
}
})
.collect::<Vec<_>>();

LogicalPlanBuilder::from(self.plan)
.project(projection)?
.build()?
};
Ok(DataFrame {
session_state: self.session_state,
plan: project_plan,
Expand Down
12 changes: 11 additions & 1 deletion datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1619,6 +1619,16 @@ async fn with_column_renamed() -> Result<()> {
// no-op for missing column
.with_column_renamed("c4", "boom")?;

assert_eq!("\
Projection: aggregate_test_100.c1 AS one, aggregate_test_100.c2 AS two, aggregate_test_100.c3, aggregate_test_100.c2 + aggregate_test_100.c3 AS sum AS total\
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AS sum AS total is valid?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the thorough reading 🙏 AS sum AS total happens because there's an alias over an alias - I believe it's harmless since it just wraps the original expression.

There's a function unalias_nested that can resolve this, but I don't want to call it because it's recursive and expensive. A potentially better option IMO is to reuse the existing alias in alias_qualified and alias if it's already there (similar to this PR). This feels like a separate improvement from this PR though

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just worry that another column referenced 'sum' and then it is aliased to 'total' and what the behaviour of that would be.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I don't think that's possible? "total" cannot be referenced within the same projection, because for that projection it is not in its input. But to display it in a nicer way, I'll do #14781

\n Limit: skip=0, fetch=1\
\n Sort: aggregate_test_100.c1 ASC NULLS FIRST, aggregate_test_100.c2 ASC NULLS FIRST, aggregate_test_100.c3 ASC NULLS FIRST\
\n Filter: aggregate_test_100.c2 = Int32(3) AND aggregate_test_100.c1 = Utf8(\"a\")\
\n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\
\n TableScan: aggregate_test_100",
format!("{}", df_sum_renamed.logical_plan()) // one projection is reused for all renames
);

let references: Vec<_> = df_sum_renamed
.schema()
.iter()
Expand All @@ -1645,7 +1655,7 @@ async fn with_column_renamed() -> Result<()> {
"| a | 3 | -72 | -69 |",
"+-----+-----+-----+-------+",
],
batches
&batches
);

Ok(())
Expand Down
Loading