diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs index 73a3eb10c28f..a43ada82ee24 100644 --- a/datafusion/core/src/physical_plan/windows/mod.rs +++ b/datafusion/core/src/physical_plan/windows/mod.rs @@ -26,7 +26,7 @@ use crate::physical_plan::{ udaf, ExecutionPlan, PhysicalExpr, }; use arrow::datatypes::Schema; -use arrow_schema::{SchemaRef, SortOptions}; +use arrow_schema::SchemaRef; use datafusion_common::ScalarValue; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::{ @@ -47,7 +47,6 @@ pub use bounded_window_agg_exec::BoundedWindowAggExec; pub use bounded_window_agg_exec::PartitionSearchMode; use datafusion_common::utils::longest_consecutive_prefix; use datafusion_physical_expr::equivalence::OrderingEquivalenceBuilder; -use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::utils::{convert_to_expr, get_indices_of_matching_exprs}; pub use datafusion_physical_expr::window::{ BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr, @@ -245,32 +244,14 @@ pub(crate) fn window_ordering_equivalence( .with_equivalences(input.equivalence_properties()) .with_existing_ordering(input.output_ordering().map(|elem| elem.to_vec())) .extend(input.ordering_equivalence_properties()); + for expr in window_expr { if let Some(builtin_window_expr) = expr.as_any().downcast_ref::() { - // Only the built-in `RowNumber` window function introduces a new - // ordering: - if builtin_window_expr + builtin_window_expr .get_built_in_func_expr() - .as_any() - .is::() - { - if let Some((idx, field)) = - schema.column_with_name(builtin_window_expr.name()) - { - let column = Column::new(field.name(), idx); - let options = SortOptions { - descending: false, - nulls_first: false, - }; // ASC, NULLS LAST - let rhs = PhysicalSortExpr { - expr: Arc::new(column) as _, - options, - }; - builder.add_equal_conditions(vec![rhs]); - } - } + .add_equal_orderings(&mut builder); } } builder.build() diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs index 78279851bba5..b7e2c445de32 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence.rs @@ -296,16 +296,18 @@ pub struct OrderingEquivalenceBuilder { eq_properties: EquivalenceProperties, ordering_eq_properties: OrderingEquivalenceProperties, existing_ordering: Vec, + schema: SchemaRef, } impl OrderingEquivalenceBuilder { pub fn new(schema: SchemaRef) -> Self { let eq_properties = EquivalenceProperties::new(schema.clone()); - let ordering_eq_properties = OrderingEquivalenceProperties::new(schema); + let ordering_eq_properties = OrderingEquivalenceProperties::new(schema.clone()); Self { eq_properties, ordering_eq_properties, existing_ordering: vec![], + schema, } } @@ -358,6 +360,11 @@ impl OrderingEquivalenceBuilder { } } + /// Return a reference to the schema with which this builder was constructed with + pub fn schema(&self) -> &SchemaRef { + &self.schema + } + pub fn build(self) -> OrderingEquivalenceProperties { self.ordering_eq_properties } diff --git a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs index 4b42009ca410..763bcfc2b17f 100644 --- a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs +++ b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs @@ -16,6 +16,7 @@ // under the License. use super::partition_evaluator::PartitionEvaluator; +use crate::equivalence::OrderingEquivalenceBuilder; use crate::PhysicalExpr; use arrow::array::ArrayRef; use arrow::datatypes::Field; @@ -79,6 +80,12 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug { None } + /// Adds any equivalent orderings generated by this expression + /// to `builder`. + /// + /// The default implementation does nothing + fn add_equal_orderings(&self, _builder: &mut OrderingEquivalenceBuilder) {} + /// Can the window function be incrementally computed using /// bounded memory? /// diff --git a/datafusion/physical-expr/src/window/row_number.rs b/datafusion/physical-expr/src/window/row_number.rs index 9883d67f7cd8..bbd251d53959 100644 --- a/datafusion/physical-expr/src/window/row_number.rs +++ b/datafusion/physical-expr/src/window/row_number.rs @@ -17,12 +17,15 @@ //! Defines physical expression for `row_number` that can evaluated at runtime during query execution +use crate::equivalence::OrderingEquivalenceBuilder; +use crate::expressions::Column; use crate::window::partition_evaluator::PartitionEvaluator; use crate::window::window_expr::{BuiltinWindowState, NumRowsState}; use crate::window::BuiltInWindowFunctionExpr; -use crate::PhysicalExpr; +use crate::{PhysicalExpr, PhysicalSortExpr}; use arrow::array::{ArrayRef, UInt64Array}; use arrow::datatypes::{DataType, Field}; +use arrow_schema::SortOptions; use datafusion_common::{Result, ScalarValue}; use std::any::Any; use std::ops::Range; @@ -61,6 +64,24 @@ impl BuiltInWindowFunctionExpr for RowNumber { &self.name } + fn add_equal_orderings(&self, builder: &mut OrderingEquivalenceBuilder) { + // The built-in RowNumber window function introduces a new + // ordering: + let schema = builder.schema(); + if let Some((idx, field)) = schema.column_with_name(self.name()) { + let column = Column::new(field.name(), idx); + let options = SortOptions { + descending: false, + nulls_first: false, + }; // ASC, NULLS LAST + let rhs = PhysicalSortExpr { + expr: Arc::new(column) as _, + options, + }; + builder.add_equal_conditions(vec![rhs]); + } + } + fn create_evaluator(&self) -> Result> { Ok(Box::::default()) }