Skip to content

Commit 88fbbf1

Browse files
authored
Merge pull request #26 from akurmustafa/pr_14038_suggestion
Pr 14038 suggestion
2 parents d78d711 + dc3785e commit 88fbbf1

File tree

9 files changed

+78
-49
lines changed

9 files changed

+78
-49
lines changed

datafusion-cli/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/physical-expr-common/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,5 @@ arrow = { workspace = true }
4141
datafusion-common = { workspace = true, default-features = true }
4242
datafusion-expr-common = { workspace = true }
4343
hashbrown = { workspace = true }
44+
indexmap = { workspace = true }
4445
itertools = { workspace = true }

datafusion/physical-expr-common/src/sort_expr.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ use arrow::datatypes::Schema;
3030
use arrow::record_batch::RecordBatch;
3131
use datafusion_common::Result;
3232
use datafusion_expr_common::columnar_value::ColumnarValue;
33-
use itertools::Itertools;
33+
use indexmap::IndexSet;
34+
use itertools::{izip, Itertools};
3435

3536
/// Represents Sort operation for a column in a RecordBatch
3637
///
@@ -409,6 +410,22 @@ impl LexOrdering {
409410
.map(PhysicalSortExpr::from)
410411
.collect()
411412
}
413+
414+
/// Collapse a `LexOrdering` into a new duplicate-free `LexOrdering` based on expression.
415+
///
416+
/// This function filters duplicate entries that have same physical
417+
/// expression inside, ignoring [`SortOptions`]. For example:
418+
///
419+
/// `vec![a ASC, a DESC]` collapses to `vec![a ASC]`.
420+
pub fn collapse(self) -> Self {
421+
let mut output = LexOrdering::default();
422+
for item in self {
423+
if !output.iter().any(|req| req.expr.eq(&item.expr)) {
424+
output.push(item);
425+
}
426+
}
427+
output
428+
}
412429
}
413430

414431
impl From<Vec<PhysicalSortExpr>> for LexOrdering {
@@ -540,6 +557,33 @@ impl LexRequirement {
540557
.collect(),
541558
)
542559
}
560+
561+
/// Constructs a duplicate-free `LexOrderingReq` by filtering out
562+
/// duplicate entries that have same physical expression inside.
563+
///
564+
/// For example, `vec![a Some(ASC), a Some(DESC)]` collapses to `vec![a
565+
/// Some(ASC)]`.
566+
///
567+
/// It will also filter out entries that are ordered if the next entry is;
568+
/// for instance, `vec![floor(a) Some(ASC), a Some(ASC)]` will be collapsed to
569+
/// `vec![a Some(ASC)]`.
570+
pub fn collapse(self) -> Self {
571+
let mut output = Vec::<PhysicalSortRequirement>::new();
572+
let mut exprs = IndexSet::new();
573+
let mut reqs = vec![];
574+
for item in self {
575+
let PhysicalSortRequirement { expr, options: req } = item;
576+
// new insertion
577+
if exprs.insert(expr) {
578+
reqs.push(req);
579+
}
580+
}
581+
debug_assert_eq!(reqs.len(), exprs.len());
582+
for (expr, req) in izip!(exprs, reqs) {
583+
output.push(PhysicalSortRequirement::new(expr, req));
584+
}
585+
LexRequirement::new(output)
586+
}
543587
}
544588

545589
impl From<LexOrdering> for LexRequirement {

datafusion/physical-expr/src/equivalence/class.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use super::{add_offset_to_expr, collapse_lex_req, ProjectionMapping};
18+
use super::{add_offset_to_expr, ProjectionMapping};
1919
use crate::{
2020
expressions::Column, LexOrdering, LexRequirement, PhysicalExpr, PhysicalExprRef,
2121
PhysicalSortExpr, PhysicalSortRequirement,
@@ -527,12 +527,13 @@ impl EquivalenceGroup {
527527
&self,
528528
sort_reqs: &LexRequirement,
529529
) -> LexRequirement {
530-
collapse_lex_req(LexRequirement::new(
530+
LexRequirement::new(
531531
sort_reqs
532532
.iter()
533533
.map(|sort_req| self.normalize_sort_requirement(sort_req.clone()))
534534
.collect(),
535-
))
535+
)
536+
.collapse()
536537
}
537538

538539
/// Projects `expr` according to the given projection mapping.

datafusion/physical-expr/src/equivalence/mod.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use std::sync::Arc;
1919

2020
use crate::expressions::Column;
21-
use crate::{LexRequirement, PhysicalExpr, PhysicalSortRequirement};
21+
use crate::{LexRequirement, PhysicalExpr};
2222

2323
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
2424

@@ -41,14 +41,9 @@ pub use properties::{
4141
/// It will also filter out entries that are ordered if the next entry is;
4242
/// for instance, `vec![floor(a) Some(ASC), a Some(ASC)]` will be collapsed to
4343
/// `vec![a Some(ASC)]`.
44+
#[deprecated(since = "45.0.0", note = "Use LexRequirement::collapse")]
4445
pub fn collapse_lex_req(input: LexRequirement) -> LexRequirement {
45-
let mut output = Vec::<PhysicalSortRequirement>::new();
46-
for item in input {
47-
if !output.iter().any(|req| req.expr.eq(&item.expr)) {
48-
output.push(item);
49-
}
50-
}
51-
LexRequirement::new(output)
46+
input.collapse()
5247
}
5348

5449
/// Adds the `offset` value to `Column` indices inside `expr`. This function is
@@ -80,7 +75,9 @@ mod tests {
8075
use arrow::datatypes::{DataType, Field, Schema};
8176
use arrow_schema::{SchemaRef, SortOptions};
8277
use datafusion_common::{plan_datafusion_err, Result};
83-
use datafusion_physical_expr_common::sort_expr::LexOrdering;
78+
use datafusion_physical_expr_common::sort_expr::{
79+
LexOrdering, PhysicalSortRequirement,
80+
};
8481

8582
pub fn output_schema(
8683
mapping: &ProjectionMapping,

datafusion/physical-expr/src/equivalence/ordering.rs

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,13 @@ impl OrderingEquivalenceClass {
146146
/// Returns the concatenation of all the orderings. This enables merge
147147
/// operations to preserve all equivalent orderings simultaneously.
148148
pub fn output_ordering(&self) -> Option<LexOrdering> {
149-
let output_ordering = self.orderings.iter().flatten().cloned().collect();
150-
let output_ordering = collapse_lex_ordering(output_ordering);
149+
let output_ordering = self
150+
.orderings
151+
.iter()
152+
.flatten()
153+
.cloned()
154+
.collect::<LexOrdering>()
155+
.collapse();
151156
(!output_ordering.is_empty()).then_some(output_ordering)
152157
}
153158

@@ -207,19 +212,6 @@ impl IntoIterator for OrderingEquivalenceClass {
207212
}
208213
}
209214

210-
/// This function constructs a duplicate-free `LexOrdering` by filtering out
211-
/// duplicate entries that have same physical expression inside. For example,
212-
/// `vec![a ASC, a DESC]` collapses to `vec![a ASC]`.
213-
pub fn collapse_lex_ordering(input: LexOrdering) -> LexOrdering {
214-
let mut output = LexOrdering::default();
215-
for item in input.iter() {
216-
if !output.iter().any(|req| req.expr.eq(&item.expr)) {
217-
output.push(item.clone());
218-
}
219-
}
220-
output
221-
}
222-
223215
/// Trims `orderings[idx]` if some suffix of it overlaps with a prefix of
224216
/// `orderings[pre_idx]`. Returns `true` if there is any overlap, `false` otherwise.
225217
fn resolve_overlap(orderings: &mut [LexOrdering], idx: usize, pre_idx: usize) -> bool {

datafusion/physical-expr/src/equivalence/properties.rs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,9 @@ use std::slice::Iter;
2222
use std::sync::Arc;
2323
use std::{fmt, mem};
2424

25-
use super::ordering::collapse_lex_ordering;
2625
use crate::equivalence::class::{const_exprs_contains, AcrossPartitions};
2726
use crate::equivalence::{
28-
collapse_lex_req, EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass,
29-
ProjectionMapping,
27+
EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping,
3028
};
3129
use crate::expressions::{with_new_schema, CastExpr, Column, Literal};
3230
use crate::{
@@ -501,15 +499,12 @@ impl EquivalenceProperties {
501499
);
502500
let constants_normalized = self.eq_group.normalize_exprs(constant_exprs);
503501
// Prune redundant sections in the requirement:
504-
collapse_lex_req(
505-
normalized_sort_reqs
506-
.iter()
507-
.filter(|&order| {
508-
!physical_exprs_contains(&constants_normalized, &order.expr)
509-
})
510-
.cloned()
511-
.collect(),
512-
)
502+
normalized_sort_reqs
503+
.iter()
504+
.filter(|&order| !physical_exprs_contains(&constants_normalized, &order.expr))
505+
.cloned()
506+
.collect::<LexRequirement>()
507+
.collapse()
513508
}
514509

515510
/// Checks whether the given ordering is satisfied by any of the existing
@@ -911,7 +906,7 @@ impl EquivalenceProperties {
911906
// Simplify each ordering by removing redundant sections:
912907
orderings
913908
.chain(projected_orderings)
914-
.map(collapse_lex_ordering)
909+
.map(|lex_ordering| lex_ordering.collapse())
915910
.collect()
916911
}
917912

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,9 @@ use datafusion_execution::TaskContext;
4444
use datafusion_expr::{Accumulator, Aggregate};
4545
use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
4646
use datafusion_physical_expr::{
47-
equivalence::{collapse_lex_req, ProjectionMapping},
48-
expressions::Column,
49-
physical_exprs_contains, EquivalenceProperties, LexOrdering, LexRequirement,
50-
PhysicalExpr, PhysicalSortRequirement,
47+
equivalence::ProjectionMapping, expressions::Column, physical_exprs_contains,
48+
EquivalenceProperties, LexOrdering, LexRequirement, PhysicalExpr,
49+
PhysicalSortRequirement,
5150
};
5251

5352
use itertools::Itertools;
@@ -473,7 +472,7 @@ impl AggregateExec {
473472
&mode,
474473
)?;
475474
new_requirement.inner.extend(req);
476-
new_requirement = collapse_lex_req(new_requirement);
475+
new_requirement = new_requirement.collapse();
477476

478477
// If our aggregation has grouping sets then our base grouping exprs will
479478
// be expanded based on the flags in `group_by.groups` where for each

datafusion/physical-plan/src/windows/mod.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ use datafusion_expr::{
3232
PartitionEvaluator, ReversedUDWF, WindowFrame, WindowFunctionDefinition, WindowUDF,
3333
};
3434
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
35-
use datafusion_physical_expr::equivalence::collapse_lex_req;
3635
use datafusion_physical_expr::{
3736
reverse_order_bys,
3837
window::{SlidingAggregateWindowExpr, StandardWindowFunctionExpr},
@@ -469,8 +468,8 @@ pub fn get_window_mode(
469468
{
470469
let req = LexRequirement::new(
471470
[partition_by_reqs.inner.clone(), order_by_reqs.inner].concat(),
472-
);
473-
let req = collapse_lex_req(req);
471+
)
472+
.collapse();
474473
if partition_by_eqs.ordering_satisfy_requirement(&req) {
475474
// Window can be run with existing ordering
476475
let mode = if indices.len() == partitionby_exprs.len() {

0 commit comments

Comments
 (0)