Skip to content

Commit 071a2a6

Browse files
authored
Minor: Move get_equal_orderings into BuiltInWindowFunctionExpr, remove BuiltInWindowFunctionExpr::as_any (#6619)
* Minor: Move get_equal_orderings into BuiltInWindowFunctionExpr * revert as_any change * Improve comment
1 parent a56ae74 commit 071a2a6

File tree

4 files changed

+41
-25
lines changed

4 files changed

+41
-25
lines changed

datafusion/core/src/physical_plan/windows/mod.rs

Lines changed: 4 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::physical_plan::{
2626
udaf, ExecutionPlan, PhysicalExpr,
2727
};
2828
use arrow::datatypes::Schema;
29-
use arrow_schema::{SchemaRef, SortOptions};
29+
use arrow_schema::SchemaRef;
3030
use datafusion_common::ScalarValue;
3131
use datafusion_common::{DataFusionError, Result};
3232
use datafusion_expr::{
@@ -47,7 +47,6 @@ pub use bounded_window_agg_exec::BoundedWindowAggExec;
4747
pub use bounded_window_agg_exec::PartitionSearchMode;
4848
use datafusion_common::utils::longest_consecutive_prefix;
4949
use datafusion_physical_expr::equivalence::OrderingEquivalenceBuilder;
50-
use datafusion_physical_expr::expressions::Column;
5150
use datafusion_physical_expr::utils::{convert_to_expr, get_indices_of_matching_exprs};
5251
pub use datafusion_physical_expr::window::{
5352
BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
@@ -245,32 +244,14 @@ pub(crate) fn window_ordering_equivalence(
245244
.with_equivalences(input.equivalence_properties())
246245
.with_existing_ordering(input.output_ordering().map(|elem| elem.to_vec()))
247246
.extend(input.ordering_equivalence_properties());
247+
248248
for expr in window_expr {
249249
if let Some(builtin_window_expr) =
250250
expr.as_any().downcast_ref::<BuiltInWindowExpr>()
251251
{
252-
// Only the built-in `RowNumber` window function introduces a new
253-
// ordering:
254-
if builtin_window_expr
252+
builtin_window_expr
255253
.get_built_in_func_expr()
256-
.as_any()
257-
.is::<RowNumber>()
258-
{
259-
if let Some((idx, field)) =
260-
schema.column_with_name(builtin_window_expr.name())
261-
{
262-
let column = Column::new(field.name(), idx);
263-
let options = SortOptions {
264-
descending: false,
265-
nulls_first: false,
266-
}; // ASC, NULLS LAST
267-
let rhs = PhysicalSortExpr {
268-
expr: Arc::new(column) as _,
269-
options,
270-
};
271-
builder.add_equal_conditions(vec![rhs]);
272-
}
273-
}
254+
.add_equal_orderings(&mut builder);
274255
}
275256
}
276257
builder.build()

datafusion/physical-expr/src/equivalence.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,16 +268,18 @@ pub struct OrderingEquivalenceBuilder {
268268
eq_properties: EquivalenceProperties,
269269
ordering_eq_properties: OrderingEquivalenceProperties,
270270
existing_ordering: Vec<PhysicalSortExpr>,
271+
schema: SchemaRef,
271272
}
272273

273274
impl OrderingEquivalenceBuilder {
274275
pub fn new(schema: SchemaRef) -> Self {
275276
let eq_properties = EquivalenceProperties::new(schema.clone());
276-
let ordering_eq_properties = OrderingEquivalenceProperties::new(schema);
277+
let ordering_eq_properties = OrderingEquivalenceProperties::new(schema.clone());
277278
Self {
278279
eq_properties,
279280
ordering_eq_properties,
280281
existing_ordering: vec![],
282+
schema,
281283
}
282284
}
283285

@@ -330,6 +332,11 @@ impl OrderingEquivalenceBuilder {
330332
}
331333
}
332334

335+
/// Return a reference to the schema with which this builder was constructed with
336+
pub fn schema(&self) -> &SchemaRef {
337+
&self.schema
338+
}
339+
333340
pub fn build(self) -> OrderingEquivalenceProperties {
334341
self.ordering_eq_properties
335342
}

datafusion/physical-expr/src/window/built_in_window_function_expr.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
use super::partition_evaluator::PartitionEvaluator;
19+
use crate::equivalence::OrderingEquivalenceBuilder;
1920
use crate::PhysicalExpr;
2021
use arrow::array::ArrayRef;
2122
use arrow::datatypes::Field;
@@ -79,6 +80,12 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
7980
None
8081
}
8182

83+
/// Adds any equivalent orderings generated by this expression
84+
/// to `builder`.
85+
///
86+
/// The default implementation does nothing
87+
fn add_equal_orderings(&self, _builder: &mut OrderingEquivalenceBuilder) {}
88+
8289
/// Can the window function be incrementally computed using
8390
/// bounded memory?
8491
///

datafusion/physical-expr/src/window/row_number.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717

1818
//! Defines physical expression for `row_number` that can evaluated at runtime during query execution
1919
20+
use crate::equivalence::OrderingEquivalenceBuilder;
21+
use crate::expressions::Column;
2022
use crate::window::partition_evaluator::PartitionEvaluator;
2123
use crate::window::window_expr::NumRowsState;
2224
use crate::window::BuiltInWindowFunctionExpr;
23-
use crate::PhysicalExpr;
25+
use crate::{PhysicalExpr, PhysicalSortExpr};
2426
use arrow::array::{ArrayRef, UInt64Array};
2527
use arrow::datatypes::{DataType, Field};
28+
use arrow_schema::SortOptions;
2629
use datafusion_common::{Result, ScalarValue};
2730
use std::any::Any;
2831
use std::ops::Range;
@@ -61,6 +64,24 @@ impl BuiltInWindowFunctionExpr for RowNumber {
6164
&self.name
6265
}
6366

67+
fn add_equal_orderings(&self, builder: &mut OrderingEquivalenceBuilder) {
68+
// The built-in RowNumber window function introduces a new
69+
// ordering:
70+
let schema = builder.schema();
71+
if let Some((idx, field)) = schema.column_with_name(self.name()) {
72+
let column = Column::new(field.name(), idx);
73+
let options = SortOptions {
74+
descending: false,
75+
nulls_first: false,
76+
}; // ASC, NULLS LAST
77+
let rhs = PhysicalSortExpr {
78+
expr: Arc::new(column) as _,
79+
options,
80+
};
81+
builder.add_equal_conditions(vec![rhs]);
82+
}
83+
}
84+
6485
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
6586
Ok(Box::<NumRowsEvaluator>::default())
6687
}

0 commit comments

Comments
 (0)