Push down preferred sorts into TableScan logical plan node #17337
Conversation
/// Optional preferred ordering for the scan
pub preferred_ordering: Option<Vec<SortExpr>>,
@berkaysynnada do you think this is the right information to pass down? Or is there a world where it makes sense to pass down some sort of "equivalence" information?
cc @alamb
I also think @suremarc and @ozankabak may be interested in this
A more future-proof API (one that would let us change the internal representation later) might be something like
/// Preferred ordering
///
/// Preferred orderings can potentially help DataFusion optimize queries, even in cases
/// when the output does not completely follow that order. This is information passed
/// to the scan about what might help.
///
/// For example, a query with `ORDER BY time DESC LIMIT 10`, DataFusion's dynamic
/// predicates and TopK operator will work better if the data is roughly ordered by descending
/// time (more recent data first)
struct PreferredOrdering {
    exprs: Vec<SortExpr>,
}
And then change this API to
// Before:
/// Optional preferred ordering for the scan
pub preferred_ordering: Option<Vec<SortExpr>>,

// After:
/// Optional preferred ordering for the scan
pub preferred_ordering: Option<PreferredOrdering>,
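One advantage of the wrapper struct: new hints can be added later without touching the `TableScan` field or its call sites again. A minimal sketch of that idea (illustrative only; the extra field and the `datafusion_expr::SortExpr` import path are assumptions, not part of this PR):

```rust
use datafusion_expr::SortExpr;

/// Hypothetical future shape of the wrapper: the `TableScan` field stays
/// `Option<PreferredOrdering>` even as more information is added here.
pub struct PreferredOrdering {
    /// The ordering the query would benefit from, most significant expression first.
    pub exprs: Vec<SortExpr>,
    /// Hypothetical extra knob: whether partially sorted output is still useful.
    pub allow_partial: bool,
}
```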
> @berkaysynnada do you think this is the right information to pass down? Or is there a world where it makes sense to pass down some sort of "equivalence" information?
>
> cc @alamb
When we are registering the sources, we can provide multiple orderings if the table supports them. However, the requirements are singular, and I don't think there would be any meaning in ordering the table by both `col_a` and `col_b` simultaneously. So, I've always thought that requirements need only one ordering, but specs should be capable of having multiple orderings. So there isn't any obvious advantage to using equivalences here, IMO.
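To restate that distinction with illustrative types (not from this PR; the `datafusion_expr::SortExpr` import path is an assumption):

```rust
use datafusion_expr::SortExpr;

/// What a source can advertise when it is registered: it may be able to
/// produce data ordered by `col_a`, or alternatively by `col_b`.
type AdvertisedOrderings = Vec<Vec<SortExpr>>;

/// What a single query requires or prefers: one lexicographic ordering.
type RequiredOrdering = Vec<SortExpr>;
```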
I think this looks good but I was confused that the tests don't seem to show the preferred ordering. I think we should fix those tests before merging -- I also expect it to show that some of the pushdown isn't working quite as expected (aka pushing through a projection or filter)
I also recommend putting the preferred sort expressions in their own struct, but that is not required in my mind.

As I understand the plan, in the next few PRs, @adriangb will update the various APIs so that this preferred sort is provided to `TableProvider::scan` (really via `scan_with_args`).
I also wonder if we should wait for the DataFusion 50 release before merging this or if it is ok to merge now.
/// Currently, we only support pushing down simple column references
/// because table providers typically can't optimize complex expressions
/// in sort pushdown.
is this a fundamental limitation? I ask because @pepijnve was asking about "column only" support the other day at
Just for context, we have externally prepared data files that contain filesystem paths. One column is the full parent path, another is the file name. The order of the rows in the file is `replace(concat(parent, name), '/', chr(0))` and we make extensive use of this aspect of the data.
The new implementation pushes down arbitrary sort expressions. Not only that: it's able to reverse projections out. E.g. given the projection `a, b, c+d+1 AS cd1` and the sort expression `a, cd1`, it will push down `a, c+d+1` into the scan.
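As a hand-written illustration of what that looks like in plan form (the `preferred_ordering` display and exact expressions here are my own sketch, not output from this PR):

```text
-- Before the rule runs (conceptually):
Sort: t.a ASC NULLS LAST, cd1 ASC NULLS LAST
  Projection: t.a, t.b, t.c + t.d + Int64(1) AS cd1
    TableScan: t

-- After: the Sort stays in place for correctness; the scan just receives a hint,
-- with the projection alias cd1 rewritten back to the underlying expression:
Sort: t.a ASC NULLS LAST, cd1 ASC NULLS LAST
  Projection: t.a, t.b, t.c + t.d + Int64(1) AS cd1
    TableScan: t, preferred_ordering=[t.a ASC NULLS LAST, t.c + t.d + Int64(1) ASC NULLS LAST]
```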
Sort: t1.a ASC NULLS LAST
  Inner Join: t1.a = t2.a
    TableScan: t1
    TableScan: t2
these tests don't really show that the preferred ordering is pushed through. Perhaps we can update the plan to show any preferred ordering
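For example (the display format is hypothetical, and whether the rule pushes through the join at all is exactly what the test output should reveal), the plan above might render as:

```text
Sort: t1.a ASC NULLS LAST
  Inner Join: t1.a = t2.a
    TableScan: t1, preferred_ordering=[t1.a ASC NULLS LAST]
    TableScan: t2
```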
#[derive(Default, Debug)]
pub struct PushDownSort {}

impl PushDownSort {
I think the `EnforceSorting` rule already pushes sorts down in the plan -- https://docs.rs/datafusion/latest/datafusion/physical_optimizer/enforce_sorting/struct.EnforceSorting.html

Do you think we will need more sort pushdown? Or will this always just be "pass down preferred sorts" to LogicalPlans?
Nice. I was hoping another optimizer rule does the "hard work" so we can do just the simple thing here (we only need to support a subset of node types).
Hmm, that's at the physical optimizer layer though. We need to do this optimization at the logical layer.
@alamb do you have any guidance on pushing down sorts in logical plans? I don't see anything, which is a bit surprising; I thought it would basically already be implemented.
I think it is all done in the physical plans
Okay bummer. It seems to rely on methods on the traits, etc. I guess we need to figure this all out from scratch here...
I did implement it from scratch, but luckily, since we are just pushing down a preferred ordering and not eliminating sorts altogether, and because this operates on LogicalPlan, it is relatively simple and not a big deal to get wrong (the failure mode is slower queries, not incorrect results).
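A minimal self-contained sketch of that shape, using a toy plan enum as a stand-in for DataFusion's real `LogicalPlan` (the variant names and the transparent/barrier split are illustrative, not the PR's actual code):

```rust
/// Toy stand-in for DataFusion's `LogicalPlan`; real node types carry much more.
#[derive(Debug)]
enum Plan {
    Sort { exprs: Vec<String>, input: Box<Plan> },
    Projection { input: Box<Plan> },
    Filter { input: Box<Plan> },
    Scan { table: String, preferred_ordering: Option<Vec<String>> },
    Other { inputs: Vec<Plan> },
}

/// Push a preferred ordering down to the scan. The Sort node itself is kept,
/// so a scan that ignores the hint still produces correct results.
fn push_down(plan: Plan, ordering: Option<Vec<String>>) -> Plan {
    match plan {
        Plan::Sort { exprs, input } => {
            // A Sort establishes the ordering to push into its subtree.
            let pushed = push_down(*input, Some(exprs.clone()));
            Plan::Sort { exprs, input: Box::new(pushed) }
        }
        // "Transparent" nodes: the hint survives (the real rule also rewrites
        // the expressions through projections/aliases at this point).
        Plan::Projection { input } => Plan::Projection {
            input: Box::new(push_down(*input, ordering)),
        },
        Plan::Filter { input } => Plan::Filter {
            input: Box::new(push_down(*input, ordering)),
        },
        // The scan records the hint; honoring it is optional.
        Plan::Scan { table, .. } => Plan::Scan { table, preferred_ordering: ordering },
        // Anything we don't understand acts as a barrier: recurse without a hint.
        Plan::Other { inputs } => Plan::Other {
            inputs: inputs.into_iter().map(|p| push_down(p, None)).collect(),
        },
    }
}

fn main() {
    let plan = Plan::Sort {
        exprs: vec!["a ASC".to_string()],
        input: Box::new(Plan::Projection {
            input: Box::new(Plan::Scan { table: "t".to_string(), preferred_ordering: None }),
        }),
    };
    println!("{:#?}", push_down(plan, None));
}
```

In the real rule the expressions also have to be rewritten as they cross projections and subquery aliases (hence the `replace_cols_by_name` / schema-remapping helpers discussed below), but the overall recursion has this form.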
Here is a PR that avoids some clones, which might improve performance
🤔 this seems to have caused a massive slowdown in the sql planner benchmark somehow:

Benchmarking physical_sorted_union_order_by_300: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 3942.3s, or reduce sample count to 10.
physical_sorted_union_order_by_300
time: [38.914 s 38.997 s 39.079 s]
Benchmarking logical_plan_optimize: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 17189.6s, or reduce sample count to 10.

It is still running...
I'm sure it's just a dumb mistake on my end. Let me do a round of looking at your comments and investigating, thank you for your patience 🙏🏻
I think we should wait until after v50
@alamb sorry for never responding to your comments and the delay here. I realized that the approach was fundamentally flawed and needed some time to think and loop back. I've now rewritten it from scratch using the TreeNode API, added better tests, etc. I still need to do some cleanup (I see MSRV is failing, I want to update the PR description, go through your feedback and pull out the still relevant parts, etc.) but this is looking much better now.
Okay I've updated doc strings, the PR description, combed through the feedback... I think this is ready for review!
/// A node context object beneficial for writing optimizer rules.
/// This context encapsulates a [`LogicalPlan`] node with a payload.
///
/// Each wrapped node has its children available both via [`LogicalPlanContext.plan.inputs()`]
/// and separately via [`LogicalPlanContext.children`] (the child nodes wrapped in the context),
/// so it's important to keep these child plans in sync when performing mutations.
///
/// Because there are two ways to access child plans directly, it's recommended
/// to perform mutable operations via [`Self::update_plan_from_children`].
/// After mutating `LogicalPlanContext.children`, or after creating the `LogicalPlanContext`,
/// call `update_plan_from_children` to sync.
#[derive(Debug, Clone)]
Happy to break this out into its own PR
let mut replace_map = HashMap::new();
for (i, (qualifier, field)) in
    subquery_alias.input.schema().iter().enumerate()
{
    let (sub_qualifier, sub_field) =
        subquery_alias.schema.qualified_field(i);
    replace_map.insert(
        qualified_name(sub_qualifier, sub_field.name()),
        Expr::Column(Column::new(qualifier.cloned(), field.name())),
    );
}
Unified into helper function in utils.rs
}

/// Replaces columns by their names in the projection.
pub fn replace_cols_by_name(
Moved to utils.rs so we can re-use it in sort pushdown
/// - Input: "test.a" (from input_schema) | ||
/// - Output: "subquery.a" (from output_schema) | ||
/// - Map: {"subquery.a" -> Column("test", "a")} | ||
pub(crate) fn build_schema_remapping( |
Avoiding a public symbol until someone asks for this
Pull Request Overview
This PR implements a new logical optimization rule called "PushDownSort" that propagates sort expressions down to TableScan nodes to enable sort-aware optimizations by table providers. The optimization pushes preferred ordering information through transparent nodes (like Projection, Filter, Limit) while preserving the original Sort nodes to ensure correctness.
Key changes include:

- Addition of a new `PushDownSort` optimizer rule that pushes sort expressions to table scans
- Extension of `TableScan` nodes with optional preferred ordering information via `ScanOrdering`
- Updates to test expectations to reflect the new preferred_ordering display in table scans
Reviewed Changes
Copilot reviewed 34 out of 34 changed files in this pull request and generated 1 comment.
File | Description |
---|---|
datafusion/optimizer/src/push_down_sort.rs | New optimizer rule implementing sort pushdown logic with expression rewriting |
datafusion/expr/src/logical_plan/plan.rs | Added ScanOrdering struct and integrated it into TableScan nodes |
datafusion/optimizer/src/optimizer.rs | Integrated PushDownSort rule into the default optimization pipeline |
datafusion/optimizer/src/utils.rs | Added utility functions for column name replacement and schema remapping |
datafusion/sqllogictest/test_files/*.slt | Updated test expectations to show preferred_ordering in TableScan display |
Comments suppressed due to low confidence (1)
datafusion/optimizer/src/push_down_sort.rs:1
- Corrected spelling of 'eliminiate' to 'eliminate'.
// Licensed to the Apache Software Foundation (ASF) under one
/// Ordering for the scan
pub ordering: Option<ScanOrdering>,
Should this just be `ScanOrdering` instead of `Option<ScanOrdering>`?
/// If the scan produces this exact ordering and sets its properties to reflect this, upstream sorts may be optimized away.
/// Otherwise the sorts may remain in place, but partial ordering may still be exploited, e.g. to stop early or reduce the complexity of the sort.
/// Thus it is recommended that the scan also make a best effort to produce partially sorted data if possible.
pub preferred_ordering: Option<Vec<SortExpr>>,
Should this be `preferred_ordering: Vec<SortExpr>`?
Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType,
Projection, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery,
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
Projection, RecursiveQuery, Repartition, ScanOrdering, SkipType, Sort,
Addition of the `ScanOrdering` import results in reformatting of the rest.
This commit adds a new optional field `preferred_ordering` to the `TableScan` logical plan node to support sort pushdown optimizations. Changes include:

- Add `preferred_ordering: Option<Vec<SortExpr>>` field to `TableScan` struct
- Add `try_new_with_preferred_ordering` constructor method
- Update all `TableScan` constructors throughout the codebase to include the new field
- Update `Debug`, `PartialEq`, `Hash`, and `PartialOrd` implementations
- Update pattern matching in optimizer and other modules

The preferred_ordering field is currently not used by any optimization rules but provides the foundation for future sort pushdown implementations.

This is part 2 of 2 PRs split from apache#17273 as requested in apache#17273 (comment)

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
I hope to review this more carefully later today
This is a step toward updating `TableProvider::scan_with_args` to support pushdown sorting; see #17273 (comment).

This will enable TableProviders to produce files in an order and partitioning that optimizes query execution, e.g. to make a TopK operator stop earlier via dynamic filters or to completely optimize away a sort if the files can be ordered to do so. I think this will also unlock a lot of cool physical optimizer tricks (e.g. pick `SortMergeJoin(Scan[order_by=a], Scan[order_by=a])` instead of `SortMergeJoin(SortExec(HashJoinExec(Scan, Scan)))` for a query like `select * from t1 join t2 using (a) order by a`).

This does not actually remove sort nodes. That is still done only at the physical optimizer level, as some logical operators interact differently with sorts depending on which physical implementation is chosen (e.g. HashJoin vs. SortMergeJoin). It is up to the TableProvider to produce an ExecutionPlan that has ordering properties.
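To spell out the two shapes mentioned above for `select * from t1 join t2 using (a) order by a` (hand-written sketch, not real EXPLAIN output):

```text
-- If both scans can deliver data already ordered by a:
SortMergeJoin: t1.a = t2.a
  Scan: t1 [order_by=a]
  Scan: t2 [order_by=a]

-- Otherwise a sort is needed on top of the join (here a hash join):
Sort: a ASC
  HashJoin: t1.a = t2.a
    Scan: t1
    Scan: t2
```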