Skip to content

Commit a769025

Browse files
alambberkaysynnada
andauthored
Refactor DependencyMap and Dependencies into structs (#12761)
* Improve documentation, make DependencyMap / Dependencies a real struct + fix stack overflow * Update datafusion/physical-expr/src/equivalence/properties.rs Co-authored-by: Berkay Şahin <[email protected]> --------- Co-authored-by: Berkay Şahin <[email protected]>
1 parent 6d61503 commit a769025

File tree

1 file changed

+179
-22
lines changed

1 file changed

+179
-22
lines changed

datafusion/physical-expr/src/equivalence/properties.rs

Lines changed: 179 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::fmt;
1819
use std::fmt::Display;
1920
use std::hash::{Hash, Hasher};
2021
use std::iter::Peekable;
@@ -709,7 +710,7 @@ impl EquivalenceProperties {
709710
/// c ASC: Node {None, HashSet{a ASC}}
710711
/// ```
711712
fn construct_dependency_map(&self, mapping: &ProjectionMapping) -> DependencyMap {
712-
let mut dependency_map = IndexMap::new();
713+
let mut dependency_map = DependencyMap::new();
713714
for ordering in self.normalized_oeq_class().iter() {
714715
for (idx, sort_expr) in ordering.iter().enumerate() {
715716
let target_sort_expr =
@@ -731,13 +732,11 @@ impl EquivalenceProperties {
731732
let dependency = idx.checked_sub(1).map(|a| &ordering[a]);
732733
// Add sort expressions that can be projected or referred to
733734
// by any of the projection expressions to the dependency map:
734-
dependency_map
735-
.entry(sort_expr.clone())
736-
.or_insert_with(|| DependencyNode {
737-
target_sort_expr: target_sort_expr.clone(),
738-
dependencies: IndexSet::new(),
739-
})
740-
.insert_dependency(dependency);
735+
dependency_map.insert(
736+
sort_expr,
737+
target_sort_expr.as_ref(),
738+
dependency,
739+
);
741740
}
742741
if !is_projected {
743742
// If we can not project, stop constructing the dependency
@@ -1257,7 +1256,7 @@ fn referred_dependencies(
12571256
// Associate `PhysicalExpr`s with `PhysicalSortExpr`s that contain them:
12581257
let mut expr_to_sort_exprs = IndexMap::<ExprWrapper, Dependencies>::new();
12591258
for sort_expr in dependency_map
1260-
.keys()
1259+
.sort_exprs()
12611260
.filter(|sort_expr| expr_refers(source, &sort_expr.expr))
12621261
{
12631262
let key = ExprWrapper(Arc::clone(&sort_expr.expr));
@@ -1270,10 +1269,16 @@ fn referred_dependencies(
12701269
// Generate all valid dependencies for the source. For example, if the source
12711270
// is `a + b` and the map is `[a -> (a ASC, a DESC), b -> (b ASC)]`, we get
12721271
// `vec![HashSet(a ASC, b ASC), HashSet(a DESC, b ASC)]`.
1273-
expr_to_sort_exprs
1274-
.values()
1272+
let dependencies = expr_to_sort_exprs
1273+
.into_values()
1274+
.map(Dependencies::into_inner)
1275+
.collect::<Vec<_>>();
1276+
dependencies
1277+
.iter()
12751278
.multi_cartesian_product()
1276-
.map(|referred_deps| referred_deps.into_iter().cloned().collect())
1279+
.map(|referred_deps| {
1280+
Dependencies::new_from_iter(referred_deps.into_iter().cloned())
1281+
})
12771282
.collect()
12781283
}
12791284

@@ -1296,7 +1301,9 @@ fn construct_prefix_orderings(
12961301
dependency_map: &DependencyMap,
12971302
) -> Vec<LexOrdering> {
12981303
let mut dep_enumerator = DependencyEnumerator::new();
1299-
dependency_map[relevant_sort_expr]
1304+
dependency_map
1305+
.get(relevant_sort_expr)
1306+
.expect("no relevant sort expr found")
13001307
.dependencies
13011308
.iter()
13021309
.flat_map(|dep| dep_enumerator.construct_orderings(dep, dependency_map))
@@ -1433,13 +1440,161 @@ impl DependencyNode {
14331440
}
14341441
}
14351442

1436-
// Using `IndexMap` and `IndexSet` makes sure to generate consistent results across different executions for the same query.
1437-
// We could have used `HashSet`, `HashMap` in place of them without any loss of functionality.
1438-
// As an example, if existing orderings are `[a ASC, b ASC]`, `[c ASC]` for output ordering
1439-
// both `[a ASC, b ASC, c ASC]` and `[c ASC, a ASC, b ASC]` are valid (e.g. concatenated version of the alternative orderings).
1440-
// When using `HashSet`, `HashMap` it is not guaranteed to generate consistent result, among the possible 2 results in the example above.
1441-
type DependencyMap = IndexMap<PhysicalSortExpr, DependencyNode>;
1442-
type Dependencies = IndexSet<PhysicalSortExpr>;
1443+
impl Display for DependencyNode {
1444+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1445+
if let Some(target) = &self.target_sort_expr {
1446+
write!(f, "(target: {}, ", target)?;
1447+
} else {
1448+
write!(f, "(")?;
1449+
}
1450+
write!(f, "dependencies: [{}])", self.dependencies)
1451+
}
1452+
}
1453+
1454+
/// Maps an expression --> DependencyNode
1455+
///
1456+
/// # Debugging / deplaying `DependencyMap`
1457+
///
1458+
/// This structure implements `Display` to assist debugging. For example:
1459+
///
1460+
/// ```text
1461+
/// DependencyMap: {
1462+
/// a@0 ASC --> (target: a@0 ASC, dependencies: [[]])
1463+
/// b@1 ASC --> (target: b@1 ASC, dependencies: [[a@0 ASC, c@2 ASC]])
1464+
/// c@2 ASC --> (target: c@2 ASC, dependencies: [[b@1 ASC, a@0 ASC]])
1465+
/// d@3 ASC --> (target: d@3 ASC, dependencies: [[c@2 ASC, b@1 ASC]])
1466+
/// }
1467+
/// ```
1468+
///
1469+
/// # Note on IndexMap Rationale
1470+
///
1471+
/// Using `IndexMap` (which preserves insert order) to ensure consistent results
1472+
/// across different executions for the same query. We could have used
1473+
/// `HashSet`, `HashMap` in place of them without any loss of functionality.
1474+
///
1475+
/// As an example, if existing orderings are
1476+
/// 1. `[a ASC, b ASC]`
1477+
/// 2. `[c ASC]` for
1478+
///
1479+
/// Then both the following output orderings are valid
1480+
/// 1. `[a ASC, b ASC, c ASC]`
1481+
/// 2. `[c ASC, a ASC, b ASC]`
1482+
///
1483+
/// (this are both valid as they are concatenated versions of the alternative
1484+
/// orderings). When using `HashSet`, `HashMap` it is not guaranteed to generate
1485+
/// consistent result, among the possible 2 results in the example above.
1486+
#[derive(Debug)]
1487+
struct DependencyMap {
1488+
inner: IndexMap<PhysicalSortExpr, DependencyNode>,
1489+
}
1490+
1491+
impl DependencyMap {
1492+
fn new() -> Self {
1493+
Self {
1494+
inner: IndexMap::new(),
1495+
}
1496+
}
1497+
1498+
/// Insert a new dependency `sort_expr` --> `dependency` into the map.
1499+
///
1500+
/// If `target_sort_expr` is none, a new entry is created with empty dependencies.
1501+
fn insert(
1502+
&mut self,
1503+
sort_expr: &PhysicalSortExpr,
1504+
target_sort_expr: Option<&PhysicalSortExpr>,
1505+
dependency: Option<&PhysicalSortExpr>,
1506+
) {
1507+
self.inner
1508+
.entry(sort_expr.clone())
1509+
.or_insert_with(|| DependencyNode {
1510+
target_sort_expr: target_sort_expr.cloned(),
1511+
dependencies: Dependencies::new(),
1512+
})
1513+
.insert_dependency(dependency)
1514+
}
1515+
1516+
/// Iterator over (sort_expr, DependencyNode) pairs
1517+
fn iter(&self) -> impl Iterator<Item = (&PhysicalSortExpr, &DependencyNode)> {
1518+
self.inner.iter()
1519+
}
1520+
1521+
/// iterator over all sort exprs
1522+
fn sort_exprs(&self) -> impl Iterator<Item = &PhysicalSortExpr> {
1523+
self.inner.keys()
1524+
}
1525+
1526+
/// Return the dependency node for the given sort expression, if any
1527+
fn get(&self, sort_expr: &PhysicalSortExpr) -> Option<&DependencyNode> {
1528+
self.inner.get(sort_expr)
1529+
}
1530+
}
1531+
1532+
impl Display for DependencyMap {
1533+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1534+
writeln!(f, "DependencyMap: {{")?;
1535+
for (sort_expr, node) in self.inner.iter() {
1536+
writeln!(f, " {sort_expr} --> {node}")?;
1537+
}
1538+
writeln!(f, "}}")
1539+
}
1540+
}
1541+
1542+
/// A list of sort expressions that can be calculated from a known set of
1543+
/// dependencies.
1544+
#[derive(Debug, Default, Clone, PartialEq, Eq)]
1545+
struct Dependencies {
1546+
inner: IndexSet<PhysicalSortExpr>,
1547+
}
1548+
1549+
impl Display for Dependencies {
1550+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1551+
write!(f, "[")?;
1552+
let mut iter = self.inner.iter();
1553+
if let Some(dep) = iter.next() {
1554+
write!(f, "{}", dep)?;
1555+
}
1556+
for dep in iter {
1557+
write!(f, ", {}", dep)?;
1558+
}
1559+
write!(f, "]")
1560+
}
1561+
}
1562+
1563+
impl Dependencies {
1564+
/// Create a new empty `Dependencies` instance.
1565+
fn new() -> Self {
1566+
Self {
1567+
inner: IndexSet::new(),
1568+
}
1569+
}
1570+
1571+
/// Create a new `Dependencies` from an iterator of `PhysicalSortExpr`.
1572+
fn new_from_iter(iter: impl IntoIterator<Item = PhysicalSortExpr>) -> Self {
1573+
Self {
1574+
inner: iter.into_iter().collect(),
1575+
}
1576+
}
1577+
1578+
/// Insert a new dependency into the set.
1579+
fn insert(&mut self, sort_expr: PhysicalSortExpr) {
1580+
self.inner.insert(sort_expr);
1581+
}
1582+
1583+
/// Iterator over dependencies in the set
1584+
fn iter(&self) -> impl Iterator<Item = &PhysicalSortExpr> + Clone {
1585+
self.inner.iter()
1586+
}
1587+
1588+
/// Return the inner set of dependencies
1589+
fn into_inner(self) -> IndexSet<PhysicalSortExpr> {
1590+
self.inner
1591+
}
1592+
1593+
/// Returns true if there are no dependencies
1594+
fn is_empty(&self) -> bool {
1595+
self.inner.is_empty()
1596+
}
1597+
}
14431598

14441599
/// Contains a mapping of all dependencies we have processed for each sort expr
14451600
struct DependencyEnumerator<'a> {
@@ -1487,8 +1642,9 @@ impl<'a> DependencyEnumerator<'a> {
14871642
referred_sort_expr: &'a PhysicalSortExpr,
14881643
dependency_map: &'a DependencyMap,
14891644
) -> Vec<LexOrdering> {
1490-
// We are sure that `referred_sort_expr` is inside `dependency_map`.
1491-
let node = &dependency_map[referred_sort_expr];
1645+
let node = dependency_map
1646+
.get(referred_sort_expr)
1647+
.expect("`referred_sort_expr` should be inside `dependency_map`");
14921648
// Since we work on intermediate nodes, we are sure `val.target_sort_expr`
14931649
// exists.
14941650
let target_sort_expr = node.target_sort_expr.as_ref().unwrap();
@@ -1506,6 +1662,7 @@ impl<'a> DependencyEnumerator<'a> {
15061662
} else {
15071663
vec![]
15081664
};
1665+
15091666
for ordering in orderings.iter_mut() {
15101667
ordering.push(target_sort_expr.clone())
15111668
}

0 commit comments

Comments
 (0)