From c81c0333098718eaddbc3c09fe665f2a0f1fed83 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Thu, 28 May 2026 16:26:48 +0800 Subject: [PATCH] [branch-53] perf: skip ensure_distribution rebuild when children are unchanged Cherry-pick of upstream apache/datafusion#22521 (approved by 2010YOUY01 and alamb, awaiting maintainer merge). Adapted to branch-53's file layout (datafusion/physical-optimizer/src/enforce_distribution.rs vs the upstream ensure_requirements/ subdirectory) and to the older plan.as_any().is::() API. ensure_distribution was unconditionally calling plan.with_new_children after collecting the (possibly redistributed) children, even when none of the children were actually replaced. For nodes like ProjectionExec that path runs through try_new and recomputes schema, equivalence properties, output ordering, and partitioning, which is pure overhead when the input Arcs are pointer-identical. Routes the non-UnionExec branch through the existing with_new_children_if_necessary helper, which does the Arc::ptr_eq short-circuit and reuses the input plan when no children changed. UnionExec to InterleaveExec special case still runs first because it intentionally produces a new node even when child Arcs are unchanged. Linear: X-2631 --- .../enforce_distribution.rs | 35 +++++++++++++++++++ .../src/enforce_distribution.rs | 15 ++++++-- 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 7a31d4cd411e7..d30ddc69703e2 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -3695,3 +3695,38 @@ fn adjust_input_keys_ordering_no_transform_for_filter_scan() -> Result<()> { ); Ok(()) } + +/// Verifies the `ensure_distribution` fast path: when no child of a node is +/// replaced (no `RepartitionExec` or `SortExec` injection is required), +/// the rule must reuse the input `Arc` unchanged instead +/// of calling `with_new_children`. For a deep `ProjectionExec` chain over a +/// single-partition scan with `target_partitions = 1`, every node hits this +/// fast path, so the root returned by `ensure_distribution` must be the +/// same `Arc` as the input. +/// +/// Regression test for the optimization that avoids +/// `ProjectionExec::with_new_children` (which recomputes schema, equivalence +/// properties, output ordering, and partitioning) on the common point-query +/// plan shape. Cherry-pick of upstream apache/datafusion#22521. +#[test] +fn ensure_distribution_reuses_plan_arc_when_no_redistribution_needed() -> Result<()> { + let scan = parquet_exec(); + let proj1 = projection_exec_with_alias( + scan, + vec![ + ("a".to_string(), "a".to_string()), + ("b".to_string(), "b".to_string()), + ], + ); + let proj2 = + projection_exec_with_alias(proj1, vec![("a".to_string(), "a".to_string())]); + let plan: Arc = proj2; + + let result = ensure_distribution_helper(Arc::clone(&plan), 1, false)?; + + assert!( + Arc::ptr_eq(&result, &plan), + "ensure_distribution must reuse the input Arc when no children require redistribution" + ); + Ok(()) +} diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index f7b456fa1fd24..28a40d86df041 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -58,7 +58,9 @@ use datafusion_physical_plan::tree_node::PlanContext; use datafusion_physical_plan::union::{InterleaveExec, UnionExec, can_interleave}; use datafusion_physical_plan::windows::WindowAggExec; use datafusion_physical_plan::windows::{BoundedWindowAggExec, get_best_fitting_window}; -use datafusion_physical_plan::{Distribution, ExecutionPlan, Partitioning}; +use datafusion_physical_plan::{ + Distribution, ExecutionPlan, Partitioning, with_new_children_if_necessary, +}; use itertools::izip; @@ -1429,7 +1431,16 @@ pub fn ensure_distribution( // Data Arc::new(InterleaveExec::try_new(children_plans)?) } else { - plan.with_new_children(children_plans)? + // Route through `with_new_children_if_necessary` so the common + // case where no child was replaced above skips the expensive + // `with_new_children` rebuild. For nodes like `ProjectionExec`, + // `with_new_children` recomputes schema / equivalence properties / + // output ordering via `try_new` even when the input Arcs are + // identical, which dominates `ensure_distribution` time on deep + // projection stacks over plans where no distribution change + // applies (point queries with no join / aggregate / unmet + // ordering). Cherry-pick of upstream apache/datafusion#22521. + with_new_children_if_necessary(plan, children_plans)? }; Ok(Transformed::yes(DistributionContext::new(