diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index b2a96d111545..e39508c2d039 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1313,15 +1313,15 @@ mod tests { // ok with one column ( vec![vec![col("string_col").sort(true, false)]], - Ok(vec![LexOrdering { - inner: vec![PhysicalSortExpr { + Ok(vec![LexOrdering::new( + vec![PhysicalSortExpr { expr: physical_col("string_col", &schema).unwrap(), options: SortOptions { descending: false, nulls_first: false, }, }], - } + ) ]) ), // ok with two columns, different options @@ -1330,8 +1330,8 @@ mod tests { col("string_col").sort(true, false), col("int_col").sort(false, true), ]], - Ok(vec![LexOrdering { - inner: vec![ + Ok(vec![LexOrdering::new( + vec![ PhysicalSortExpr::new_default(physical_col("string_col", &schema).unwrap()) .asc() .nulls_last(), @@ -1339,7 +1339,7 @@ mod tests { .desc() .nulls_first() ], - } + ) ]) ), ]; diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index a5f2bd1760b3..c7fbc775f251 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -1112,9 +1112,8 @@ mod tests { )))) .collect::>(), )); - let sort_order = LexOrdering { - inner: case - .sort + let sort_order = LexOrdering::from( + case.sort .into_iter() .map(|expr| { crate::physical_planner::create_physical_sort_expr( @@ -1124,7 +1123,7 @@ mod tests { ) }) .collect::>>()?, - }; + ); let partitioned_files = case.files.into_iter().map(From::from).collect::>(); diff --git a/datafusion/core/src/datasource/physical_plan/statistics.rs b/datafusion/core/src/datasource/physical_plan/statistics.rs index 5e0257022e76..b4a8f377d256 100644 --- a/datafusion/core/src/datasource/physical_plan/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/statistics.rs @@ -119,8 +119,8 @@ impl MinMaxStatistics { projected_schema .project(&(sort_columns.iter().map(|c| c.index()).collect::>()))?, ); - let min_max_sort_order = LexOrdering { - inner: sort_columns + let min_max_sort_order = LexOrdering::from( + sort_columns .iter() .zip(projected_sort_order.iter()) .enumerate() @@ -129,7 +129,7 @@ impl MinMaxStatistics { options: sort.options, }) .collect::>(), - }; + ); let (min_values, max_values): (Vec<_>, Vec<_>) = sort_columns .iter() diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 9f5afc7abc2e..aa1e499397eb 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -133,10 +133,7 @@ fn plan_with_order_preserving_variants( if let Some(ordering) = child.output_ordering() { // When the input of a `CoalescePartitionsExec` has an ordering, // replace it with a `SortPreservingMergeExec` if appropriate: - let spm = SortPreservingMergeExec::new( - LexOrdering::new(ordering.inner.clone()), - Arc::clone(child), - ); + let spm = SortPreservingMergeExec::new(ordering.clone(), Arc::clone(child)); sort_input.plan = Arc::new(spm) as _; sort_input.children[0].data = true; return Ok(sort_input); diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs index cdecc9d31862..9f2c28d564f0 100644 --- a/datafusion/core/src/physical_optimizer/utils.rs +++ b/datafusion/core/src/physical_optimizer/utils.rs @@ -40,7 +40,7 @@ pub fn add_sort_above( fetch: Option, ) -> PlanContext { let mut sort_expr = LexOrdering::from(sort_requirements); - sort_expr.inner.retain(|sort_expr| { + sort_expr.retain(|sort_expr| { !node .plan .equivalence_properties() diff --git a/datafusion/core/tests/fuzz_cases/equivalence/utils.rs b/datafusion/core/tests/fuzz_cases/equivalence/utils.rs index 4d599879df67..5bf42ea6889f 100644 --- a/datafusion/core/tests/fuzz_cases/equivalence/utils.rs +++ b/datafusion/core/tests/fuzz_cases/equivalence/utils.rs @@ -444,7 +444,7 @@ pub fn generate_table_for_orderings( assert!(!orderings.is_empty()); // Sort the inner vectors by their lengths (longest first) - orderings.sort_by_key(|v| std::cmp::Reverse(v.inner.len())); + orderings.sort_by_key(|v| std::cmp::Reverse(v.len())); let arrays = schema .fields diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs index 67666f5d7a1c..979aa5a2da03 100644 --- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs @@ -617,7 +617,7 @@ async fn run_window_test( options: SortOptions::default(), }) } - for order_by_expr in &orderby_exprs.inner { + for order_by_expr in &orderby_exprs { if !sort_keys.contains(order_by_expr) { sort_keys.push(order_by_expr.clone()) } diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs index 63397e69c09d..b150d3dc9bd3 100644 --- a/datafusion/physical-expr-common/src/sort_expr.rs +++ b/datafusion/physical-expr-common/src/sort_expr.rs @@ -325,9 +325,13 @@ fn to_str(options: &SortOptions) -> &str { ///`LexOrdering` contains a `Vec`, which represents /// a lexicographical ordering. +/// +/// For example, `vec![a ASC, b DESC]` represents a lexicographical ordering +/// that first sorts by column `a` in ascending order, then by column `b` in +/// descending order. #[derive(Debug, Default, Clone, PartialEq, Eq, Hash)] pub struct LexOrdering { - pub inner: Vec, + inner: Vec, } impl AsRef for LexOrdering { @@ -337,7 +341,7 @@ impl AsRef for LexOrdering { } impl LexOrdering { - // Creates a new [`LexOrdering`] from a vector + /// Creates a new [`LexOrdering`] from a vector pub fn new(inner: Vec) -> Self { Self { inner } } @@ -348,46 +352,61 @@ impl LexOrdering { &EMPTY_ORDER } + /// Returns the number of elements that can be stored in the LexOrdering + /// without reallocating. pub fn capacity(&self) -> usize { self.inner.capacity() } + /// Clears the LexOrdering, removing all elements. pub fn clear(&mut self) { self.inner.clear() } + /// Returns `true` if the LexOrdering contains `expr` pub fn contains(&self, expr: &PhysicalSortExpr) -> bool { self.inner.contains(expr) } + /// Add all elements from `iter` to the LexOrdering. pub fn extend>(&mut self, iter: I) { self.inner.extend(iter) } + /// Remove all elements from the LexOrdering where `f` evaluates to `false`. + pub fn retain(&mut self, f: F) + where + F: FnMut(&PhysicalSortExpr) -> bool, + { + self.inner.retain(f) + } + + /// Returns `true` if the LexOrdering contains no elements. pub fn is_empty(&self) -> bool { self.inner.is_empty() } - pub fn iter(&self) -> impl Iterator { + /// Returns an iterator over each `&PhysicalSortExpr` in the LexOrdering. + pub fn iter(&self) -> core::slice::Iter { self.inner.iter() } + /// Returns the number of elements in the LexOrdering. pub fn len(&self) -> usize { self.inner.len() } + /// Removes the last element from the LexOrdering and returns it, or `None` if it is empty. pub fn pop(&mut self) -> Option { self.inner.pop() } + /// Appends an element to the back of the LexOrdering. pub fn push(&mut self, physical_sort_expr: PhysicalSortExpr) { self.inner.push(physical_sort_expr) } - pub fn retain(&mut self, f: impl FnMut(&PhysicalSortExpr) -> bool) { - self.inner.retain(f) - } - + /// Truncates the LexOrdering, keeping only the first `len` elements. pub fn truncate(&mut self, len: usize) { self.inner.truncate(len) } @@ -400,9 +419,12 @@ impl LexOrdering { /// Converts a `LexRequirement` into a `LexOrdering`. /// - /// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr` - /// for each entry in the input. If required ordering is None for an entry - /// default ordering `ASC, NULLS LAST` if given (see the `PhysicalSortExpr::from`). + /// This function converts [`PhysicalSortRequirement`] to [`PhysicalSortExpr`] + /// for each entry in the input. + /// + /// If the required ordering is `None` for an entry in `requirement`, the + /// default ordering `ASC, NULLS LAST` is used (see + /// [`PhysicalSortExpr::from`]). pub fn from_lex_requirement(requirement: LexRequirement) -> LexOrdering { requirement .into_iter() @@ -425,6 +447,15 @@ impl LexOrdering { } output } + + /// Transforms each `PhysicalSortExpr` in the `LexOrdering` + /// in place using the provided closure `f`. + pub fn transform(&mut self, f: F) + where + F: FnMut(&mut PhysicalSortExpr), + { + self.inner.iter_mut().for_each(f); + } } impl From> for LexOrdering { @@ -439,6 +470,13 @@ impl From for LexOrdering { } } +/// Convert a `LexOrdering` into a `Arc[]` for fast copies +impl From for Arc<[PhysicalSortExpr]> { + fn from(value: LexOrdering) -> Self { + value.inner.into() + } +} + impl Deref for LexOrdering { type Target = [PhysicalSortExpr]; diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs index 4954c21728f6..4e324663dcd1 100644 --- a/datafusion/physical-expr/src/equivalence/ordering.rs +++ b/datafusion/physical-expr/src/equivalence/ordering.rs @@ -207,7 +207,7 @@ impl OrderingEquivalenceClass { for idx in 0..n_ordering { // Calculate cross product index let idx = outer_idx * n_ordering + idx; - self.orderings[idx].inner.extend(ordering.iter().cloned()); + self.orderings[idx].extend(ordering.iter().cloned()); } } self @@ -217,9 +217,9 @@ impl OrderingEquivalenceClass { /// ordering equivalence class. pub fn add_offset(&mut self, offset: usize) { for ordering in self.orderings.iter_mut() { - for sort_expr in ordering.inner.iter_mut() { + ordering.transform(|sort_expr| { sort_expr.expr = add_offset_to_expr(Arc::clone(&sort_expr.expr), offset); - } + }) } } diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 2c7335649b28..c620bb6d3a51 100755 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -187,7 +187,6 @@ impl EquivalenceProperties { let mut output_ordering = self.oeq_class().output_ordering().unwrap_or_default(); // Prune out constant expressions output_ordering - .inner .retain(|sort_expr| !const_exprs_contains(constants, &sort_expr.expr)); (!output_ordering.is_empty()).then_some(output_ordering) } @@ -697,7 +696,6 @@ impl EquivalenceProperties { // Generate all valid orderings, given substituted expressions. let res = new_orderings .into_iter() - .map(|ordering| ordering.inner) .multi_cartesian_product() .map(LexOrdering::new) .collect::>(); @@ -1221,7 +1219,6 @@ impl EquivalenceProperties { let mut new_orderings = vec![]; for ordering in self.oeq_class { let new_ordering = ordering - .inner .into_iter() .map(|mut sort_expr| { sort_expr.expr = with_new_schema(sort_expr.expr, &schema)?; @@ -1507,7 +1504,7 @@ fn generate_dependency_orderings( .map(|prefixes| { prefixes .into_iter() - .flat_map(|ordering| ordering.inner.clone()) + .flat_map(|ordering| ordering.clone()) .collect() }) .collect::>() @@ -2177,8 +2174,8 @@ impl UnionEquivalentOrderingBuilder { existing_constants: &[ConstExpr], ) -> Option { let mut augmented_ordering = LexOrdering::default(); - let mut sort_expr_iter = ordering.inner.iter().peekable(); - let mut existing_sort_expr_iter = existing_ordering.inner.iter().peekable(); + let mut sort_expr_iter = ordering.iter().peekable(); + let mut existing_sort_expr_iter = existing_ordering.iter().peekable(); // walk in parallel down the two orderings, trying to match them up while sort_expr_iter.peek().is_some() || existing_sort_expr_iter.peek().is_some() @@ -2758,7 +2755,7 @@ mod tests { let leading_orderings = eq_properties .oeq_class() .iter() - .flat_map(|ordering| ordering.inner.first().cloned()) + .flat_map(|ordering| ordering.first().cloned()) .collect::>(); let expr_props = eq_properties.get_expr_properties(Arc::clone(&expr)); let err_msg = format!( diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 22513144d481..bfb894069abf 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -456,7 +456,7 @@ fn replace_on_columns_of_right_ordering( right_ordering: &mut LexOrdering, ) -> Result<()> { for (left_col, right_col) in on_columns { - for item in right_ordering.inner.iter_mut() { + right_ordering.transform(|item| { let new_expr = Arc::clone(&item.expr) .transform(|e| { if e.eq(right_col) { @@ -465,9 +465,10 @@ fn replace_on_columns_of_right_ordering( Ok(Transformed::no(e)) } }) - .data()?; + .data() + .expect("closure is infallible"); item.expr = new_expr; - } + }); } Ok(()) } diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 5e8ee713703b..683fcc9c75e6 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -320,7 +320,7 @@ impl MemoryExec { let fields = self.schema.fields(); let ambiguous_column = sort_information .iter() - .flat_map(|ordering| ordering.inner.clone()) + .flat_map(|ordering| ordering.clone()) .flat_map(|expr| collect_columns(&expr.expr)) .find(|col| { fields @@ -660,8 +660,8 @@ mod memory_exec_tests { .try_with_sort_information(sort_information)?; assert_eq!( - mem_exec.properties().output_ordering().unwrap().to_vec(), - expected_output_order.inner + mem_exec.properties().output_ordering().unwrap(), + &expected_output_order ); let eq_properties = mem_exec.properties().equivalence_properties(); assert!(eq_properties.oeq_class().contains(&sort1)); diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 33c8a2b2fee3..fd7e426a82c5 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -282,7 +282,7 @@ impl ExternalSorter { in_mem_batches: vec![], in_mem_batches_sorted: true, spills: vec![], - expr: expr.inner.into(), + expr: expr.into(), metrics, fetch, reservation, diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index ca154925dfdd..ed1df6b1b8ff 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -108,7 +108,7 @@ impl TopK { let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]")) .register(&runtime.memory_pool); - let expr: Arc<[PhysicalSortExpr]> = expr.inner.into(); + let expr: Arc<[PhysicalSortExpr]> = expr.into(); let sort_fields: Vec<_> = expr .iter() diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 48eba78d6169..4d8abcb0e654 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -331,15 +331,13 @@ fn roundtrip_window() -> Result<()> { let udwf_expr = Arc::new(StandardWindowExpr::new( nth_value_window, &[col("b", &schema)?], - &LexOrdering { - inner: vec![PhysicalSortExpr { - expr: col("a", &schema)?, - options: SortOptions { - descending: false, - nulls_first: false, - }, - }], - }, + &LexOrdering::new(vec![PhysicalSortExpr { + expr: col("a", &schema)?, + options: SortOptions { + descending: false, + nulls_first: false, + }, + }]), Arc::new(window_frame), )); @@ -1130,15 +1128,13 @@ fn roundtrip_udwf_extension_codec() -> Result<()> { let udwf_expr = Arc::new(StandardWindowExpr::new( udwf, &[col("b", &schema)?], - &LexOrdering { - inner: vec![PhysicalSortExpr { - expr: col("a", &schema)?, - options: SortOptions { - descending: false, - nulls_first: false, - }, - }], - }, + &LexOrdering::new(vec![PhysicalSortExpr { + expr: col("a", &schema)?, + options: SortOptions { + descending: false, + nulls_first: false, + }, + }]), Arc::new(window_frame), ));