Skip to content

Commit

Permalink
[Feature] support fe executes all constant query (StarRocks#44206)
Browse files Browse the repository at this point in the history
Signed-off-by: packy92 <[email protected]>
  • Loading branch information
packy92 authored Apr 22, 2024
1 parent 012acc9 commit 917c007
Show file tree
Hide file tree
Showing 12 changed files with 1,739 additions and 2 deletions.
13 changes: 13 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,8 @@ public static MaterializedViewRewriteMode parse(String str) {

public static final String ENABLE_CONNECTOR_SINK_WRITER_SCALING = "enable_connector_sink_writer_scaling";

public static final String ENABLE_CONSTANT_EXECUTE_IN_FE = "enable_constant_execute_in_fe";

public static final List<String> DEPRECATED_VARIABLES = ImmutableList.<String>builder()
.add(CODEGEN_LEVEL)
.add(MAX_EXECUTION_TIME)
Expand Down Expand Up @@ -2039,6 +2041,9 @@ public Optional<Boolean> isFollowerForwardToLeaderOpt() {
@VarAttr(name = ENABLE_PIPELINE_LEVEL_SHUFFLE, flag = VariableMgr.INVISIBLE)
private boolean enablePipelineLevelShuffle = true;

@VarAttr(name = ENABLE_CONSTANT_EXECUTE_IN_FE)
private boolean enableConstantExecuteInFE = true;

public int getExprChildrenLimit() {
return exprChildrenLimit;
}
Expand Down Expand Up @@ -3693,6 +3698,14 @@ public void setEnablePredicateMoveAround(boolean enablePredicateMoveAround) {
this.enablePredicateMoveAround = enablePredicateMoveAround;
}

public boolean isEnableConstantExecuteInFE() {
return enableConstantExecuteInFE;
}

public void setEnableConstantExecuteInFE(boolean enableConstantExecuteInFE) {
this.enableConstantExecuteInFE = enableConstantExecuteInFE;
}

// Serialize to thrift object
// used for rest api
public TQueryOptions toThrift() {
Expand Down
48 changes: 46 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import com.starrocks.proto.QueryStatisticsItemPB;
import com.starrocks.qe.QueryState.MysqlStateType;
import com.starrocks.qe.scheduler.Coordinator;
import com.starrocks.qe.scheduler.FeExecuteCoordinator;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.ExplainAnalyzer;
import com.starrocks.sql.StatementPlanner;
Expand Down Expand Up @@ -165,7 +166,10 @@
import com.starrocks.sql.common.ErrorType;
import com.starrocks.sql.common.MetaUtils;
import com.starrocks.sql.common.StarRocksPlannerException;
import com.starrocks.sql.optimizer.OptExpression;
import com.starrocks.sql.optimizer.dump.QueryDumpInfo;
import com.starrocks.sql.optimizer.operator.physical.PhysicalValuesOperator;
import com.starrocks.sql.optimizer.operator.scalar.ScalarOperator;
import com.starrocks.sql.plan.ExecPlan;
import com.starrocks.statistic.AnalyzeJob;
import com.starrocks.statistic.AnalyzeMgr;
Expand Down Expand Up @@ -197,6 +201,7 @@
import com.starrocks.transaction.TransactionState;
import com.starrocks.transaction.TransactionStatus;
import com.starrocks.transaction.VisibleStateWaiter;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -991,6 +996,7 @@ private void handleQueryStmt(ExecPlan execPlan) throws Exception {
&& StatementBase.ExplainLevel.ANALYZE.equals(parsedStmt.getExplainLevel());
boolean isSchedulerExplain = parsedStmt.isExplain()
&& StatementBase.ExplainLevel.SCHEDULER.equals(parsedStmt.getExplainLevel());
boolean executeInFe = !isExplainAnalyze & !isSchedulerExplain & canExecuteInFe(context, execPlan.getPhysicalPlan());

if (isExplainAnalyze) {
context.getSessionVariable().setEnableProfile(true);
Expand All @@ -999,7 +1005,11 @@ private void handleQueryStmt(ExecPlan execPlan) throws Exception {
} else if (isSchedulerExplain) {
// Do nothing.
} else if (parsedStmt.isExplain()) {
handleExplainStmt(buildExplainString(execPlan, ResourceGroupClassifier.QueryType.SELECT));
String explainString = buildExplainString(execPlan, ResourceGroupClassifier.QueryType.SELECT);
if (executeInFe) {
explainString = "EXECUTE IN FE\n" + explainString;
}
handleExplainStmt(explainString);
return;
}
if (context.getQueryDetail() != null) {
Expand All @@ -1013,7 +1023,11 @@ private void handleQueryStmt(ExecPlan execPlan) throws Exception {
List<String> colNames = execPlan.getColNames();
List<Expr> outputExprs = execPlan.getOutputExprs();

coord = getCoordinatorFactory().createQueryScheduler(context, fragments, scanNodes, descTable);
if (executeInFe) {
coord = new FeExecuteCoordinator(context, execPlan);
} else {
coord = getCoordinatorFactory().createQueryScheduler(context, fragments, scanNodes, descTable);
}

QeProcessorImpl.INSTANCE.registerQuery(context.getExecutionId(),
new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
Expand Down Expand Up @@ -2338,4 +2352,34 @@ public Pair<List<TResultBatch>, Status> executeStmtWithExecPlan(ConnectContext c
public List<ByteBuffer> getProxyResultBuffer() {
return proxyResultBuffer;
}


// scenes can execute in FE should meet all these requirements:
// 1. enable_constant_execute_in_fe = true
// 2. is mysql text protocol
// 3. all values are constantOperator
private boolean canExecuteInFe(ConnectContext context, OptExpression optExpression) {
if (!context.getSessionVariable().isEnableConstantExecuteInFE()) {
return false;
}

if (context instanceof HttpConnectContext || context.getCommand() == MysqlCommand.COM_STMT_EXECUTE) {
return false;
}

if (optExpression.getOp() instanceof PhysicalValuesOperator) {
PhysicalValuesOperator valuesOperator = (PhysicalValuesOperator) optExpression.getOp();
boolean isAllConstants = true;
if (valuesOperator.getProjection() != null) {
isAllConstants = valuesOperator.getProjection().getColumnRefMap().values().stream()
.allMatch(ScalarOperator::isConstantRef);
} else if (CollectionUtils.isNotEmpty(valuesOperator.getRows())) {
isAllConstants = valuesOperator.getRows().stream().allMatch(row ->
row.stream().allMatch(ScalarOperator::isConstantRef));
}

return isAllConstants;
}
return false;
}
}
Loading

0 comments on commit 917c007

Please sign in to comment.