Skip to content

Commit

Permalink
[FLINK-37076][table-planner] Make table arguments explicit in logical…
Browse files Browse the repository at this point in the history
… plans
  • Loading branch information
twalthr committed Jan 16, 2025
1 parent ed6de58 commit b9cc919
Show file tree
Hide file tree
Showing 23 changed files with 539 additions and 496 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@
*
* <ol>
* <li>Added in FLINK-29081, FLINK-28682, FLINK-33395: Lines 661 ~ 678
* <li>Added in FLINK-24024: Lines 1453 ~ 1463
* <li>Added in FLINK-24024: Lines 1477 ~ 1520
* <li>Added in FLINK-24024: Lines 1457 ~ 1461
* <li>Added in FLINK-24024: Lines 1478 ~ 1513
* <li>Added in FLINK-28682: Lines 2346 ~ 2363
* <li>Added in FLINK-28682: Lines 2400 ~ 2428
* <li>Added in FLINK-32474: Lines 2480 ~ 2482
Expand Down Expand Up @@ -1454,11 +1454,7 @@ private void substituteSubQuery(Blackboard bb, SubQuery subQuery) {
return;
case SET_SEMANTICS_TABLE:
// ----- FLINK MODIFICATION BEGIN -----
// We always expand the SET SEMANTICS TABLE for two reasons:
// 1. Calcite has a bug when not expanding the SET SEMANTICS TABLE. For more
// information, see CALCITE-6204.
// 2. Currently, Flink’s built-in Session Window TVF is the only PTF with SET
// SEMANTICS. We will expand it by default, like other built-in window TVFs, to
// We always expand the SET_SEMANTICS_TABLE due to CALCITE-6204 and to
// reuse some subsequent processing and optimization logic.
// if (!config.isExpand()) {
// return;
Expand All @@ -1477,17 +1473,13 @@ private void substituteSubQueryOfSetSemanticsInputTable(Blackboard bb, SubQuery
call = (SqlBasicCall) subQuery.node;
query = call.operand(0);

// FLINK MODIFICATION BEGIN
// ----- FLINK MODIFICATION BEGIN -----

// We modified it for two reasons:
// 1. In Flink, Exchange nodes should not appear in the logical stage, which will bring
// uncertainty to the implementation of plan optimization in the current logical stage.
// Instead, Flink will add exchanges based on traits during the physical phase.
// 2. Currently, Flink’s built-in Session Window TVF is the only SET SEMANTICS
// TABLE. We will convert it into the same plan tree as other Window TVFs. The partition key
// and order key will be recorded using a custom RexCall when subsequently converting the
// SqlCall of SET SEMANTICS TABLE. See more at
// FlinkConvertletTable#convertSetSemanticsWindowTableFunction
// In Flink, exchange nodes do not appear in the logical phase. Instead,
// exchanges are added based on traits during the physical phase. The partition keys
// and order keys will be recorded using a custom RexCall when converting the
// SqlCall of SqlKind.SET_SEMANTICS_TABLE.
// See FlinkConvertletTable#convertTableArgs

final RelNode inputOfSetSemanticsTable =
convertQueryRecursive(query, false, null).project();
Expand Down Expand Up @@ -1520,7 +1512,7 @@ private void substituteSubQueryOfSetSemanticsInputTable(Blackboard bb, SubQuery
// relBuilder.sortExchange(distribution, orders);
// }

// FLINK MODIFICATION END
// ----- FLINK MODIFICATION END -----

RelNode tableRel = relBuilder.build();
subQuery.expr = bb.register(tableRel, JoinRelType.LEFT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@
package org.apache.flink.table.planner.calcite;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
import org.apache.flink.table.planner.functions.sql.SqlSessionTableFunction;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlBasicCall;
Expand All @@ -35,14 +32,16 @@
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlTableFunction;
import org.apache.calcite.sql.SqlUtil;
import org.apache.calcite.sql.SqlWindowTableFunction;
import org.apache.calcite.sql.validate.SqlValidatorImpl;
import org.apache.calcite.sql2rel.SqlRexContext;
import org.apache.calcite.sql2rel.SqlRexConvertlet;
import org.apache.calcite.sql2rel.SqlRexConvertletTable;
import org.apache.calcite.sql2rel.StandardConvertletTable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

Expand All @@ -61,12 +60,11 @@ private FlinkConvertletTable() {}

@Override
public SqlRexConvertlet get(SqlCall call) {
if (call.getOperator().isName("TRY_CAST", false)) {
final SqlOperator operator = call.getOperator();
if (operator == FlinkSqlOperatorTable.TRY_CAST) {
return this::convertTryCast;
}

if (isSetSemanticsWindowTableFunction(call)) {
return this::convertSetSemanticsWindowTableFunction;
} else if (operator instanceof SqlTableFunction) {
return this::convertTableArgs;
}

return StandardConvertletTable.INSTANCE.get(call);
Expand Down Expand Up @@ -105,20 +103,15 @@ private RexNode convertTryCast(SqlRexContext cx, final SqlCall call) {
type, FlinkSqlOperatorTable.TRY_CAST, Collections.singletonList(valueRex));
}

private boolean isSetSemanticsWindowTableFunction(SqlCall call) {
if (!(call.getOperator() instanceof SqlWindowTableFunction)) {
return false;
}
List<SqlNode> operands = call.getOperandList();
return !operands.isEmpty() && operands.get(0).getKind() == SqlKind.SET_SEMANTICS_TABLE;
}

/**
* Due to CALCITE-6204, we need to manually extract partition keys and order keys and convert
* them to {@link RexSetSemanticsTableCall}.
* them to {@link RexTableArgCall}.
*
* <p>Take `SESSION(TABLE my_table PARTITION BY (b, a), DESCRIPTOR(rowtime), INTERVAL '10'
* MINUTE)` as an example.
* <p>For example:
*
* <pre>
* SESSION(TABLE my_table PARTITION BY (b, a), DESCRIPTOR(rowtime), INTERVAL '10' MINUTE)
* </pre>
*
* <p>The original SqlNode tree after parsing looks like:
*
Expand All @@ -145,50 +138,62 @@ private boolean isSetSemanticsWindowTableFunction(SqlCall call) {
* └─ RexLiteral: 300000:INTERVAL MINUTE
* </pre>
*
* <p>As a workaround, we flatten the inner sql call and convert it to a customized {@link
* RexSetSemanticsTableCall} to preserve partition keys and order keys
* <p>Instead, we introduce a customized {@link RexTableArgCall} to preserve properties of the
* table argument (i.e. partition keys and order keys).
*
* <pre>
* RexSetSemanticsTableCall: SESSION
* ├─ PartitionKeys: [1, 0]
* ├─ OrderKeys: []
* RexCall: SESSION
* ├─ RexTableArgCall: TABLE
* │ ├─ InputIndex: 0
* │ ├─ PartitionKeys: [1, 0]
* │ └─ OrderKeys: []
* ├─ RexCall: DESCRIPTOR(`rowtime`)
* │ └─ RexInputRef: `rowtime`
* └─ RexLiteral: 300000:INTERVAL MINUTE
* </pre>
*/
private RexNode convertSetSemanticsWindowTableFunction(SqlRexContext cx, final SqlCall call) {
private RexNode convertTableArgs(SqlRexContext cx, final SqlCall call) {
checkArgument(
call.getOperator() instanceof SqlSessionTableFunction,
"Currently, only the SESSION table function is supported in Set Semantics PTF.");
SqlSessionTableFunction fun = (SqlSessionTableFunction) call.getOperator();

List<SqlNode> operands = call.getOperandList();

SqlBasicCall setSemanticsPTFCall = (SqlBasicCall) operands.get(0);
SqlNodeList partitionKeys = setSemanticsPTFCall.operand(1);
SqlNodeList orderKeys = setSemanticsPTFCall.operand(2);
checkArgument(orderKeys.isEmpty(), "SESSION table function does not support order keys.");
RexCall resolvedCall =
(RexCall) StandardConvertletTable.INSTANCE.convertWindowFunction(cx, fun, call);
int[] partitionKeyRefs = getPartitionKeyIndices(cx, partitionKeys);

// attach the partition keys and order keys on the custom rex call
resolvedCall =
new RexSetSemanticsTableCall(
resolvedCall.getType(),
resolvedCall.getOperator(),
resolvedCall.getOperands(),
partitionKeyRefs,
new int[] {});
return resolvedCall;
call.getOperator() instanceof SqlTableFunction,
"Only table functions can have set semantics arguments.");
final SqlOperator operator = call.getOperator();
final RelDataType returnType = cx.getValidator().getValidatedNodeType(call);

final List<RexNode> rewrittenOperands = new ArrayList<>();
int tableInputCount = 0;
for (int pos = 0; pos < call.getOperandList().size(); pos++) {
final SqlNode operand = call.operand(pos);
if (operand.getKind() == SqlKind.SET_SEMANTICS_TABLE) {
final SqlBasicCall setSemanticsCall = (SqlBasicCall) operand;
final SqlNodeList partitionKeys = setSemanticsCall.operand(1);
final SqlNodeList orderKeys = setSemanticsCall.operand(2);
checkArgument(
orderKeys.isEmpty(), "Table functions do not support order keys yet.");
rewrittenOperands.add(
new RexTableArgCall(
cx.getValidator().getValidatedNodeType(operand),
tableInputCount++,
getPartitionKeyIndices(cx, partitionKeys),
new int[0]));
} else if (operand.isA(SqlKind.QUERY)) {
rewrittenOperands.add(
new RexTableArgCall(
cx.getValidator().getValidatedNodeType(operand),
tableInputCount++,
new int[0],
new int[0]));
} else {
rewrittenOperands.add(cx.convertExpression(operand));
}
}

return cx.getRexBuilder().makeCall(returnType, operator, rewrittenOperands);
}

private int[] getPartitionKeyIndices(SqlRexContext cx, SqlNodeList partitions) {
private static int[] getPartitionKeyIndices(SqlRexContext cx, SqlNodeList partitions) {
final int[] result = new int[partitions.size()];

for (int i = 0; i < partitions.getList().size(); i++) {
RexNode expr = cx.convertExpression(partitions.get(i));
final RexNode expr = cx.convertExpression(partitions.get(i));
result[i] = parseFieldIdx(expr);
}
return result;
Expand All @@ -200,6 +205,6 @@ private static int parseFieldIdx(RexNode e) {
return ref.getIndex();
}
// should not happen
throw new TableException("Unsupported partition key with type: " + e.getKind());
throw new IllegalStateException("Unsupported partition key with type: " + e.getKind());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.flink.table.planner.calcite;

import org.apache.flink.table.planner.functions.sql.SqlDefaultOperator;
import org.apache.flink.table.planner.functions.sql.SqlDefaultArgOperator;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlCall;
Expand Down Expand Up @@ -107,7 +107,8 @@ private List<SqlNode> getRewrittenOperands() {
&& ((SqlCall) operand).getOperator() == SqlStdOperatorTable.DEFAULT) {
final RelDataType argumentType = fixedArgumentTypes.get(rewrittenOperands.size());
final SqlCall defaultArg =
new SqlDefaultOperator(argumentType).createCall(SqlParserPos.ZERO);
new SqlDefaultArgOperator(argumentType).createCall(SqlParserPos.ZERO);
getValidator().setValidatedNodeType(defaultArg, argumentType);
rewrittenOperands.add(defaultArg);
} else {
rewrittenOperands.add(operand);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,42 @@

package org.apache.flink.table.planner.calcite;

import org.apache.flink.table.planner.functions.sql.SqlTableArgOperator;
import org.apache.flink.table.types.inference.StaticArgument;
import org.apache.flink.table.types.inference.StaticArgumentTrait;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSyntax;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
* A special {@link RexCall} that is used to represent a table function with set semantics. See more
* details in {@link FlinkConvertletTable#convertSetSemanticsWindowTableFunction}.
* A special {@link RexCall} that represents a table argument in a signature of {@link
* StaticArgument}s. The table arguments describe a {@link StaticArgumentTrait#TABLE_AS_SET} or
* {@link StaticArgumentTrait#TABLE_AS_ROW}.
*
* @see FlinkConvertletTable
*/
public class RexSetSemanticsTableCall extends RexCall {
public class RexTableArgCall extends RexCall {

private final int inputIndex;
private final int[] partitionKeys;

private final int[] orderKeys;

public RexSetSemanticsTableCall(
RelDataType type,
SqlOperator operator,
List<? extends RexNode> operands,
int[] partitionKeys,
int[] orderKeys) {
super(type, operator, operands);
public RexTableArgCall(RelDataType type, int inputIndex, int[] partitionKeys, int[] orderKeys) {
super(type, SqlTableArgOperator.INSTANCE, List.of());
this.inputIndex = inputIndex;
this.partitionKeys = partitionKeys;
this.orderKeys = orderKeys;
}

/**
 * Returns the input index this table argument was constructed with.
 *
 * @return the value of the {@code inputIndex} field
 */
public int getInputIndex() {
    return this.inputIndex;
}

/**
 * Returns the partition keys this table argument was constructed with.
 *
 * <p>The array held by this call is returned as-is; no defensive copy is made.
 *
 * @return the {@code partitionKeys} array of this call
 */
public int[] getPartitionKeys() {
    return this.partitionKeys;
}
Expand All @@ -59,43 +64,36 @@ public int[] getOrderKeys() {

@Override
protected String computeDigest(boolean withType) {
if ((operands.isEmpty()) && (op.getSyntax() == SqlSyntax.FUNCTION_ID)) {
return super.computeDigest(withType);
}
final StringBuilder sb = new StringBuilder(op.getName());
sb.append("(");
appendKeys(partitionKeys, "PARTITION BY", sb);
appendKeys(orderKeys, "ORDER BY", sb);
appendOperands(sb);
sb.append("#");
sb.append(inputIndex);
sb.append(")");
if (withType) {
sb.append(":");

// NOTE jvs 16-Jan-2005: for digests, it is very important
// to use the full type string.
sb.append(type.getFullTypeString());
}
formatKeys(sb, partitionKeys, " PARTITION BY");
formatKeys(sb, orderKeys, " ORDER BY");
return sb.toString();
}

private void appendKeys(int[] keys, String prefix, StringBuilder sb) {
private void formatKeys(StringBuilder sb, int[] keys, String prefix) {
if (keys.length == 0) {
return;
}
sb.append(
Arrays.stream(keys)
.mapToObj(key -> "$" + key)
.collect(Collectors.joining(", ", prefix + "(", "), ")));
.collect(Collectors.joining(", ", prefix + "(", ")")));
}

public RexSetSemanticsTableCall copy(
List<? extends RexNode> newOperands, int[] newPartitionKeys, int[] newOrderKeys) {
return new RexSetSemanticsTableCall(type, op, newOperands, newPartitionKeys, newOrderKeys);
@Override
public RexCall clone(RelDataType type, List<RexNode> operands) {
return new RexTableArgCall(type, inputIndex, partitionKeys, orderKeys);
}

@Override
public RexSetSemanticsTableCall clone(RelDataType type, List<RexNode> operands) {
return new RexSetSemanticsTableCall(
type, getOperator(), operands, partitionKeys, orderKeys);
public RexTableArgCall copy(RelDataType type, int[] partitionKeys, int[] orderKeys) {
return new RexTableArgCall(type, inputIndex, partitionKeys, orderKeys);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.planner.functions.sql;

import org.apache.flink.table.planner.calcite.FlinkSqlCallBinding;
import org.apache.flink.table.types.inference.StaticArgument;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlCall;
Expand All @@ -32,14 +33,14 @@
import org.apache.calcite.sql.validate.SqlValidatorScope;

/**
* Marker for optional arguments inserted by {@link FlinkSqlCallBinding}. Compared to Calcite, this
* operator stores its type.
* Marker for optional arguments in a signature of {@link StaticArgument}s inserted by {@link
* FlinkSqlCallBinding}. Compared to Calcite, this operator stores its type.
*/
public class SqlDefaultOperator extends SqlSpecialOperator {
public class SqlDefaultArgOperator extends SqlSpecialOperator {

private final RelDataType returnType;

public SqlDefaultOperator(RelDataType returnType) {
public SqlDefaultArgOperator(RelDataType returnType) {
super(
"DEFAULT",
SqlKind.DEFAULT,
Expand Down
Loading

0 comments on commit b9cc919

Please sign in to comment.