Skip to content

Commit 0e22172

Browse files
authored
Make LexOrdering::inner non pub, add comments, update usages (#14155)
1 parent 0522272 commit 0e22172

File tree

15 files changed

+94
-66
lines changed

15 files changed

+94
-66
lines changed

datafusion/core/src/datasource/listing/table.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1314,15 +1314,15 @@ mod tests {
13141314
// ok with one column
13151315
(
13161316
vec![vec![col("string_col").sort(true, false)]],
1317-
Ok(vec![LexOrdering {
1318-
inner: vec![PhysicalSortExpr {
1317+
Ok(vec![LexOrdering::new(
1318+
vec![PhysicalSortExpr {
13191319
expr: physical_col("string_col", &schema).unwrap(),
13201320
options: SortOptions {
13211321
descending: false,
13221322
nulls_first: false,
13231323
},
13241324
}],
1325-
}
1325+
)
13261326
])
13271327
),
13281328
// ok with two columns, different options
@@ -1331,16 +1331,16 @@ mod tests {
13311331
col("string_col").sort(true, false),
13321332
col("int_col").sort(false, true),
13331333
]],
1334-
Ok(vec![LexOrdering {
1335-
inner: vec![
1334+
Ok(vec![LexOrdering::new(
1335+
vec![
13361336
PhysicalSortExpr::new_default(physical_col("string_col", &schema).unwrap())
13371337
.asc()
13381338
.nulls_last(),
13391339
PhysicalSortExpr::new_default(physical_col("int_col", &schema).unwrap())
13401340
.desc()
13411341
.nulls_first()
13421342
],
1343-
}
1343+
)
13441344
])
13451345
),
13461346
];

datafusion/core/src/datasource/physical_plan/file_scan_config.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1134,9 +1134,8 @@ mod tests {
11341134
))))
11351135
.collect::<Vec<_>>(),
11361136
));
1137-
let sort_order = LexOrdering {
1138-
inner: case
1139-
.sort
1137+
let sort_order = LexOrdering::from(
1138+
case.sort
11401139
.into_iter()
11411140
.map(|expr| {
11421141
crate::physical_planner::create_physical_sort_expr(
@@ -1146,7 +1145,7 @@ mod tests {
11461145
)
11471146
})
11481147
.collect::<Result<Vec<_>>>()?,
1149-
};
1148+
);
11501149

11511150
let partitioned_files =
11521151
case.files.into_iter().map(From::from).collect::<Vec<_>>();

datafusion/core/src/datasource/physical_plan/statistics.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,8 @@ impl MinMaxStatistics {
119119
projected_schema
120120
.project(&(sort_columns.iter().map(|c| c.index()).collect::<Vec<_>>()))?,
121121
);
122-
let min_max_sort_order = LexOrdering {
123-
inner: sort_columns
122+
let min_max_sort_order = LexOrdering::from(
123+
sort_columns
124124
.iter()
125125
.zip(projected_sort_order.iter())
126126
.enumerate()
@@ -129,7 +129,7 @@ impl MinMaxStatistics {
129129
options: sort.options,
130130
})
131131
.collect::<Vec<_>>(),
132-
};
132+
);
133133

134134
let (min_values, max_values): (Vec<_>, Vec<_>) = sort_columns
135135
.iter()

datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -133,10 +133,7 @@ fn plan_with_order_preserving_variants(
133133
if let Some(ordering) = child.output_ordering() {
134134
// When the input of a `CoalescePartitionsExec` has an ordering,
135135
// replace it with a `SortPreservingMergeExec` if appropriate:
136-
let spm = SortPreservingMergeExec::new(
137-
LexOrdering::new(ordering.inner.clone()),
138-
Arc::clone(child),
139-
);
136+
let spm = SortPreservingMergeExec::new(ordering.clone(), Arc::clone(child));
140137
sort_input.plan = Arc::new(spm) as _;
141138
sort_input.children[0].data = true;
142139
return Ok(sort_input);

datafusion/core/src/physical_optimizer/utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ pub fn add_sort_above<T: Clone + Default>(
4040
fetch: Option<usize>,
4141
) -> PlanContext<T> {
4242
let mut sort_expr = LexOrdering::from(sort_requirements);
43-
sort_expr.inner.retain(|sort_expr| {
43+
sort_expr.retain(|sort_expr| {
4444
!node
4545
.plan
4646
.equivalence_properties()

datafusion/core/tests/fuzz_cases/equivalence/utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ pub fn generate_table_for_orderings(
444444

445445
assert!(!orderings.is_empty());
446446
// Sort the inner vectors by their lengths (longest first)
447-
orderings.sort_by_key(|v| std::cmp::Reverse(v.inner.len()));
447+
orderings.sort_by_key(|v| std::cmp::Reverse(v.len()));
448448

449449
let arrays = schema
450450
.fields

datafusion/core/tests/fuzz_cases/window_fuzz.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,7 @@ async fn run_window_test(
617617
options: SortOptions::default(),
618618
})
619619
}
620-
for order_by_expr in &orderby_exprs.inner {
620+
for order_by_expr in &orderby_exprs {
621621
if !sort_keys.contains(order_by_expr) {
622622
sort_keys.push(order_by_expr.clone())
623623
}

datafusion/physical-expr-common/src/sort_expr.rs

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -325,9 +325,13 @@ fn to_str(options: &SortOptions) -> &str {
325325

326326
///`LexOrdering` contains a `Vec<PhysicalSortExpr>`, which represents
327327
/// a lexicographical ordering.
328+
///
329+
/// For example, `vec![a ASC, b DESC]` represents a lexicographical ordering
330+
/// that first sorts by column `a` in ascending order, then by column `b` in
331+
/// descending order.
328332
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
329333
pub struct LexOrdering {
330-
pub inner: Vec<PhysicalSortExpr>,
334+
inner: Vec<PhysicalSortExpr>,
331335
}
332336

333337
impl AsRef<LexOrdering> for LexOrdering {
@@ -337,7 +341,7 @@ impl AsRef<LexOrdering> for LexOrdering {
337341
}
338342

339343
impl LexOrdering {
340-
// Creates a new [`LexOrdering`] from a vector
344+
/// Creates a new [`LexOrdering`] from a vector
341345
pub fn new(inner: Vec<PhysicalSortExpr>) -> Self {
342346
Self { inner }
343347
}
@@ -348,46 +352,61 @@ impl LexOrdering {
348352
&EMPTY_ORDER
349353
}
350354

355+
/// Returns the number of elements that can be stored in the LexOrdering
356+
/// without reallocating.
351357
pub fn capacity(&self) -> usize {
352358
self.inner.capacity()
353359
}
354360

361+
/// Clears the LexOrdering, removing all elements.
355362
pub fn clear(&mut self) {
356363
self.inner.clear()
357364
}
358365

366+
/// Returns `true` if the LexOrdering contains `expr`
359367
pub fn contains(&self, expr: &PhysicalSortExpr) -> bool {
360368
self.inner.contains(expr)
361369
}
362370

371+
/// Add all elements from `iter` to the LexOrdering.
363372
pub fn extend<I: IntoIterator<Item = PhysicalSortExpr>>(&mut self, iter: I) {
364373
self.inner.extend(iter)
365374
}
366375

376+
/// Remove all elements from the LexOrdering where `f` evaluates to `false`.
377+
pub fn retain<F>(&mut self, f: F)
378+
where
379+
F: FnMut(&PhysicalSortExpr) -> bool,
380+
{
381+
self.inner.retain(f)
382+
}
383+
384+
/// Returns `true` if the LexOrdering contains no elements.
367385
pub fn is_empty(&self) -> bool {
368386
self.inner.is_empty()
369387
}
370388

371-
pub fn iter(&self) -> impl Iterator<Item = &PhysicalSortExpr> {
389+
/// Returns an iterator over each `&PhysicalSortExpr` in the LexOrdering.
390+
pub fn iter(&self) -> core::slice::Iter<PhysicalSortExpr> {
372391
self.inner.iter()
373392
}
374393

394+
/// Returns the number of elements in the LexOrdering.
375395
pub fn len(&self) -> usize {
376396
self.inner.len()
377397
}
378398

399+
/// Removes the last element from the LexOrdering and returns it, or `None` if it is empty.
379400
pub fn pop(&mut self) -> Option<PhysicalSortExpr> {
380401
self.inner.pop()
381402
}
382403

404+
/// Appends an element to the back of the LexOrdering.
383405
pub fn push(&mut self, physical_sort_expr: PhysicalSortExpr) {
384406
self.inner.push(physical_sort_expr)
385407
}
386408

387-
pub fn retain(&mut self, f: impl FnMut(&PhysicalSortExpr) -> bool) {
388-
self.inner.retain(f)
389-
}
390-
409+
/// Truncates the LexOrdering, keeping only the first `len` elements.
391410
pub fn truncate(&mut self, len: usize) {
392411
self.inner.truncate(len)
393412
}
@@ -400,9 +419,12 @@ impl LexOrdering {
400419

401420
/// Converts a `LexRequirement` into a `LexOrdering`.
402421
///
403-
/// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr`
404-
/// for each entry in the input. If required ordering is None for an entry
405-
/// default ordering `ASC, NULLS LAST` if given (see the `PhysicalSortExpr::from`).
422+
/// This function converts [`PhysicalSortRequirement`] to [`PhysicalSortExpr`]
423+
/// for each entry in the input.
424+
///
425+
/// If the required ordering is `None` for an entry in `requirement`, the
426+
/// default ordering `ASC, NULLS LAST` is used (see
427+
/// [`PhysicalSortExpr::from`]).
406428
pub fn from_lex_requirement(requirement: LexRequirement) -> LexOrdering {
407429
requirement
408430
.into_iter()
@@ -425,6 +447,15 @@ impl LexOrdering {
425447
}
426448
output
427449
}
450+
451+
/// Transforms each `PhysicalSortExpr` in the `LexOrdering`
452+
/// in place using the provided closure `f`.
453+
pub fn transform<F>(&mut self, f: F)
454+
where
455+
F: FnMut(&mut PhysicalSortExpr),
456+
{
457+
self.inner.iter_mut().for_each(f);
458+
}
428459
}
429460

430461
impl From<Vec<PhysicalSortExpr>> for LexOrdering {
@@ -439,6 +470,13 @@ impl From<LexRequirement> for LexOrdering {
439470
}
440471
}
441472

473+
/// Convert a `LexOrdering` into a `Arc[<PhysicalSortExpr>]` for fast copies
474+
impl From<LexOrdering> for Arc<[PhysicalSortExpr]> {
475+
fn from(value: LexOrdering) -> Self {
476+
value.inner.into()
477+
}
478+
}
479+
442480
impl Deref for LexOrdering {
443481
type Target = [PhysicalSortExpr];
444482

datafusion/physical-expr/src/equivalence/ordering.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ impl OrderingEquivalenceClass {
207207
for idx in 0..n_ordering {
208208
// Calculate cross product index
209209
let idx = outer_idx * n_ordering + idx;
210-
self.orderings[idx].inner.extend(ordering.iter().cloned());
210+
self.orderings[idx].extend(ordering.iter().cloned());
211211
}
212212
}
213213
self
@@ -217,9 +217,9 @@ impl OrderingEquivalenceClass {
217217
/// ordering equivalence class.
218218
pub fn add_offset(&mut self, offset: usize) {
219219
for ordering in self.orderings.iter_mut() {
220-
for sort_expr in ordering.inner.iter_mut() {
220+
ordering.transform(|sort_expr| {
221221
sort_expr.expr = add_offset_to_expr(Arc::clone(&sort_expr.expr), offset);
222-
}
222+
})
223223
}
224224
}
225225

datafusion/physical-expr/src/equivalence/properties.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,6 @@ impl EquivalenceProperties {
203203
let mut output_ordering = self.oeq_class().output_ordering().unwrap_or_default();
204204
// Prune out constant expressions
205205
output_ordering
206-
.inner
207206
.retain(|sort_expr| !const_exprs_contains(constants, &sort_expr.expr));
208207
(!output_ordering.is_empty()).then_some(output_ordering)
209208
}
@@ -795,7 +794,6 @@ impl EquivalenceProperties {
795794
// Generate all valid orderings, given substituted expressions.
796795
let res = new_orderings
797796
.into_iter()
798-
.map(|ordering| ordering.inner)
799797
.multi_cartesian_product()
800798
.map(LexOrdering::new)
801799
.collect::<Vec<_>>();
@@ -1344,7 +1342,6 @@ impl EquivalenceProperties {
13441342
let mut new_orderings = vec![];
13451343
for ordering in self.oeq_class {
13461344
let new_ordering = ordering
1347-
.inner
13481345
.into_iter()
13491346
.map(|mut sort_expr| {
13501347
sort_expr.expr = with_new_schema(sort_expr.expr, &schema)?;
@@ -1630,7 +1627,7 @@ fn generate_dependency_orderings(
16301627
.map(|prefixes| {
16311628
prefixes
16321629
.into_iter()
1633-
.flat_map(|ordering| ordering.inner.clone())
1630+
.flat_map(|ordering| ordering.clone())
16341631
.collect()
16351632
})
16361633
.collect::<Vec<_>>()
@@ -2300,8 +2297,8 @@ impl UnionEquivalentOrderingBuilder {
23002297
existing_constants: &[ConstExpr],
23012298
) -> Option<LexOrdering> {
23022299
let mut augmented_ordering = LexOrdering::default();
2303-
let mut sort_expr_iter = ordering.inner.iter().peekable();
2304-
let mut existing_sort_expr_iter = existing_ordering.inner.iter().peekable();
2300+
let mut sort_expr_iter = ordering.iter().peekable();
2301+
let mut existing_sort_expr_iter = existing_ordering.iter().peekable();
23052302

23062303
// walk in parallel down the two orderings, trying to match them up
23072304
while sort_expr_iter.peek().is_some() || existing_sort_expr_iter.peek().is_some()
@@ -2881,7 +2878,7 @@ mod tests {
28812878
let leading_orderings = eq_properties
28822879
.oeq_class()
28832880
.iter()
2884-
.flat_map(|ordering| ordering.inner.first().cloned())
2881+
.flat_map(|ordering| ordering.first().cloned())
28852882
.collect::<Vec<_>>();
28862883
let expr_props = eq_properties.get_expr_properties(Arc::clone(&expr));
28872884
let err_msg = format!(

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,7 @@ fn replace_on_columns_of_right_ordering(
456456
right_ordering: &mut LexOrdering,
457457
) -> Result<()> {
458458
for (left_col, right_col) in on_columns {
459-
for item in right_ordering.inner.iter_mut() {
459+
right_ordering.transform(|item| {
460460
let new_expr = Arc::clone(&item.expr)
461461
.transform(|e| {
462462
if e.eq(right_col) {
@@ -465,9 +465,10 @@ fn replace_on_columns_of_right_ordering(
465465
Ok(Transformed::no(e))
466466
}
467467
})
468-
.data()?;
468+
.data()
469+
.expect("closure is infallible");
469470
item.expr = new_expr;
470-
}
471+
});
471472
}
472473
Ok(())
473474
}

datafusion/physical-plan/src/memory.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ impl MemoryExec {
353353
let fields = self.schema.fields();
354354
let ambiguous_column = sort_information
355355
.iter()
356-
.flat_map(|ordering| ordering.inner.clone())
356+
.flat_map(|ordering| ordering.clone())
357357
.flat_map(|expr| collect_columns(&expr.expr))
358358
.find(|col| {
359359
fields
@@ -695,8 +695,8 @@ mod memory_exec_tests {
695695
.try_with_sort_information(sort_information)?;
696696

697697
assert_eq!(
698-
mem_exec.properties().output_ordering().unwrap().to_vec(),
699-
expected_output_order.inner
698+
mem_exec.properties().output_ordering().unwrap(),
699+
&expected_output_order
700700
);
701701
let eq_properties = mem_exec.properties().equivalence_properties();
702702
assert!(eq_properties.oeq_class().contains(&sort1));

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ impl ExternalSorter {
282282
in_mem_batches: vec![],
283283
in_mem_batches_sorted: true,
284284
spills: vec![],
285-
expr: expr.inner.into(),
285+
expr: expr.into(),
286286
metrics,
287287
fetch,
288288
reservation,

datafusion/physical-plan/src/topk/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ impl TopK {
108108
let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
109109
.register(&runtime.memory_pool);
110110

111-
let expr: Arc<[PhysicalSortExpr]> = expr.inner.into();
111+
let expr: Arc<[PhysicalSortExpr]> = expr.into();
112112

113113
let sort_fields: Vec<_> = expr
114114
.iter()

0 commit comments

Comments
 (0)