Skip to content

Commit 33cc03c

Browse files
committed
chore: The HepOptimizerPipeline reduces the cost of building HepOptimizer.
1 parent f9ead12 commit 33cc03c

File tree

12 files changed

+428
-400
lines changed

12 files changed

+428
-400
lines changed

src/db.rs

Lines changed: 100 additions & 95 deletions
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,7 @@ use crate::function::numbers::Numbers;
2626
use crate::function::octet_length::OctetLength;
2727
use crate::function::upper::Upper;
2828
use crate::optimizer::heuristic::batch::HepBatchStrategy;
29-
use crate::optimizer::heuristic::optimizer::HepOptimizer;
29+
use crate::optimizer::heuristic::optimizer::HepOptimizerPipeline;
3030
use crate::optimizer::rule::implementation::ImplementationRuleImpl;
3131
use crate::optimizer::rule::normalization::NormalizationRuleImpl;
3232
use crate::parser::parse_sql;
@@ -148,18 +148,112 @@ impl DataBaseBuilder {
148148
meta_cache,
149149
table_cache,
150150
view_cache,
151+
optimizer_pipeline: default_optimizer_pipeline(),
151152
_p: Default::default(),
152153
}),
153154
})
154155
}
155156
}
156157

158+
fn default_optimizer_pipeline() -> HepOptimizerPipeline {
159+
HepOptimizerPipeline::builder()
160+
.before_batch(
161+
"Column Pruning".to_string(),
162+
HepBatchStrategy::once_topdown(),
163+
vec![NormalizationRuleImpl::ColumnPruning],
164+
)
165+
.before_batch(
166+
"Simplify Filter".to_string(),
167+
HepBatchStrategy::fix_point_topdown(10),
168+
vec![
169+
NormalizationRuleImpl::SimplifyFilter,
170+
NormalizationRuleImpl::ConstantCalculation,
171+
],
172+
)
173+
.before_batch(
174+
"Predicate Pushdown".to_string(),
175+
HepBatchStrategy::fix_point_topdown(10),
176+
vec![
177+
NormalizationRuleImpl::PushPredicateThroughJoin,
178+
NormalizationRuleImpl::PushJoinPredicateIntoScan,
179+
NormalizationRuleImpl::PushPredicateIntoScan,
180+
],
181+
)
182+
.before_batch(
183+
"Limit Pushdown".to_string(),
184+
HepBatchStrategy::fix_point_topdown(10),
185+
vec![
186+
NormalizationRuleImpl::LimitProjectTranspose,
187+
NormalizationRuleImpl::PushLimitThroughJoin,
188+
NormalizationRuleImpl::PushLimitIntoTableScan,
189+
],
190+
)
191+
.before_batch(
192+
"Combine Operators".to_string(),
193+
HepBatchStrategy::fix_point_topdown(10),
194+
vec![
195+
NormalizationRuleImpl::CollapseProject,
196+
NormalizationRuleImpl::CollapseGroupByAgg,
197+
NormalizationRuleImpl::CombineFilter,
198+
],
199+
)
200+
.before_batch(
201+
"TopK".to_string(),
202+
HepBatchStrategy::once_topdown(),
203+
vec![NormalizationRuleImpl::TopK],
204+
)
205+
.after_batch(
206+
"Eliminate Redundant Sort".to_string(),
207+
HepBatchStrategy::once_topdown(),
208+
vec![NormalizationRuleImpl::EliminateRedundantSort],
209+
)
210+
.after_batch(
211+
"Expression Remapper".to_string(),
212+
HepBatchStrategy::once_topdown(),
213+
vec![
214+
NormalizationRuleImpl::BindExpressionPosition,
215+
NormalizationRuleImpl::EvaluatorBind,
216+
],
217+
)
218+
.implementations(vec![
219+
// DQL
220+
ImplementationRuleImpl::SimpleAggregate,
221+
ImplementationRuleImpl::GroupByAggregate,
222+
ImplementationRuleImpl::Dummy,
223+
ImplementationRuleImpl::Filter,
224+
ImplementationRuleImpl::HashJoin,
225+
ImplementationRuleImpl::Limit,
226+
ImplementationRuleImpl::Projection,
227+
ImplementationRuleImpl::SeqScan,
228+
ImplementationRuleImpl::IndexScan,
229+
ImplementationRuleImpl::FunctionScan,
230+
ImplementationRuleImpl::Sort,
231+
ImplementationRuleImpl::TopK,
232+
ImplementationRuleImpl::Values,
233+
// DML
234+
ImplementationRuleImpl::Analyze,
235+
ImplementationRuleImpl::CopyFromFile,
236+
ImplementationRuleImpl::CopyToFile,
237+
ImplementationRuleImpl::Delete,
238+
ImplementationRuleImpl::Insert,
239+
ImplementationRuleImpl::Update,
240+
// DDL
241+
ImplementationRuleImpl::AddColumn,
242+
ImplementationRuleImpl::CreateTable,
243+
ImplementationRuleImpl::DropColumn,
244+
ImplementationRuleImpl::DropTable,
245+
ImplementationRuleImpl::Truncate,
246+
])
247+
.build()
248+
}
249+
157250
pub(crate) struct State<S> {
158251
scala_functions: ScalaFunctions,
159252
table_functions: TableFunctions,
160253
meta_cache: StatisticsMetaCache,
161254
table_cache: TableCache,
162255
view_cache: ViewCache,
256+
optimizer_pipeline: HepOptimizerPipeline,
163257
_p: PhantomData<S>,
164258
}
165259

@@ -182,6 +276,7 @@ impl<S: Storage> State<S> {
182276

183277
#[allow(clippy::too_many_arguments)]
184278
pub(crate) fn build_plan<A: AsRef<[(&'static str, DataValue)]>>(
279+
&self,
185280
stmt: &Statement,
186281
params: A,
187282
table_cache: &TableCache,
@@ -211,104 +306,14 @@ impl<S: Storage> State<S> {
211306
/// Limit(1)
212307
/// Project(a,b)
213308
let source_plan = binder.bind(stmt)?;
214-
let best_plan = Self::default_optimizer(source_plan)
309+
let best_plan = self
310+
.optimizer_pipeline
311+
.instantiate(source_plan)
215312
.find_best(Some(&transaction.meta_loader(meta_cache)))?;
216313

217314
Ok(best_plan)
218315
}
219316

220-
pub(crate) fn default_optimizer(source_plan: LogicalPlan) -> HepOptimizer {
221-
HepOptimizer::new(source_plan)
222-
.before_batch(
223-
"Column Pruning".to_string(),
224-
HepBatchStrategy::once_topdown(),
225-
vec![NormalizationRuleImpl::ColumnPruning],
226-
)
227-
.before_batch(
228-
"Simplify Filter".to_string(),
229-
HepBatchStrategy::fix_point_topdown(10),
230-
vec![
231-
NormalizationRuleImpl::SimplifyFilter,
232-
NormalizationRuleImpl::ConstantCalculation,
233-
],
234-
)
235-
.before_batch(
236-
"Predicate Pushdown".to_string(),
237-
HepBatchStrategy::fix_point_topdown(10),
238-
vec![
239-
NormalizationRuleImpl::PushPredicateThroughJoin,
240-
NormalizationRuleImpl::PushJoinPredicateIntoScan,
241-
NormalizationRuleImpl::PushPredicateIntoScan,
242-
],
243-
)
244-
.before_batch(
245-
"Limit Pushdown".to_string(),
246-
HepBatchStrategy::fix_point_topdown(10),
247-
vec![
248-
NormalizationRuleImpl::LimitProjectTranspose,
249-
NormalizationRuleImpl::PushLimitThroughJoin,
250-
NormalizationRuleImpl::PushLimitIntoTableScan,
251-
],
252-
)
253-
.before_batch(
254-
"Combine Operators".to_string(),
255-
HepBatchStrategy::fix_point_topdown(10),
256-
vec![
257-
NormalizationRuleImpl::CollapseProject,
258-
NormalizationRuleImpl::CollapseGroupByAgg,
259-
NormalizationRuleImpl::CombineFilter,
260-
],
261-
)
262-
.before_batch(
263-
"TopK".to_string(),
264-
HepBatchStrategy::once_topdown(),
265-
vec![NormalizationRuleImpl::TopK],
266-
)
267-
.after_batch(
268-
"Eliminate Redundant Sort".to_string(),
269-
HepBatchStrategy::once_topdown(),
270-
vec![NormalizationRuleImpl::EliminateRedundantSort],
271-
)
272-
.after_batch(
273-
"Expression Remapper".to_string(),
274-
HepBatchStrategy::once_topdown(),
275-
vec![
276-
NormalizationRuleImpl::BindExpressionPosition,
277-
// TIPS: This rule is necessary
278-
NormalizationRuleImpl::EvaluatorBind,
279-
],
280-
)
281-
.implementations(vec![
282-
// DQL
283-
ImplementationRuleImpl::SimpleAggregate,
284-
ImplementationRuleImpl::GroupByAggregate,
285-
ImplementationRuleImpl::Dummy,
286-
ImplementationRuleImpl::Filter,
287-
ImplementationRuleImpl::HashJoin,
288-
ImplementationRuleImpl::Limit,
289-
ImplementationRuleImpl::Projection,
290-
ImplementationRuleImpl::SeqScan,
291-
ImplementationRuleImpl::IndexScan,
292-
ImplementationRuleImpl::FunctionScan,
293-
ImplementationRuleImpl::Sort,
294-
ImplementationRuleImpl::TopK,
295-
ImplementationRuleImpl::Values,
296-
// DML
297-
ImplementationRuleImpl::Analyze,
298-
ImplementationRuleImpl::CopyFromFile,
299-
ImplementationRuleImpl::CopyToFile,
300-
ImplementationRuleImpl::Delete,
301-
ImplementationRuleImpl::Insert,
302-
ImplementationRuleImpl::Update,
303-
// DDL
304-
ImplementationRuleImpl::AddColumn,
305-
ImplementationRuleImpl::CreateTable,
306-
ImplementationRuleImpl::DropColumn,
307-
ImplementationRuleImpl::DropTable,
308-
ImplementationRuleImpl::Truncate,
309-
])
310-
}
311-
312317
fn prepare<T: AsRef<str>>(&self, sql: T) -> Result<Statement, DatabaseError> {
313318
let mut stmts = parse_sql(sql)?;
314319
stmts.pop().ok_or(DatabaseError::EmptyStatement)
@@ -320,7 +325,7 @@ impl<S: Storage> State<S> {
320325
stmt: &Statement,
321326
params: A,
322327
) -> Result<(SchemaRef, Executor<'a>), DatabaseError> {
323-
let mut plan = Self::build_plan(
328+
let mut plan = self.build_plan(
324329
stmt,
325330
params,
326331
self.table_cache(),

src/execution/dql/aggregate/hash_agg.rs

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -131,7 +131,7 @@ mod test {
131131
use crate::expression::agg::AggKind;
132132
use crate::expression::ScalarExpression;
133133
use crate::optimizer::heuristic::batch::HepBatchStrategy;
134-
use crate::optimizer::heuristic::optimizer::HepOptimizer;
134+
use crate::optimizer::heuristic::optimizer::HepOptimizerPipeline;
135135
use crate::optimizer::rule::normalization::NormalizationRuleImpl;
136136
use crate::planner::operator::aggregate::AggregateOperator;
137137
use crate::planner::operator::values::ValuesOperator;
@@ -208,7 +208,7 @@ mod test {
208208
Childrens::Only(Box::new(input)),
209209
);
210210

211-
let plan = HepOptimizer::new(plan)
211+
let pipeline = HepOptimizerPipeline::builder()
212212
.before_batch(
213213
"Expression Remapper".to_string(),
214214
HepBatchStrategy::once_topdown(),
@@ -218,6 +218,9 @@ mod test {
218218
NormalizationRuleImpl::EvaluatorBind,
219219
],
220220
)
221+
.build();
222+
let plan = pipeline
223+
.instantiate(plan)
221224
.find_best::<RocksTransaction>(None)?;
222225

223226
let Operator::Aggregate(op) = plan.operator else {

src/execution/dql/join/hash_join.rs

Lines changed: 21 additions & 55 deletions
Original file line number | Diff line number | Diff line change
@@ -275,7 +275,7 @@ mod test {
275275
use crate::execution::{try_collect, ReadExecutor};
276276
use crate::expression::{BinaryOperator, ScalarExpression};
277277
use crate::optimizer::heuristic::batch::HepBatchStrategy;
278-
use crate::optimizer::heuristic::optimizer::HepOptimizer;
278+
use crate::optimizer::heuristic::optimizer::HepOptimizerPipeline;
279279
use crate::optimizer::rule::normalization::NormalizationRuleImpl;
280280
use crate::planner::operator::join::{JoinCondition, JoinOperator, JoinType};
281281
use crate::planner::operator::values::ValuesOperator;
@@ -292,6 +292,21 @@ mod test {
292292
use std::sync::Arc;
293293
use tempfile::TempDir;
294294

295+
fn optimize_exprs(plan: LogicalPlan) -> Result<LogicalPlan, DatabaseError> {
296+
HepOptimizerPipeline::builder()
297+
.before_batch(
298+
"Expression Remapper".to_string(),
299+
HepBatchStrategy::once_topdown(),
300+
vec![
301+
NormalizationRuleImpl::BindExpressionPosition,
302+
NormalizationRuleImpl::EvaluatorBind,
303+
],
304+
)
305+
.build()
306+
.instantiate(plan)
307+
.find_best::<RocksTransaction>(None)
308+
}
309+
295310
fn build_join_values() -> (
296311
Vec<(ScalarExpression, ScalarExpression)>,
297312
LogicalPlan,
@@ -399,17 +414,7 @@ mod test {
399414
right: Box::new(right),
400415
},
401416
);
402-
let plan = HepOptimizer::new(plan)
403-
.before_batch(
404-
"Expression Remapper".to_string(),
405-
HepBatchStrategy::once_topdown(),
406-
vec![
407-
NormalizationRuleImpl::BindExpressionPosition,
408-
// TIPS: This rule is necessary
409-
NormalizationRuleImpl::EvaluatorBind,
410-
],
411-
)
412-
.find_best::<RocksTransaction>(None)?;
417+
let plan = optimize_exprs(plan)?;
413418

414419
let Operator::Join(op) = plan.operator else {
415420
unreachable!()
@@ -460,17 +465,7 @@ mod test {
460465
right: Box::new(right),
461466
},
462467
);
463-
let plan = HepOptimizer::new(plan)
464-
.before_batch(
465-
"Expression Remapper".to_string(),
466-
HepBatchStrategy::once_topdown(),
467-
vec![
468-
NormalizationRuleImpl::BindExpressionPosition,
469-
// TIPS: This rule is necessary
470-
NormalizationRuleImpl::EvaluatorBind,
471-
],
472-
)
473-
.find_best::<RocksTransaction>(None)?;
468+
let plan = optimize_exprs(plan)?;
474469

475470
let Operator::Join(op) = plan.operator else {
476471
unreachable!()
@@ -568,17 +563,7 @@ mod test {
568563
right: Box::new(right),
569564
},
570565
);
571-
let plan = HepOptimizer::new(plan)
572-
.before_batch(
573-
"Expression Remapper".to_string(),
574-
HepBatchStrategy::once_topdown(),
575-
vec![
576-
NormalizationRuleImpl::BindExpressionPosition,
577-
// TIPS: This rule is necessary
578-
NormalizationRuleImpl::EvaluatorBind,
579-
],
580-
)
581-
.find_best::<RocksTransaction>(None)?;
566+
let plan = optimize_exprs(plan)?;
582567

583568
let Operator::Join(op) = plan.operator else {
584569
unreachable!()
@@ -678,16 +663,7 @@ mod test {
678663
},
679664
);
680665

681-
let plan = HepOptimizer::new(plan)
682-
.before_batch(
683-
"Expression Remapper".to_string(),
684-
HepBatchStrategy::once_topdown(),
685-
vec![
686-
NormalizationRuleImpl::BindExpressionPosition,
687-
NormalizationRuleImpl::EvaluatorBind,
688-
],
689-
)
690-
.find_best::<RocksTransaction>(None)?;
666+
let plan = optimize_exprs(plan)?;
691667

692668
let Operator::Join(op) = plan.operator else {
693669
unreachable!()
@@ -733,17 +709,7 @@ mod test {
733709
right: Box::new(right),
734710
},
735711
);
736-
let plan = HepOptimizer::new(plan)
737-
.before_batch(
738-
"Expression Remapper".to_string(),
739-
HepBatchStrategy::once_topdown(),
740-
vec![
741-
NormalizationRuleImpl::BindExpressionPosition,
742-
// TIPS: This rule is necessary
743-
NormalizationRuleImpl::EvaluatorBind,
744-
],
745-
)
746-
.find_best::<RocksTransaction>(None)?;
712+
let plan = optimize_exprs(plan)?;
747713

748714
let Operator::Join(op) = plan.operator else {
749715
unreachable!()

0 commit comments

Comments
 (0)