Skip to content

Encapsulate fields of OrderingEquivalenceClass (make field non pub) #14037

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions datafusion/physical-expr/src/equivalence/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use arrow_schema::SortOptions;
/// ordering. In this case, we say that these orderings are equivalent.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Default)]
pub struct OrderingEquivalenceClass {
pub orderings: Vec<LexOrdering>,
orderings: Vec<LexOrdering>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the key change -- making this field non-pub -- the rest of the PR is derived from that and added comments

}

impl OrderingEquivalenceClass {
Expand All @@ -53,24 +53,33 @@ impl OrderingEquivalenceClass {
self.orderings.clear();
}

/// Creates new ordering equivalence class from the given orderings.
/// Creates new ordering equivalence class from the given orderings
///
/// Any redundant entries are removed
pub fn new(orderings: Vec<LexOrdering>) -> Self {
let mut result = Self { orderings };
result.remove_redundant_entries();
result
}

/// Converts this OrderingEquivalenceClass to a vector of orderings.
pub fn into_inner(self) -> Vec<LexOrdering> {
self.orderings
}

/// Checks whether `ordering` is a member of this equivalence class.
pub fn contains(&self, ordering: &LexOrdering) -> bool {
self.orderings.contains(ordering)
}

/// Adds `ordering` to this equivalence class.
#[allow(dead_code)]
#[deprecated(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note this method is only used in tests (and thus needed "allow(dead_code)"). It is the same as add_new_ordering so I made that explicit and deprecated the method and updated the tests.

since = "45.0.0",
note = "use OrderingEquivalenceClass::add_new_ordering instead"
)]
fn push(&mut self, ordering: LexOrdering) {
self.orderings.push(ordering);
// Make sure that there are no redundant orderings:
self.remove_redundant_entries();
self.add_new_ordering(ordering)
}

/// Checks whether this ordering equivalence class is empty.
Expand All @@ -79,6 +88,9 @@ impl OrderingEquivalenceClass {
}

/// Returns an iterator over the equivalent orderings in this class.
///
/// Note this class also implements [`IntoIterator`] to return an iterator
/// over owned [`LexOrdering`]s.
pub fn iter(&self) -> impl Iterator<Item = &LexOrdering> {
self.orderings.iter()
}
Expand All @@ -95,7 +107,7 @@ impl OrderingEquivalenceClass {
self.remove_redundant_entries();
}

/// Adds new orderings into this ordering equivalence class.
/// Adds new orderings into this ordering equivalence class
pub fn add_new_orderings(
&mut self,
orderings: impl IntoIterator<Item = LexOrdering>,
Expand All @@ -110,9 +122,10 @@ impl OrderingEquivalenceClass {
self.add_new_orderings([ordering]);
}

/// Removes redundant orderings from this equivalence class. For instance,
/// if we already have the ordering `[a ASC, b ASC, c DESC]`, then there is
/// no need to keep ordering `[a ASC, b ASC]` in the state.
/// Removes redundant orderings from this equivalence class.
///
/// For instance, if we already have the ordering `[a ASC, b ASC, c DESC]`,
/// then there is no need to keep ordering `[a ASC, b ASC]` in the state.
fn remove_redundant_entries(&mut self) {
let mut work = true;
while work {
Expand Down Expand Up @@ -198,6 +211,7 @@ impl OrderingEquivalenceClass {
}
}

/// Convert the `OrderingEquivalenceClass` into an iterator of LexOrderings
impl IntoIterator for OrderingEquivalenceClass {
type Item = LexOrdering;
type IntoIter = IntoIter<LexOrdering>;
Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-expr/src/equivalence/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ mod tests {

let err_msg = format!(
"test_idx: {:?}, actual: {:?}, expected: {:?}, projection_mapping: {:?}",
idx, orderings.orderings, expected, projection_mapping
idx, orderings, expected, projection_mapping
);

assert_eq!(orderings.len(), expected.len(), "{}", err_msg);
Expand Down Expand Up @@ -825,7 +825,7 @@ mod tests {

let err_msg = format!(
"test idx: {:?}, actual: {:?}, expected: {:?}, projection_mapping: {:?}",
idx, orderings.orderings, expected, projection_mapping
idx, orderings, expected, projection_mapping
);

assert_eq!(orderings.len(), expected.len(), "{}", err_msg);
Expand Down Expand Up @@ -971,7 +971,7 @@ mod tests {

let err_msg = format!(
"actual: {:?}, expected: {:?}, projection_mapping: {:?}",
orderings.orderings, expected, projection_mapping
orderings, expected, projection_mapping
);

assert_eq!(orderings.len(), expected.len(), "{}", err_msg);
Expand Down
92 changes: 44 additions & 48 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ pub struct EquivalenceProperties {
/// Expressions whose values are constant
///
/// TODO: We do not need to track constants separately, they can be tracked
/// inside `eq_groups` as `Literal` expressions.
/// inside `eq_group` as `Literal` expressions.
constants: Vec<ConstExpr>,
/// Schema associated with this object.
schema: SchemaRef,
Expand Down Expand Up @@ -168,6 +168,11 @@ impl EquivalenceProperties {
&self.oeq_class
}

/// Return the inner OrderingEquivalenceClass, consuming self
pub fn into_oeq_class(self) -> OrderingEquivalenceClass {
self.oeq_class
}

/// Returns a reference to the equivalence group within.
pub fn eq_group(&self) -> &EquivalenceGroup {
&self.eq_group
Expand Down Expand Up @@ -430,8 +435,8 @@ impl EquivalenceProperties {
let mut new_orderings = vec![filtered_exprs.clone()];

// Preserve valid suffixes from existing orderings
let orderings = mem::take(&mut self.oeq_class.orderings);
for existing in orderings {
let oeq_class = mem::take(&mut self.oeq_class);
for existing in oeq_class {
if self.is_prefix_of(&filtered_exprs, &existing) {
let mut extended = filtered_exprs.clone();
extended.extend(existing.into_iter().skip(filtered_exprs.len()));
Expand Down Expand Up @@ -710,8 +715,8 @@ impl EquivalenceProperties {
/// Since it would cause bug in dependency constructions, we should substitute the input order in order to get correct
/// dependency map, happen in issue 8838: <https://github.com/apache/datafusion/issues/8838>
pub fn substitute_oeq_class(&mut self, mapping: &ProjectionMapping) -> Result<()> {
let orderings = &self.oeq_class.orderings;
let new_order = orderings
let new_order = self
.oeq_class
.iter()
.map(|order| self.substitute_ordering_component(mapping, order))
.collect::<Result<Vec<_>>>()?;
Expand Down Expand Up @@ -1219,7 +1224,7 @@ impl EquivalenceProperties {

// Rewrite orderings according to new schema:
let mut new_orderings = vec![];
for ordering in self.oeq_class.orderings {
for ordering in self.oeq_class {
let new_ordering = ordering
.inner
.into_iter()
Expand Down Expand Up @@ -2008,16 +2013,8 @@ fn calculate_union_binary(
// Next, calculate valid orderings for the union by searching for prefixes
// in both sides.
let mut orderings = UnionEquivalentOrderingBuilder::new();
orderings.add_satisfied_orderings(
lhs.normalized_oeq_class().orderings,
lhs.constants(),
&rhs,
);
orderings.add_satisfied_orderings(
rhs.normalized_oeq_class().orderings,
rhs.constants(),
&lhs,
);
orderings.add_satisfied_orderings(lhs.normalized_oeq_class(), lhs.constants(), &rhs);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

most of the test changes I think actually make the code easier to read -- for example OrderingEquivalenceClass already implements IntoIterator so there is no need to explicitly name the inner field

orderings.add_satisfied_orderings(rhs.normalized_oeq_class(), rhs.constants(), &lhs);
let orderings = orderings.build();

let mut eq_properties =
Expand Down Expand Up @@ -2156,7 +2153,7 @@ impl UnionEquivalentOrderingBuilder {

// for each equivalent ordering in properties, try and augment
// `ordering` it with the constants to match
for existing_ordering in &properties.oeq_class.orderings {
for existing_ordering in properties.oeq_class.iter() {
if let Some(augmented_ordering) = self.augment_ordering(
ordering,
constants,
Expand Down Expand Up @@ -2437,17 +2434,12 @@ mod tests {
Some(JoinSide::Left),
&[],
);
let orderings = &join_eq.oeq_class.orderings;
let err_msg = format!("expected: {:?}, actual:{:?}", expected, orderings);
assert_eq!(
join_eq.oeq_class.orderings.len(),
expected.len(),
"{}",
err_msg
);
for ordering in orderings {
let err_msg =
format!("expected: {:?}, actual:{:?}", expected, &join_eq.oeq_class);
assert_eq!(join_eq.oeq_class.len(), expected.len(), "{}", err_msg);
for ordering in join_eq.oeq_class {
assert!(
expected.contains(ordering),
expected.contains(&ordering),
"{}, ordering: {:?}",
err_msg,
ordering
Expand Down Expand Up @@ -3766,8 +3758,8 @@ mod tests {

// Check whether orderings are same.
let lhs_orderings = lhs.oeq_class();
let rhs_orderings = &rhs.oeq_class.orderings;
for rhs_ordering in rhs_orderings {
let rhs_orderings = rhs.oeq_class();
for rhs_ordering in rhs_orderings.iter() {
assert!(
lhs_orderings.contains(rhs_ordering),
"{err_msg}\nlhs: {lhs}\nrhs: {rhs}"
Expand Down Expand Up @@ -3843,7 +3835,7 @@ mod tests {
// Add equality condition c = concat(a, b)
eq_properties.add_equal_conditions(&col_c, &a_concat_b)?;

let orderings = eq_properties.oeq_class().orderings.clone();
let orderings = eq_properties.oeq_class();

let expected_ordering1 =
LexOrdering::from(vec![
Expand Down Expand Up @@ -3894,7 +3886,7 @@ mod tests {
// Add equality condition c = a * b
eq_properties.add_equal_conditions(&col_c, &a_times_b)?;

let orderings = eq_properties.oeq_class().orderings.clone();
let orderings = eq_properties.oeq_class();

// The ordering should remain unchanged since multiplication is not lex-monotonic
assert_eq!(orderings.len(), 1);
Expand Down Expand Up @@ -3934,7 +3926,7 @@ mod tests {
// Add equality condition c = concat(a, b)
eq_properties.add_equal_conditions(&col_c, &a_concat_b)?;

let orderings = eq_properties.oeq_class().orderings.clone();
let orderings = eq_properties.oeq_class();

let expected_ordering1 = LexOrdering::from(vec![PhysicalSortExpr::new_default(
Arc::clone(&a_concat_b),
Expand Down Expand Up @@ -3978,8 +3970,9 @@ mod tests {

// Should only contain b since a is constant
assert_eq!(result.oeq_class().len(), 1);
assert_eq!(result.oeq_class().orderings[0].len(), 1);
assert!(result.oeq_class().orderings[0][0].expr.eq(&col_b));
let ordering = result.oeq_class().iter().next().unwrap();
assert_eq!(ordering.len(), 1);
assert!(ordering[0].expr.eq(&col_b));

Ok(())
}
Expand Down Expand Up @@ -4025,13 +4018,14 @@ mod tests {

// Should only contain [a ASC, b DESC, c ASC]
assert_eq!(result.oeq_class().len(), 1);
assert_eq!(result.oeq_class().orderings[0].len(), 3);
assert!(result.oeq_class().orderings[0][0].expr.eq(&col_a));
assert!(result.oeq_class().orderings[0][0].options.eq(&asc));
assert!(result.oeq_class().orderings[0][1].expr.eq(&col_b));
assert!(result.oeq_class().orderings[0][1].options.eq(&desc));
assert!(result.oeq_class().orderings[0][2].expr.eq(&col_c));
assert!(result.oeq_class().orderings[0][2].options.eq(&asc));
let ordering = result.oeq_class().iter().next().unwrap();
assert_eq!(ordering.len(), 3);
assert!(ordering[0].expr.eq(&col_a));
assert!(ordering[0].options.eq(&asc));
assert!(ordering[1].expr.eq(&col_b));
assert!(ordering[1].options.eq(&desc));
assert!(ordering[2].expr.eq(&col_c));
assert!(ordering[2].options.eq(&asc));

Ok(())
}
Expand Down Expand Up @@ -4074,11 +4068,12 @@ mod tests {
assert_eq!(result.oeq_class().len(), 1);

// Verify orderings
assert_eq!(result.oeq_class().orderings[0].len(), 2);
assert!(result.oeq_class().orderings[0][0].expr.eq(&col_b));
assert!(result.oeq_class().orderings[0][0].options.eq(&asc));
assert!(result.oeq_class().orderings[0][1].expr.eq(&col_c));
assert!(result.oeq_class().orderings[0][1].options.eq(&asc));
let ordering = result.oeq_class().iter().next().unwrap();
assert_eq!(ordering.len(), 2);
assert!(ordering[0].expr.eq(&col_b));
assert!(ordering[0].options.eq(&asc));
assert!(ordering[1].expr.eq(&col_c));
assert!(ordering[1].options.eq(&asc));

Ok(())
}
Expand Down Expand Up @@ -4119,7 +4114,8 @@ mod tests {

// Should only contain the new ordering since options don't match
assert_eq!(result.oeq_class().len(), 1);
assert_eq!(result.oeq_class().orderings[0], new_order);
let ordering = result.oeq_class().iter().next().unwrap();
assert_eq!(ordering, &new_order);

Ok(())
}
Expand Down Expand Up @@ -4177,7 +4173,7 @@ mod tests {

// Should preserve the original [d ASC, a ASC] ordering
assert_eq!(result.oeq_class().len(), 1);
let ordering = &result.oeq_class().orderings[0];
let ordering = result.oeq_class().iter().next().unwrap();
assert_eq!(ordering.len(), 2);

// First expression should be either b or d (they're equivalent)
Expand Down
6 changes: 2 additions & 4 deletions datafusion/physical-plan/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,8 @@ impl MemoryExec {
ProjectionMapping::try_new(&proj_exprs, &self.original_schema())?;
sort_information = base_eqp
.project(&projection_mapping, self.schema())
.oeq_class()
// TODO add a take / into to avoid the clone
.clone()
.orderings;
.into_oeq_class()
.into_inner();
}

self.sort_information = sort_information;
Expand Down
Loading