Skip to content

Commit e8ba45c

Browse files
authored
Basic support for IN and NOT IN Subqueries by rewriting them to SEMI / ANTI (#2421)
* naive in subquery implementation * 16 and 18 tpch queries enabled in benchmark * rollback rewriting instead of fail * try_fold used for input plan rewriting * test readability & negative test cases
1 parent 5569eea commit e8ba45c

File tree

6 files changed

+454
-51
lines changed

6 files changed

+454
-51
lines changed

benchmarks/src/bin/tpch.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1074,6 +1074,16 @@ mod tests {
10741074
run_query(14).await
10751075
}
10761076

1077+
#[tokio::test]
1078+
async fn run_q16() -> Result<()> {
1079+
run_query(16).await
1080+
}
1081+
1082+
#[tokio::test]
1083+
async fn run_q18() -> Result<()> {
1084+
run_query(18).await
1085+
}
1086+
10771087
#[tokio::test]
10781088
async fn run_q19() -> Result<()> {
10791089
run_query(19).await

datafusion/core/src/execution/context.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ use crate::optimizer::optimizer::OptimizerRule;
7272
use crate::optimizer::projection_push_down::ProjectionPushDown;
7373
use crate::optimizer::simplify_expressions::SimplifyExpressions;
7474
use crate::optimizer::single_distinct_to_groupby::SingleDistinctToGroupBy;
75+
use crate::optimizer::subquery_filter_to_join::SubqueryFilterToJoin;
7576

7677
use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
7778
use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
@@ -1199,6 +1200,7 @@ impl SessionState {
11991200
// Simplify expressions first to maximize the chance
12001201
// of applying other optimizations
12011202
Arc::new(SimplifyExpressions::new()),
1203+
Arc::new(SubqueryFilterToJoin::new()),
12021204
Arc::new(EliminateFilter::new()),
12031205
Arc::new(CommonSubexprEliminate::new()),
12041206
Arc::new(EliminateLimit::new()),

datafusion/core/src/optimizer/filter_push_down.rs

Lines changed: 16 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,17 @@
1414

1515
//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
1616
17+
use crate::error::Result;
1718
use crate::execution::context::ExecutionProps;
1819
use crate::logical_expr::TableProviderFilterPushDown;
1920
use crate::logical_plan::plan::{Aggregate, Filter, Join, Projection, Union};
2021
use crate::logical_plan::{
21-
and, col, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan, TableScan,
22+
col, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan, TableScan,
2223
};
2324
use crate::logical_plan::{DFSchema, Expr};
2425
use crate::optimizer::optimizer::OptimizerRule;
2526
use crate::optimizer::utils;
26-
use crate::{error::Result, logical_plan::Operator};
27-
use std::{
28-
collections::{HashMap, HashSet},
29-
sync::Arc,
30-
};
27+
use std::collections::{HashMap, HashSet};
3128

3229
/// Filter Push Down optimizer rule pushes filter clauses down the plan
3330
/// # Introduction
@@ -95,23 +92,6 @@ fn push_down(state: &State, plan: &LogicalPlan) -> Result<LogicalPlan> {
9592
utils::from_plan(plan, &expr, &new_inputs)
9693
}
9794

98-
/// returns a new [LogicalPlan] that wraps `plan` in a [LogicalPlan::Filter] with
99-
/// its predicate be all `predicates` ANDed.
100-
fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan {
101-
// reduce filters to a single filter with an AND
102-
let predicate = predicates
103-
.iter()
104-
.skip(1)
105-
.fold(predicates[0].clone(), |acc, predicate| {
106-
and(acc, (*predicate).to_owned())
107-
});
108-
109-
LogicalPlan::Filter(Filter {
110-
predicate,
111-
input: Arc::new(plan),
112-
})
113-
}
114-
11595
// remove all filters from `filters` that are in `predicate_columns`
11696
fn remove_filters(
11797
filters: &[(Expr, HashSet<Column>)],
@@ -150,32 +130,14 @@ fn issue_filters(
150130
return push_down(&state, plan);
151131
}
152132

153-
let plan = add_filter(plan.clone(), &predicates);
133+
let plan = utils::add_filter(plan.clone(), &predicates);
154134

155135
state.filters = remove_filters(&state.filters, &predicate_columns);
156136

157137
// continue optimization over all input nodes by cloning the current state (i.e. each node is independent)
158138
push_down(&state, &plan)
159139
}
160140

161-
/// converts "A AND B AND C" => [A, B, C]
162-
fn split_members<'a>(predicate: &'a Expr, predicates: &mut Vec<&'a Expr>) {
163-
match predicate {
164-
Expr::BinaryExpr {
165-
right,
166-
op: Operator::And,
167-
left,
168-
} => {
169-
split_members(left, predicates);
170-
split_members(right, predicates);
171-
}
172-
Expr::Alias(expr, _) => {
173-
split_members(expr, predicates);
174-
}
175-
other => predicates.push(other),
176-
}
177-
}
178-
179141
// For a given JOIN logical plan, determine whether each side of the join is preserved.
180142
// We say a join side is preserved if the join returns all or a subset of the rows from
181143
// the relevant side, such that each row of the output table directly maps to a row of
@@ -289,7 +251,7 @@ fn optimize_join(
289251
Ok(plan)
290252
} else {
291253
// wrap the join on the filter whose predicates must be kept
292-
let plan = add_filter(plan, &to_keep.0);
254+
let plan = utils::add_filter(plan, &to_keep.0);
293255
state.filters = remove_filters(&state.filters, &to_keep.1);
294256

295257
Ok(plan)
@@ -305,7 +267,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
305267
LogicalPlan::Analyze { .. } => push_down(&state, plan),
306268
LogicalPlan::Filter(Filter { input, predicate }) => {
307269
let mut predicates = vec![];
308-
split_members(predicate, &mut predicates);
270+
utils::split_conjunction(predicate, &mut predicates);
309271

310272
// Predicates without referencing columns (WHERE FALSE, WHERE 1=1, etc.)
311273
let mut no_col_predicates = vec![];
@@ -328,7 +290,10 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
328290
// As those contain only literals, they could be optimized using constant folding
329291
// and removal of WHERE TRUE / WHERE FALSE
330292
if !no_col_predicates.is_empty() {
331-
Ok(add_filter(optimize(input, state)?, &no_col_predicates))
293+
Ok(utils::add_filter(
294+
optimize(input, state)?,
295+
&no_col_predicates,
296+
))
332297
} else {
333298
optimize(input, state)
334299
}
@@ -592,17 +557,18 @@ fn rewrite(expr: &Expr, projection: &HashMap<String, Expr>) -> Result<Expr> {
592557

593558
#[cfg(test)]
594559
mod tests {
560+
use std::sync::Arc;
561+
595562
use super::*;
596563
use crate::datasource::TableProvider;
564+
use crate::logical_plan::plan::provider_as_source;
597565
use crate::logical_plan::{
598-
lit, sum, union_with_alias, DFSchema, Expr, LogicalPlanBuilder, Operator,
566+
and, col, lit, sum, union_with_alias, DFSchema, Expr, LogicalPlanBuilder,
567+
Operator,
599568
};
600569
use crate::physical_plan::ExecutionPlan;
570+
use crate::prelude::JoinType;
601571
use crate::test::*;
602-
use crate::{
603-
logical_plan::{col, plan::provider_as_source},
604-
prelude::JoinType,
605-
};
606572

607573
use arrow::datatypes::SchemaRef;
608574
use async_trait::async_trait;

datafusion/core/src/optimizer/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,5 @@ pub mod optimizer;
2828
pub mod projection_push_down;
2929
pub mod simplify_expressions;
3030
pub mod single_distinct_to_groupby;
31+
pub mod subquery_filter_to_join;
3132
pub mod utils;

0 commit comments

Comments
 (0)