Description
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
This ticket is to coordinate discussion and ideas for improving DataFusion join access path (aka algorithm) and join order selection.
DataFusion does not currently have a sophisticated cost based optimizer, but it does support the mechanics of join reordering and statistics. There seems to be a broad desire to improve DataFusion for more advanced use cases (e.g. TPC-DS / distributed joins / ASOF joins). However, as described below, there are many tradeoffs involved in join access path and join order selection (e.g. #17432 recently), so it is unlikely that any one implementation will be good for all users.
Instead, I would like to discuss adding an API that users can use to customize the behavior to their use case.
Current State of DataFusion Optimizer:
Quoting from "Optimizing SQL (and DataFrames) in DataFusion, Part 2: Optimizers in Apache DataFusion"
DataFusion purposely does not include a sophisticated cost based optimizer. Instead, keeping with its design goals it provides a reasonable default implementation along with extension points to customize behavior.
Specifically, DataFusion includes
- “Syntactic Optimizer” (joins in the order they are listed in the query) with basic join re-ordering (source) to prevent join disasters.
- Support for ColumnStatistics and Table Statistics
- The framework for filter selectivity + join cardinality estimation.
- APIs for easily rewriting plans, such as the TreeNode API and reordering joins
This combination of features along with custom optimizer passes lets users customize the behavior to their use case, such as custom indexes like uWheel and materialized views.
The rationale for including only a basic optimizer is that any one particular set of heuristics and cost model is unlikely to work well for the wide variety of DataFusion users because of the tradeoffs involved.
For example, some users may always have access to adequate resources, and want the fastest query execution, and are willing to tolerate runtime errors or a performance cliff when there is insufficient memory. Other users, however, may be willing to accept a slower maximum performance in return for more predictable performance when running in a resource constrained environment. This approach is not universally agreed.
Usecases
Here are some use cases that have been discussed in various issues and PRs:
- Choose different join algorithms based on tradeoffs, such as memory usage vs CPU usage (see "Add configuration to choose specific join implementation", #17432)
- Distributed join planning (e.g. choosing between resegment or broadcast joins) where the join order might be different based on the distribution of the source data
- Join optimization research, such as join reordering based on different statistics / cost models (e.g. optd from CMUDB) or different join algorithms
- Add new join implementations such as ASOF / specialized temporal joins: ASOF join support / Specialize Range Joins #318
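To make the first use case concrete, here is a minimal sketch of a user-supplied policy that trades speed for predictability. Everything in it (`JoinAlgorithm`, `Policy`, `choose_algorithm`) is hypothetical and uses only the standard library; it is not an existing DataFusion API. It also assumes that a sort-merge join degrades more gracefully than a hash join when the build side does not fit in memory.

```rust
/// Hypothetical join algorithms a planner could choose between.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinAlgorithm {
    Hash,
    SortMerge,
}

/// Hypothetical user policy describing which tradeoff to make.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Policy {
    /// Always pick the fastest in-memory option and accept the risk of
    /// runtime errors or a performance cliff under memory pressure.
    FastestPossible,
    /// Prefer predictable behavior within a fixed memory budget.
    Predictable { memory_budget_bytes: u64 },
}

/// Choose an algorithm from the estimated build-side size.
/// `None` means no statistics are available for the build side.
pub fn choose_algorithm(policy: Policy, build_side_bytes: Option<u64>) -> JoinAlgorithm {
    match policy {
        Policy::FastestPossible => JoinAlgorithm::Hash,
        Policy::Predictable { memory_budget_bytes } => match build_side_bytes {
            // Only use a hash join when the build side is known to fit.
            Some(bytes) if bytes <= memory_budget_bytes => JoinAlgorithm::Hash,
            // Unknown or too large: fall back to the assumed safer option.
            _ => JoinAlgorithm::SortMerge,
        },
    }
}
```

The point of the sketch is that both policies are reasonable, which is why a pluggable API may serve users better than a single built-in heuristic.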
Describe the solution you'd like
I would like
- An API for join access path, algorithm, and join order selection that is flexible enough to support the use cases above, and that can be extended / customized by users
- Documentation and examples of how to use the API
- A default implementation that works reasonably well (e.g. prevents join disasters in TPC-H with basic statistics), and respects the same basic knobs we have today
Examples of existing configuration settings:
- repartition_joins
- allow_symmetric_joins_without_pruning
- prefer_hash_join
- hash_join_single_partition_threshold
- hash_join_single_partition_threshold_rows
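As an illustration of what "respecting the same basic knobs" could mean for a default planner, here is a standard-library-only sketch. `JoinKnobs`, `PlannedJoin` and `plan_with_knobs` are hypothetical names, not DataFusion types, and the way the two thresholds are combined (bytes first, rows only when the byte size is unknown) is an assumption made for this example.

```rust
/// Toy mirror of a few of the knobs listed above.
/// This is NOT DataFusion's real configuration struct.
pub struct JoinKnobs {
    pub prefer_hash_join: bool,
    /// Size limit in bytes for collecting one side into a single partition.
    pub hash_join_single_partition_threshold: u64,
    /// Row-count limit used when the byte size is unknown.
    pub hash_join_single_partition_threshold_rows: u64,
}

/// Hypothetical outcomes of join planning.
#[derive(Debug, PartialEq, Eq)]
pub enum PlannedJoin {
    /// Hash join with the build side collected into one partition.
    HashCollectLeft,
    /// Hash join with both sides repartitioned on the join keys.
    HashPartitioned,
    SortMerge,
}

/// Pick a join implementation from the knobs and optional build-side statistics.
pub fn plan_with_knobs(
    knobs: &JoinKnobs,
    build_bytes: Option<u64>,
    build_rows: Option<u64>,
) -> PlannedJoin {
    if !knobs.prefer_hash_join {
        return PlannedJoin::SortMerge;
    }
    // Assumption: prefer the byte estimate, fall back to the row estimate,
    // and never collect when there are no statistics at all.
    let small_enough = match (build_bytes, build_rows) {
        (Some(bytes), _) => bytes != 0 && bytes < knobs.hash_join_single_partition_threshold,
        (None, Some(rows)) => rows != 0 && rows < knobs.hash_join_single_partition_threshold_rows,
        (None, None) => false,
    };
    if small_enough {
        PlannedJoin::HashCollectLeft
    } else {
        PlannedJoin::HashPartitioned
    }
}
```

A default JoinPlanner with its own config namespace could own logic of this shape, while custom planners ignore the knobs entirely.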
Describe alternatives you've considered
One idea (from a discussion with @2010YOUY01 in the #17467 review):
Maybe we could make a JoinPlanner trait that can be registered with the SessionContext or the Optimizer the same way as ExtensionPlanners? Then we can provide a default JoinPlanner (what currently exists) that has its own config namespace, etc.
Something like this:
trait JoinPlanner {
    // Plan the initial join when converting from Logical --> Physical join
    fn plan_initial_join(
        &self,
        session_state: &SessionState,
        physical_left: Arc<dyn ExecutionPlan>,
        physical_right: Arc<dyn ExecutionPlan>,
        join_on: join_utils::JoinOn,
        join_filter: Option<join_utils::JoinFilter>,
        join_type: &JoinType,
        null_equality: &datafusion_common::NullEquality,
    ) -> Arc<dyn ExecutionPlan>;

    // TODO other APIs here
}
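To show how registration and fallback to a default planner might work, here is a toy model that uses only the standard library. `ToyPlan`, `ToyContext`, `DefaultJoinPlanner`, `AsofJoinPlanner` and `register_join_planner` are all hypothetical stand-ins, and the trait is deliberately simplified: it returns `Option` so a planner can decline a join, which is an assumption borrowed from how chained planners are commonly structured rather than part of the proposal above.

```rust
use std::sync::Arc;

/// Stand-in for a physical plan; the real type would be `Arc<dyn ExecutionPlan>`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ToyPlan(pub String);

/// Simplified, hypothetical version of the proposed trait.
/// Returning `None` means "not handled, ask the next planner".
pub trait JoinPlanner {
    fn plan_initial_join(&self, left: &ToyPlan, right: &ToyPlan, join_type: &str) -> Option<ToyPlan>;
}

/// Default planner: handles every join by producing a hash join.
pub struct DefaultJoinPlanner;

impl JoinPlanner for DefaultJoinPlanner {
    fn plan_initial_join(&self, left: &ToyPlan, right: &ToyPlan, join_type: &str) -> Option<ToyPlan> {
        Some(ToyPlan(format!("HashJoin[{}]({}, {})", join_type, left.0, right.0)))
    }
}

/// User-provided planner that only knows how to plan ASOF joins.
pub struct AsofJoinPlanner;

impl JoinPlanner for AsofJoinPlanner {
    fn plan_initial_join(&self, left: &ToyPlan, right: &ToyPlan, join_type: &str) -> Option<ToyPlan> {
        if join_type == "asof" {
            Some(ToyPlan(format!("AsofJoin({}, {})", left.0, right.0)))
        } else {
            None
        }
    }
}

/// Toy session context holding the registered planners.
pub struct ToyContext {
    planners: Vec<Arc<dyn JoinPlanner>>,
}

impl ToyContext {
    /// Start with only the default planner registered.
    pub fn new() -> Self {
        let default: Arc<dyn JoinPlanner> = Arc::new(DefaultJoinPlanner);
        Self { planners: vec![default] }
    }

    /// Register a custom planner; later registrations are consulted first.
    pub fn register_join_planner(&mut self, planner: Arc<dyn JoinPlanner>) {
        self.planners.push(planner);
    }

    /// Ask planners from newest to oldest and take the first plan produced.
    pub fn plan_join(&self, left: &ToyPlan, right: &ToyPlan, join_type: &str) -> Option<ToyPlan> {
        self.planners
            .iter()
            .rev()
            .find_map(|p| p.plan_initial_join(left, right, join_type))
    }
}
```

With this shape, adding a new join implementation such as ASOF does not require touching the default planner, and joins the custom planner declines still get the existing behavior.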
Another possibility is to introduce a JoinGraph structure with the relevant APIs for walking and building plans:
trait JoinPlanner {
    // Plan a specific join graph when converting from Logical --> Physical join
    fn plan_join_graph(
        &self,
        session_state: &SessionState,
        join_graph: JoinGraph,
    ) -> Result<Arc<dyn ExecutionPlan>>;
}
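For discussion purposes, here is one possible shape for a JoinGraph together with a simple greedy ordering pass. The struct layout, the `greedy_order` method and the heuristic itself are assumptions made for this sketch (standard library only); the proposal does not specify any of them, and a real implementation would carry logical plans and statistics rather than names and row counts.

```rust
/// Toy join graph: relations are nodes, join predicates are edges.
pub struct JoinGraph {
    /// (relation name, estimated row count)
    pub relations: Vec<(String, u64)>,
    /// Pairs of indexes into `relations` that share a join predicate.
    pub edges: Vec<(usize, usize)>,
}

impl JoinGraph {
    /// True if relation `i` has a join predicate with any already-joined relation.
    fn is_connected(&self, i: usize, joined: &[bool]) -> bool {
        self.edges
            .iter()
            .any(|&(a, b)| (a == i && joined[b]) || (b == i && joined[a]))
    }

    /// Greedy left-deep order: start from the smallest relation, then keep
    /// adding the smallest relation connected to what is already joined.
    /// Falls back to the smallest remaining relation (a cross join) only
    /// when nothing is connected.
    pub fn greedy_order(&self) -> Vec<String> {
        let n = self.relations.len();
        let mut joined = vec![false; n];
        let mut order = Vec::with_capacity(n);
        for _ in 0..n {
            let next = (0..n)
                .filter(|&i| !joined[i] && self.is_connected(i, &joined))
                .min_by_key(|&i| self.relations[i].1)
                .or_else(|| {
                    (0..n)
                        .filter(|&i| !joined[i])
                        .min_by_key(|&i| self.relations[i].1)
                });
            let i = next.expect("at least one relation remains");
            joined[i] = true;
            order.push(self.relations[i].0.clone());
        }
        order
    }
}
```

For a star schema with a large fact table and two small dimensions that join only to the fact table, this order starts at the smallest dimension, joins the fact table next, and only then adds the other dimension, which avoids a cross join between the two dimensions. A custom JoinPlanner could swap in a cost-based search over the same graph.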
Additional context
Related discussions