Skip to content

Commit

Permalink
[Enhancement] Add force rule based mv rewrite (StarRocks#40664)
Browse files Browse the repository at this point in the history
Signed-off-by: ABingHuang <[email protected]>
  • Loading branch information
ABingHuang authored Feb 4, 2024
1 parent df1fb9b commit 28b477f
Show file tree
Hide file tree
Showing 17 changed files with 300 additions and 82 deletions.
13 changes: 13 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,8 @@ public static MaterializedViewRewriteMode parse(String str) {
public static final String ENABLE_SYNC_MATERIALIZED_VIEW_REWRITE = "enable_sync_materialized_view_rewrite";
public static final String ENABLE_RULE_BASED_MATERIALIZED_VIEW_REWRITE =
"enable_rule_based_materialized_view_rewrite";
// Name of the session variable that, together with materialized view rewrite being enabled,
// makes the optimizer run both single-table and multi-table MV rewrite rules in the
// rule-based rewrite phase. The literal previously read "eable_..." (missing 'n'), which did
// not match the "enable_" prefix used by the neighbouring variable names.
public static final String ENABLE_FORCE_RULE_BASED_MV_REWRITE =
        "enable_force_rule_based_mv_rewrite";

public static final String ENABLE_MATERIALIZED_VIEW_VIEW_DELTA_REWRITE =
"enable_materialized_view_view_delta_rewrite";
Expand Down Expand Up @@ -1463,6 +1465,9 @@ public String getConnectorSinkCompressionCodec() {
@VarAttr(name = ENABLE_MATERIALIZED_VIEW_REWRITE_PARTITION_COMPENSATE, flag = VariableMgr.INVISIBLE)
private boolean enableMaterializedViewRewritePartitionCompensate = true;

@VarAttr(name = ENABLE_FORCE_RULE_BASED_MV_REWRITE)
private boolean enableForceRuleBasedMvRewrite = false;

@VarAttr(name = ENABLE_RULE_BASED_MATERIALIZED_VIEW_REWRITE)
private boolean enableRuleBasedMaterializedViewRewrite = true;

Expand Down Expand Up @@ -2851,6 +2856,14 @@ public void setEnableRuleBasedMaterializedViewRewrite(boolean enableRuleBasedMat
this.enableRuleBasedMaterializedViewRewrite = enableRuleBasedMaterializedViewRewrite;
}

/**
 * Returns the current value of the {@code enableForceRuleBasedMvRewrite} flag.
 *
 * @return {@code true} when the flag is set, {@code false} otherwise
 */
public boolean isEnableForceRuleBasedMvRewrite() {
    return this.enableForceRuleBasedMvRewrite;
}

/**
 * Overwrites the {@code enableForceRuleBasedMvRewrite} flag.
 *
 * @param enableForceRuleBasedMvRewrite new value of the flag
 */
public void setEnableForceRuleBasedMvRewrite(boolean enableForceRuleBasedMvRewrite) {
this.enableForceRuleBasedMvRewrite = enableForceRuleBasedMvRewrite;
}

/**
 * Returns the current value of the {@code enableMaterializedViewRewritePartitionCompensate} flag.
 *
 * @return {@code true} when the flag is set, {@code false} otherwise
 */
public boolean isEnableMaterializedViewRewritePartitionCompensate() {
    return this.enableMaterializedViewRewritePartitionCompensate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public MvPlanContext optimize(MaterializedView mv,
// optimize the sql by rule and disable rule based materialized view rewrite
OptimizerConfig optimizerConfig = new OptimizerConfig(OptimizerConfig.OptimizerAlgorithm.RULE_BASED);
optimizerConfig.disableRuleSet(RuleSetType.PARTITION_PRUNE);
optimizerConfig.disableRuleSet(RuleSetType.SINGLE_TABLE_MV_REWRITE);
optimizerConfig.disableRuleSet(RuleSetType.ALL_MV_REWRITE);
// INTERSECT_REWRITE optimizes INTERSECT-related plans, which cannot be SPJG,
// and it relies on the PARTITION_PRUNE rule set, which is disabled above.
// So exclude it as well.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,10 @@ private OptExpression logicalRuleRewrite(
viewBasedMvRuleRewrite(tree, rootTaskContext);
}

if (isEnableSingleTableMVRewrite(rootTaskContext, sessionVariable, tree)) {
if (sessionVariable.isEnableMaterializedViewRewrite() && sessionVariable.isEnableForceRuleBasedMvRewrite()) {
// use the rule-based strategy to rewrite both single-table and multi-table queries with mvs
ruleRewriteIterative(tree, rootTaskContext, RuleSetType.ALL_MV_REWRITE);
} else if (isEnableSingleTableMVRewrite(rootTaskContext, sessionVariable, tree)) {
// now add single table materialized view rewrite rules in rule based rewrite phase to boost optimization
ruleRewriteIterative(tree, rootTaskContext, RuleSetType.SINGLE_TABLE_MV_REWRITE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ public boolean matchWithoutChild(OptExpression expression) {
return true;
}

if (isPatternMultiJoin() && isMultiJoin(expression.getOp().getOpType())) {
return true;
}

return getOpType().equals(expression.getOp().getOpType());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class RuleSet {
private static final Map<RuleSetType, List<Rule>> REWRITE_RULES = Maps.newHashMap();
Expand Down Expand Up @@ -396,6 +398,11 @@ public class RuleSet {
OnlyJoinRule.getInstance()
));

REWRITE_RULES.put(RuleSetType.ALL_MV_REWRITE, Stream.concat(
REWRITE_RULES.get(RuleSetType.MULTI_TABLE_MV_REWRITE).stream(),
REWRITE_RULES.get(RuleSetType.SINGLE_TABLE_MV_REWRITE).stream())
.collect(Collectors.toList()));

REWRITE_RULES.put(RuleSetType.PRUNE_EMPTY_OPERATOR, ImmutableList.of(
PruneEmptyScanRule.OLAP_SCAN,
PruneEmptyScanRule.HIVE_SCAN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public enum RuleSetType {
INTERSECT_REWRITE,
SINGLE_TABLE_MV_REWRITE,
MULTI_TABLE_MV_REWRITE,
ALL_MV_REWRITE,
PRUNE_EMPTY_OPERATOR,
SHORT_CIRCUIT_SET,
NUM_RULE_SET
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.sql.optimizer.rule.transformation.materialization;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.catalog.MvPlanContext;
import com.starrocks.sql.optimizer.CachingMvPlanContextBuilder;
import com.starrocks.sql.optimizer.ExpressionContext;
import com.starrocks.sql.optimizer.OptExpression;
import com.starrocks.sql.optimizer.OptimizerContext;
import com.starrocks.sql.optimizer.operator.logical.LogicalAggregationOperator;
import com.starrocks.sql.optimizer.operator.logical.LogicalOlapScanOperator;
import com.starrocks.sql.optimizer.statistics.Statistics;
import com.starrocks.sql.optimizer.statistics.StatisticsCalculator;

import java.util.Comparator;
import java.util.List;

/**
 * Chooses a single plan out of a list of candidate plans produced by materialized view rewrite.
 *
 * <p>Each candidate is summarised by a {@link CandidateContext} taken from the first
 * materialized-view olap scan found in a pre-order walk of the plan; the candidates are then
 * ordered by {@link CandidateContextComparator} and the smallest one wins.
 */
public class BestMvSelector {
    private final List<OptExpression> expressions;
    private final OptimizerContext context;
    private final boolean isAggQuery;

    /**
     * @param expressions candidate plans to choose from
     * @param context     optimizer context used for statistics estimation and session settings
     * @param isAggQuery  whether the query being rewritten is an aggregation; when true the
     *                    group-by key count of each mv plan takes part in the ranking
     */
    public BestMvSelector(List<OptExpression> expressions, OptimizerContext context, boolean isAggQuery) {
        this.expressions = expressions;
        this.context = context;
        this.isAggQuery = isAggQuery;
    }

    /**
     * Returns the preferred candidate.
     *
     * <p>A single candidate is returned directly, without estimating statistics. Otherwise
     * statistics are computed for every candidate first, then every candidate is summarised and
     * ranked.
     *
     * @return the candidate that sorts first under {@link CandidateContextComparator}
     * @throws IllegalStateException if a candidate contains no materialized-view olap scan
     */
    public OptExpression selectBest() {
        if (expressions.size() == 1) {
            return expressions.get(0);
        }

        // Phase 1: make sure every candidate plan carries statistics.
        expressions.forEach(candidate -> calculateStatistics(candidate, context));

        // Phase 2: summarise each candidate, remembering its position in the input list.
        List<CandidateContext> ranked = Lists.newArrayList();
        int position = 0;
        for (OptExpression candidate : expressions) {
            CandidateContext summary = getMVContext(candidate, isAggQuery, context);
            Preconditions.checkState(summary != null);
            summary.setIndex(position++);
            ranked.add(summary);
        }

        // Phase 3: order the summaries and map the winner back to its plan.
        ranked.sort(new CandidateContextComparator());
        return expressions.get(ranked.get(0).getIndex());
    }

    /**
     * Ranking inputs for one candidate: the statistics of its mv scan, the mv's base schema
     * width, the mv's group-by key count and the candidate's position in the input list.
     */
    @VisibleForTesting
    public static class CandidateContext {
        private final Statistics mvStatistics;
        private final int schemaColumnNum;

        // Number of group-by keys of the mv plan when it was recorded for an aggregated mv;
        // stays at Integer.MAX_VALUE otherwise.
        private int groupbyColumnNum = Integer.MAX_VALUE;
        private int index;

        public CandidateContext(Statistics mvStatistics, int schemaColumnNum) {
            this(mvStatistics, schemaColumnNum, 0);
        }

        public CandidateContext(Statistics mvStatistics, int schemaColumnNum, int index) {
            this.mvStatistics = mvStatistics;
            this.schemaColumnNum = schemaColumnNum;
            this.index = index;
        }

        public int getSchemaColumnNum() {
            return schemaColumnNum;
        }

        public int getGroupbyColumnNum() {
            return groupbyColumnNum;
        }

        public void setGroupbyColumnNum(int groupbyColumnNum) {
            this.groupbyColumnNum = groupbyColumnNum;
        }

        public Statistics getMvStatistics() {
            return mvStatistics;
        }

        public int getIndex() {
            return index;
        }

        public void setIndex(int index) {
            this.index = index;
        }
    }

    /**
     * Orders candidates ascending by, in turn: group-by key count, output row count, schema
     * column count, compute size, and finally input position. Later keys are only consulted
     * when all earlier keys tie.
     */
    @VisibleForTesting
    public static class CandidateContextComparator implements Comparator<CandidateContext> {
        private static final Comparator<CandidateContext> ORDERING =
                Comparator.<CandidateContext>comparingInt(CandidateContext::getGroupbyColumnNum)
                        .thenComparingDouble(c -> c.getMvStatistics().getOutputRowCount())
                        .thenComparingInt(CandidateContext::getSchemaColumnNum)
                        .thenComparingDouble(c -> c.getMvStatistics().getComputeSize())
                        .thenComparingInt(CandidateContext::getIndex);

        @Override
        public int compare(CandidateContext context1, CandidateContext context2) {
            return ORDERING.compare(context1, context2);
        }
    }

    /**
     * Estimates statistics for {@code expr} bottom-up and stores them on each node.
     * Nodes that already carry statistics are left untouched, along with their subtrees.
     */
    private void calculateStatistics(OptExpression expr, OptimizerContext context) {
        if (expr.getStatistics() != null) {
            // Already estimated; nothing to do.
            return;
        }

        // Children first, so the estimator can see their statistics.
        expr.getInputs().forEach(input -> calculateStatistics(input, context));

        ExpressionContext exprContext = new ExpressionContext(expr);
        new StatisticsCalculator(exprContext, context.getColumnRefFactory(), context).estimatorStats();
        expr.setStatistics(exprContext.getStatistics());
    }

    /**
     * Finds the first olap scan over a materialized view in a pre-order walk of
     * {@code expression} and summarises it.
     *
     * @return the summary, or {@code null} when the plan contains no such scan
     */
    private CandidateContext getMVContext(
            OptExpression expression, boolean isAggregate, OptimizerContext optimizerContext) {
        if (expression.getOp() instanceof LogicalOlapScanOperator) {
            LogicalOlapScanOperator olapScan = expression.getOp().cast();
            if (olapScan.getTable().isMaterializedView()) {
                return buildCandidate(expression, olapScan, isAggregate, optimizerContext);
            }
        }
        // Not an mv scan: take the first hit among the children, in order.
        return expression.getInputs().stream()
                .map(input -> getMVContext(input, isAggregate, optimizerContext))
                .filter(found -> found != null)
                .findFirst()
                .orElse(null);
    }

    /**
     * Builds the summary for an mv scan; for aggregate queries it also records the group-by key
     * count of every mv plan whose root is an aggregation (the last such plan wins).
     */
    private CandidateContext buildCandidate(OptExpression expression, LogicalOlapScanOperator olapScan,
                                            boolean isAggregate, OptimizerContext optimizerContext) {
        CandidateContext candidate =
                new CandidateContext(expression.getStatistics(), olapScan.getTable().getBaseSchema().size());
        if (!isAggregate) {
            return candidate;
        }

        MaterializedView mv = (MaterializedView) olapScan.getTable();
        List<MvPlanContext> mvPlans = CachingMvPlanContextBuilder.getInstance().getPlanContext(
                mv, optimizerContext.getSessionVariable().isEnableMaterializedViewPlanCache());
        for (MvPlanContext mvPlan : mvPlans) {
            if (!(mvPlan.getLogicalPlan().getOp() instanceof LogicalAggregationOperator)) {
                continue;
            }
            LogicalAggregationOperator aggregation = mvPlan.getLogicalPlan().getOp().cast();
            candidate.setGroupbyColumnNum(aggregation.getGroupingKeys().size());
        }
        return candidate;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* - Aggregate
* - Scan
*/
public class AggregateScanRule extends SingleTableRewriteBaseRule {
public class AggregateScanRule extends BaseMaterializedViewRewriteRule {
private static final AggregateScanRule INSTANCE = new AggregateScanRule();

public AggregateScanRule() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@
import com.starrocks.sql.optimizer.QueryMaterializationContext;
import com.starrocks.sql.optimizer.base.ColumnRefFactory;
import com.starrocks.sql.optimizer.operator.Operator;
import com.starrocks.sql.optimizer.operator.logical.LogicalAggregationOperator;
import com.starrocks.sql.optimizer.operator.logical.LogicalOlapScanOperator;
import com.starrocks.sql.optimizer.operator.pattern.Pattern;
import com.starrocks.sql.optimizer.operator.scalar.ConstantOperator;
import com.starrocks.sql.optimizer.operator.scalar.ScalarOperator;
import com.starrocks.sql.optimizer.rewrite.ReplaceColumnRefRewriter;
import com.starrocks.sql.optimizer.rule.RuleType;
import com.starrocks.sql.optimizer.rule.transformation.TransformationRule;
import com.starrocks.sql.optimizer.rule.transformation.materialization.BestMvSelector;
import com.starrocks.sql.optimizer.rule.transformation.materialization.MVColumnPruner;
import com.starrocks.sql.optimizer.rule.transformation.materialization.MVPartitionPruner;
import com.starrocks.sql.optimizer.rule.transformation.materialization.MaterializedViewRewriter;
Expand Down Expand Up @@ -94,7 +96,18 @@ public boolean exhausted(OptimizerContext context) {
@Override
public List<OptExpression> transform(OptExpression queryExpression, OptimizerContext context) {
try {
return doTransform(queryExpression, context);
List<OptExpression> expressions = doTransform(queryExpression, context);
if (expressions == null || expressions.isEmpty()) {
return Lists.newArrayList();
}
if (context.isInMemoPhase()) {
return expressions;
} else {
// in the rule-based phase, return only the single best result
BestMvSelector bestMvSelector = new BestMvSelector(
expressions, context, queryExpression.getOp() instanceof LogicalAggregationOperator);
return Lists.newArrayList(bestMvSelector.selectBest());
}
} catch (Exception e) {
String errMsg = ExceptionUtils.getStackTrace(e);
// for mv rewrite rules, do not disturb query when exception.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* Materialized View Rewrite Rule for pattern:
* - Scan
*/
public class OnlyScanRule extends SingleTableRewriteBaseRule {
public class OnlyScanRule extends BaseMaterializedViewRewriteRule {
private static final OnlyScanRule INSTANCE = new OnlyScanRule();

public OnlyScanRule() {
Expand Down
Loading

0 comments on commit 28b477f

Please sign in to comment.