diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java
index 8704a3c440baad..4e63b261e7d4a2 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java
@@ -21,6 +21,7 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.table.types.inference.StaticArgumentTrait;
+import org.apache.flink.types.RowKind;
/**
* Declares traits for {@link ArgumentHint}. They enable basic validation by the framework.
@@ -78,6 +79,8 @@ public enum ArgumentTrait {
/**
* Defines that a PARTITION BY clause is optional for {@link #TABLE_AS_SET}. By default, it is
* mandatory for improving the parallel execution by distributing the table by key.
+ *
+ * <p>Note: This trait is only valid for {@link #TABLE_AS_SET} arguments.
*/
OPTIONAL_PARTITION_BY(false, StaticArgumentTrait.OPTIONAL_PARTITION_BY),
@@ -97,8 +100,34 @@ public enum ArgumentTrait {
*
* <p>In case of multiple table arguments, pass-through columns are added according to the
* declaration order in the PTF signature.
+ *
+ * <p>Note: This trait is valid for {@link #TABLE_AS_ROW} and {@link #TABLE_AS_SET} arguments.
+ */
+ PASS_COLUMNS_THROUGH(false, StaticArgumentTrait.PASS_COLUMNS_THROUGH),
+
+ /**
+ * Defines that updates are allowed as input to the given table argument. By default, a table
+ * argument is insert-only and updates will be rejected.
+ *
+ * <p>Input tables become updating when subqueries such as aggregations or outer joins force an
+ * incremental computation. For example, the following query only works if the function is able
+ * to digest retraction messages:
+ *
+ * <pre>
+ * // Changes +[1] followed by -U[1], +U[2], -U[2], +U[3] will enter the function
+ * WITH UpdatingTable AS (
+ *   SELECT COUNT(*) FROM (VALUES 1, 2, 3)
+ * )
+ * SELECT * FROM f(tableArg => TABLE UpdatingTable)
+ * </pre>
+ *
+ * <p>If updates should be supported, ensure that the data type of the table argument is chosen
+ * in a way that it can encode changes. In other words: choose a row type that exposes the
+ * {@link RowKind} change flag.
+ *
+ * <p>Note: This trait is valid for {@link #TABLE_AS_ROW} and {@link #TABLE_AS_SET} arguments.
*/
- PASS_COLUMNS_THROUGH(false, StaticArgumentTrait.PASS_COLUMNS_THROUGH);
+ SUPPORT_UPDATES(false, StaticArgumentTrait.SUPPORT_UPDATES);
private final boolean isRoot;
private final StaticArgumentTrait staticTrait;
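For illustration, a minimal sketch of a PTF using the new trait, assuming the annotation API shown above (the class name `UpdateAwareFunction` and argument name `tableArg` are hypothetical, and `collect(...)` is assumed to emit rows as in other user-defined functions):

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.ArgumentTrait;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

// Hypothetical PTF whose table argument opts into SUPPORT_UPDATES. Because the
// argument uses org.apache.flink.types.Row, every row carries its RowKind change
// flag, which satisfies the row-type check added to StaticArgument below.
public class UpdateAwareFunction extends ProcessTableFunction<String> {
    public void eval(
            @ArgumentHint({ArgumentTrait.TABLE_AS_SET, ArgumentTrait.SUPPORT_UPDATES})
                    Row tableArg) {
        // +I, -U, and +U rows all reach this method; the change flag tells them apart
        collect(tableArg.getKind().shortString() + ": " + tableArg);
    }
}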
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java
index fc7fd86abb2ee6..73f8afbf60fd18 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java
@@ -22,6 +22,7 @@
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.NullType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.StructuredType;
@@ -273,29 +274,46 @@ private void checkOptionalType() {
}
}
- void checkTableType() {
+ private void checkTableType() {
if (!traits.contains(StaticArgumentTrait.TABLE)) {
return;
}
- if (dataType == null
- && conversionClass != null
- && !DUMMY_ROW_TYPE.supportsInputConversion(conversionClass)) {
+ checkPolymorphicTableType();
+ checkTypedTableType();
+ }
+
+ private void checkPolymorphicTableType() {
+ if (dataType != null || conversionClass == null) {
+ return;
+ }
+ if (!DUMMY_ROW_TYPE.supportsInputConversion(conversionClass)) {
throw new ValidationException(
String.format(
"Invalid conversion class '%s' for argument '%s'. "
+ "Polymorphic, untyped table arguments must use a row class.",
conversionClass.getName(), name));
}
- if (dataType != null) {
- final LogicalType type = dataType.getLogicalType();
- if (traits.contains(StaticArgumentTrait.TABLE)
- && !LogicalTypeChecks.isCompositeType(type)) {
- throw new ValidationException(
- String.format(
- "Invalid data type '%s' for table argument '%s'. "
- + "Typed table arguments must use a composite type (i.e. row or structured type).",
- type, name));
- }
+ }
+
+ private void checkTypedTableType() {
+ if (dataType == null) {
+ return;
+ }
+ final LogicalType type = dataType.getLogicalType();
+ if (traits.contains(StaticArgumentTrait.TABLE)
+ && !LogicalTypeChecks.isCompositeType(type)) {
+ throw new ValidationException(
+ String.format(
+ "Invalid data type '%s' for table argument '%s'. "
+ + "Typed table arguments must use a composite type (i.e. row or structured type).",
+ type, name));
+ }
+ if (is(StaticArgumentTrait.SUPPORT_UPDATES) && !type.is(LogicalTypeRoot.ROW)) {
+ throw new ValidationException(
+ String.format(
+ "Invalid data type '%s' for table argument '%s'. "
+ + "Table arguments that support updates must use a row type.",
+ type, name));
}
}
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java
index b9f4d4c71fbb61..fbcc7be78791f9 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java
@@ -37,8 +37,9 @@ public enum StaticArgumentTrait {
MODEL(),
TABLE_AS_ROW(TABLE),
TABLE_AS_SET(TABLE),
- OPTIONAL_PARTITION_BY(TABLE_AS_SET),
- PASS_COLUMNS_THROUGH(TABLE);
+ PASS_COLUMNS_THROUGH(TABLE),
+ SUPPORT_UPDATES(TABLE),
+ OPTIONAL_PARTITION_BY(TABLE_AS_SET);
private final Set<StaticArgumentTrait> requirements;
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
index 6c64b32ec25f61..d3b18b3ab12488 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java
@@ -81,6 +81,10 @@ public static TypeInference of(FunctionKind functionKind, TypeInference origin)
return builder.build();
}
+ public static boolean isValidUidForProcessTableFunction(String uid) {
+ return UID_FORMAT.test(uid);
+ }
+
// --------------------------------------------------------------------------------------------
private static void checkScalarArgsOnly(List<StaticArgument> defaultArgs) {
@@ -283,7 +287,7 @@ public Optional<List<DataType>> inferInputTypes(
+ "that is not overloaded and doesn't contain varargs.");
}
- checkUidColumn(callContext);
+ checkUidArg(callContext);
checkMultipleTableArgs(callContext);
checkTableArgTraits(staticArgs, callContext);
@@ -297,16 +301,16 @@ public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
return origin.getExpectedSignatures(definition);
}
- private static void checkUidColumn(CallContext callContext) {
+ private static void checkUidArg(CallContext callContext) {
final List<DataType> args = callContext.getArgumentDataTypes();
// Verify the uid format if provided
int uidPos = args.size() - 1;
if (!callContext.isArgumentNull(uidPos)) {
final String uid = callContext.getArgumentValue(uidPos, String.class).orElse("");
- if (!UID_FORMAT.test(uid)) {
+ if (!isValidUidForProcessTableFunction(uid)) {
throw new ValidationException(
- "Invalid unique identifier for process table function. The 'uid' argument "
+ "Invalid unique identifier for process table function. The `uid` argument "
+ "must be a string literal that follows the pattern [a-zA-Z_][a-zA-Z-_0-9]*. "
+ "But found: "
+ uid);
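A hypothetical illustration of the pattern enforced by the extracted helper:

import org.apache.flink.table.types.inference.SystemTypeInference;

class UidPatternDemo {
    public static void main(String[] args) {
        // pattern: [a-zA-Z_][a-zA-Z-_0-9]*
        System.out.println(SystemTypeInference.isValidUidForProcessTableFunction("my-id")); // true
        System.out.println(SystemTypeInference.isValidUidForProcessTableFunction("_tmp1")); // true
        System.out.println(SystemTypeInference.isValidUidForProcessTableFunction("1st"));   // false: leading digit
        System.out.println(SystemTypeInference.isValidUidForProcessTableFunction("a.b"));   // false: '.' not allowed
    }
}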
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RexTableArgCall.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RexTableArgCall.java
index 369f392042b368..adbcd0633a32a8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RexTableArgCall.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RexTableArgCall.java
@@ -28,6 +28,7 @@
import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
/**
@@ -96,4 +97,29 @@ public RexCall clone(RelDataType type, List<RexNode> operands) {
public RexTableArgCall copy(RelDataType type, int[] partitionKeys, int[] orderKeys) {
return new RexTableArgCall(type, inputIndex, partitionKeys, orderKeys);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ final RexTableArgCall that = (RexTableArgCall) o;
+ return inputIndex == that.inputIndex
+ && Arrays.equals(partitionKeys, that.partitionKeys)
+ && Arrays.equals(orderKeys, that.orderKeys);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(super.hashCode(), inputIndex);
+ result = 31 * result + Arrays.hashCode(partitionKeys);
+ result = 31 * result + Arrays.hashCode(orderKeys);
+ return result;
+ }
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeGraphGenerator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeGraphGenerator.java
index beadfc4f12547e..3d9e9dbc26cf4c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeGraphGenerator.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeGraphGenerator.java
@@ -19,15 +19,19 @@
package org.apache.flink.table.planner.plan.nodes.exec;
import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.planner.plan.nodes.common.CommonIntermediateTableScan;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecProcessTableFunction;
import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel;
import org.apache.calcite.rel.RelNode;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* A generator that generates a {@link ExecNode} graph from a graph of {@link FlinkPhysicalRel}s.
@@ -43,9 +47,11 @@
public class ExecNodeGraphGenerator {
private final Map<FlinkPhysicalRel, ExecNode<?>> visitedRels;
+ private final Set<String> visitedProcessTableFunctionUids;
public ExecNodeGraphGenerator() {
this.visitedRels = new IdentityHashMap<>();
+ this.visitedProcessTableFunctionUids = new HashSet<>();
}
public ExecNodeGraph generate(List<FlinkPhysicalRel> relNodes, boolean isCompiled) {
@@ -78,8 +84,25 @@ private ExecNode<?> generate(FlinkPhysicalRel rel, boolean isCompiled) {
inputEdges.add(ExecEdge.builder().source(inputNode).target(execNode).build());
}
execNode.setInputEdges(inputEdges);
-
+ checkUidForProcessTableFunction(execNode);
visitedRels.put(rel, execNode);
return execNode;
}
+
+ private void checkUidForProcessTableFunction(ExecNode<?> execNode) {
+ if (!(execNode instanceof StreamExecProcessTableFunction)) {
+ return;
+ }
+ final String uid = ((StreamExecProcessTableFunction) execNode).getUid();
+ if (visitedProcessTableFunctionUids.contains(uid)) {
+ throw new ValidationException(
+ String.format(
+ "Duplicate unique identifier '%s' detected among process table functions. "
+ + "Make sure that all PTF calls have an identifier defined that is globally unique. "
+ + "Please provide a custom identifier using the implicit `uid` argument. "
+ + "For example: myFunction(..., uid => 'my-id')",
+ uid));
+ }
+ visitedProcessTableFunctionUids.add(uid);
+ }
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
index c5111219283edc..89a30ff9a40420 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
@@ -27,6 +27,7 @@
import org.apache.flink.table.functions.FunctionIdentifier;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.functions.UserDefinedFunctionHelper;
+import org.apache.flink.table.planner.calcite.RexTableArgCall;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlAggFunction;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
import org.apache.flink.table.planner.functions.sql.BuiltInSqlOperator;
@@ -93,6 +94,8 @@
import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_NAME;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_NULL_AS;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_OPERANDS;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_ORDER_KEYS;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_PARTITION_KEYS;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_RANGES;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_SARG;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_SQL_KIND;
@@ -107,6 +110,7 @@
import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.KIND_INPUT_REF;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.KIND_LITERAL;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.KIND_PATTERN_INPUT_REF;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.KIND_TABLE_ARG_CALL;
import static org.apache.flink.table.planner.typeutils.SymbolUtil.serializableToCalcite;
/**
@@ -144,6 +148,8 @@ private static RexNode deserialize(JsonNode jsonNode, SerdeContext serdeContext)
return deserializeCorrelVariable(jsonNode, serdeContext);
case KIND_PATTERN_INPUT_REF:
return deserializePatternFieldRef(jsonNode, serdeContext);
+ case KIND_TABLE_ARG_CALL:
+ return deserializeTableArgCall(jsonNode, serdeContext);
case KIND_CALL:
return deserializeCall(jsonNode, serdeContext);
default:
@@ -313,6 +319,28 @@ private static RexNode deserializePatternFieldRef(
return serdeContext.getRexBuilder().makePatternFieldRef(alpha, fieldType, inputIndex);
}
+ private static RexNode deserializeTableArgCall(JsonNode jsonNode, SerdeContext serdeContext) {
+ final JsonNode logicalTypeNode = jsonNode.required(FIELD_NAME_TYPE);
+ final RelDataType callType =
+ RelDataTypeJsonDeserializer.deserialize(logicalTypeNode, serdeContext);
+
+ final int inputIndex = jsonNode.required(FIELD_NAME_INPUT_INDEX).intValue();
+
+ final JsonNode partitionKeysNode = jsonNode.required(FIELD_NAME_PARTITION_KEYS);
+ final int[] partitionKeys = new int[partitionKeysNode.size()];
+ for (int i = 0; i < partitionKeysNode.size(); ++i) {
+ partitionKeys[i] = partitionKeysNode.get(i).asInt();
+ }
+
+ final JsonNode orderKeysNode = jsonNode.required(FIELD_NAME_ORDER_KEYS);
+ final int[] orderKeys = new int[orderKeysNode.size()];
+ for (int i = 0; i < orderKeysNode.size(); ++i) {
+ orderKeys[i] = orderKeysNode.get(i).asInt();
+ }
+
+ return new RexTableArgCall(callType, inputIndex, partitionKeys, orderKeys);
+ }
+
private static RexNode deserializeCall(JsonNode jsonNode, SerdeContext serdeContext)
throws IOException {
final SqlOperator operator = deserializeSqlOperator(jsonNode, serdeContext);
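For orientation, a sketch of the JSON shape round-tripped by serializeTableArgCall and deserializeTableArgCall, in the serializer's write order (the nested type payload is abbreviated):

// kind, inputIndex, partitionKeys, orderKeys, type
String tableArgCallJson =
        "{"
                + "\"kind\":\"TABLE_ARG_CALL\","
                + "\"inputIndex\":0,"
                + "\"partitionKeys\":[1],"
                + "\"orderKeys\":[],"
                + "\"type\":{}" // serialized RelDataType payload, abbreviated here
                + "}";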
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
index f559f6445e5ed2..facb4a11158a9c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
@@ -33,6 +33,7 @@
import org.apache.flink.table.functions.TableAggregateFunctionDefinition;
import org.apache.flink.table.functions.TableFunctionDefinition;
import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.planner.calcite.RexTableArgCall;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlAggFunction;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
import org.apache.flink.table.planner.functions.sql.BuiltInSqlOperator;
@@ -81,10 +82,10 @@ final class RexNodeJsonSerializer extends StdSerializer<RexNode> {
static final String FIELD_NAME_VALUE = "value";
static final String FIELD_NAME_TYPE = "type";
static final String FIELD_NAME_NAME = "name";
+ static final String FIELD_NAME_INPUT_INDEX = "inputIndex";
// INPUT_REF
static final String KIND_INPUT_REF = "INPUT_REF";
- static final String FIELD_NAME_INPUT_INDEX = "inputIndex";
// LITERAL
static final String KIND_LITERAL = "LITERAL";
@@ -122,6 +123,11 @@ final class RexNodeJsonSerializer extends StdSerializer<RexNode> {
static final String FIELD_NAME_SQL_KIND = "sqlKind";
static final String FIELD_NAME_CLASS = "class";
+ // TABLE_ARG_CALL
+ static final String KIND_TABLE_ARG_CALL = "TABLE_ARG_CALL";
+ static final String FIELD_NAME_PARTITION_KEYS = "partitionKeys";
+ static final String FIELD_NAME_ORDER_KEYS = "orderKeys";
+
RexNodeJsonSerializer() {
super(RexNode.class);
}
@@ -154,7 +160,10 @@ public void serialize(
(RexPatternFieldRef) rexNode, jsonGenerator, serializerProvider);
break;
default:
- if (rexNode instanceof RexCall) {
+ if (rexNode instanceof RexTableArgCall) {
+ serializeTableArgCall(
+ (RexTableArgCall) rexNode, jsonGenerator, serializerProvider);
+ } else if (rexNode instanceof RexCall) {
serializeCall(
(RexCall) rexNode,
jsonGenerator,
@@ -323,6 +332,20 @@ private static void serializeCorrelVariable(
gen.writeEndObject();
}
+ private static void serializeTableArgCall(
+ RexTableArgCall tableArgCall, JsonGenerator gen, SerializerProvider serializerProvider)
+ throws IOException {
+ gen.writeStartObject();
+ gen.writeStringField(FIELD_NAME_KIND, KIND_TABLE_ARG_CALL);
+ gen.writeNumberField(FIELD_NAME_INPUT_INDEX, tableArgCall.getInputIndex());
+ gen.writeFieldName(FIELD_NAME_PARTITION_KEYS);
+ gen.writeArray(tableArgCall.getPartitionKeys(), 0, tableArgCall.getPartitionKeys().length);
+ gen.writeFieldName(FIELD_NAME_ORDER_KEYS);
+ gen.writeArray(tableArgCall.getOrderKeys(), 0, tableArgCall.getOrderKeys().length);
+ serializerProvider.defaultSerializeField(FIELD_NAME_TYPE, tableArgCall.getType(), gen);
+ gen.writeEndObject();
+ }
+
private static void serializeCall(
RexCall call,
JsonGenerator gen,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
new file mode 100644
index 00000000000000..50a8a0f610864f
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+
+/**
+ * {@link StreamExecNode} for {@link ProcessTableFunction}.
+ *
+ * <p>A process table function (PTF) maps zero, one, or multiple tables to zero, one, or multiple
+ * rows. PTFs enable implementing user-defined operators that can be as feature-rich as built-in
+ * operations. PTFs have access to Flink's managed state, event-time and timer services, underlying
+ * table changelogs, and can take multiple ordered and/or partitioned tables to produce a new table.
+ */
+@ExecNodeMetadata(
+ name = "stream-exec-process-table-function",
+ version = 1,
+ producedTransformations = StreamExecProcessTableFunction.PROCESS_TRANSFORMATION,
+ minPlanVersion = FlinkVersion.v2_0,
+ minStateVersion = FlinkVersion.v2_0)
+public class StreamExecProcessTableFunction extends ExecNodeBase<RowData>
+        implements StreamExecNode<RowData>, SingleTransformationTranslator<RowData> {
+
+ public static final String PROCESS_TRANSFORMATION = "process";
+
+ public static final String FIELD_NAME_UID = "uid";
+ public static final String FIELD_NAME_FUNCTION_CALL = "functionCall";
+ public static final String FIELD_NAME_INPUT_CHANGELOG_MODES = "inputChangelogModes";
+
+ @JsonProperty(FIELD_NAME_UID)
+ private final String uid;
+
+ @JsonProperty(FIELD_NAME_FUNCTION_CALL)
+ private final RexCall invocation;
+
+ @JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODES)
+ private final List<ChangelogMode> inputChangelogModes;
+
+ public StreamExecProcessTableFunction(
+ ReadableConfig tableConfig,
+ List<InputProperty> inputProperties,
+ RowType outputType,
+ String description,
+ String uid,
+ RexCall invocation,
+ List<ChangelogMode> inputChangelogModes) {
+ this(
+ ExecNodeContext.newNodeId(),
+ ExecNodeContext.newContext(StreamExecProcessTableFunction.class),
+ ExecNodeContext.newPersistedConfig(
+ StreamExecProcessTableFunction.class, tableConfig),
+ inputProperties,
+ outputType,
+ description,
+ uid,
+ invocation,
+ inputChangelogModes);
+ }
+
+ @JsonCreator
+ public StreamExecProcessTableFunction(
+ @JsonProperty(FIELD_NAME_ID) int id,
+ @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+ @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
+ @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
+ @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+ @JsonProperty(FIELD_NAME_DESCRIPTION) String description,
+ @JsonProperty(FIELD_NAME_UID) String uid,
+ @JsonProperty(FIELD_NAME_FUNCTION_CALL) RexNode invocation,
+ @JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODES)
+ List<ChangelogMode> inputChangelogModes) {
+ super(id, context, persistedConfig, inputProperties, outputType, description);
+ this.uid = uid;
+ this.invocation = (RexCall) invocation;
+ this.inputChangelogModes = inputChangelogModes;
+ }
+
+ public String getUid() {
+ return uid;
+ }
+
+ @Override
+ protected Transformation<RowData> translateToPlanInternal(
+ PlannerBase planner, ExecNodeConfig config) {
+ throw new TableException("Process table function is not fully supported yet.");
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.java
index d6113743f3e441..c88d132b4c3cba 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.java
@@ -18,16 +18,21 @@
package org.apache.flink.table.planner.plan.rules.physical.batch;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionKind;
import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalCorrelate;
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalValues;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rex.RexUtil;
import org.immutables.value.Value;
@@ -35,20 +40,21 @@
import scala.Option;
/**
- * Converts {@link FlinkLogicalTableFunctionScan} with constant RexCall to
+ * Converts a {@link FlinkLogicalTableFunctionScan} with constant parameters. This rule enables
+ * selecting from a UDF directly, e.g. {@code SELECT * FROM func() as T(c)}.
+ *
+ * <p>For {@link FunctionKind#TABLE}:
*
* <pre>
- * {@link BatchPhysicalCorrelate}
- * / \
- * empty {@link BatchPhysicalValuesRule}} {@link FlinkLogicalTableFunctionScan}.
+ * empty {@link BatchPhysicalValues} -> {@link BatchPhysicalCorrelate}
* </pre>
*
- * Add the rule to support select from a UDF directly, such as the following SQL: {@code SELECT *
- * FROM LATERAL TABLE(func()) as T(c)}
+ * <p>{@link BatchPhysicalCorrelateRule} powers queries such as {@code SELECT * FROM T, LATERAL
+ * TABLE(func()) as T(c)} or {@code SELECT a, c FROM T, LATERAL TABLE(func(a)) as T(c)}.
+ *
+ * <p>For {@link FunctionKind#PROCESS_TABLE}:
*
- * <p>Note: {@link BatchPhysicalCorrelateRule} is responsible for converting a reasonable physical
- * plan for the normal correlate query, such as the following SQL: example1: {@code SELECT * FROM T,
- * LATERAL TABLE(func()) as T(c) example2: SELECT a, c FROM T, LATERAL TABLE(func(a)) as T(c)}
+ * <p>{@link FunctionKind#PROCESS_TABLE} is currently unsupported.
*/
@Value.Enclosing
public class BatchPhysicalConstantTableFunctionScanRule
@@ -65,18 +71,17 @@ protected BatchPhysicalConstantTableFunctionScanRule(
}
public boolean matches(RelOptRuleCall call) {
- FlinkLogicalTableFunctionScan scan = call.rel(0);
- return RexUtil.isConstant(scan.getCall()) && scan.getInputs().isEmpty();
+ final FlinkLogicalTableFunctionScan scan = call.rel(0);
+ return !RexUtil.containsInputRef(scan.getCall()) && scan.getInputs().isEmpty();
}
public void onMatch(RelOptRuleCall call) {
- FlinkLogicalTableFunctionScan scan = call.rel(0);
-
- // create correlate left
- RelOptCluster cluster = scan.getCluster();
- RelTraitSet traitSet =
+ final FlinkLogicalTableFunctionScan scan = call.rel(0);
+ final RelOptCluster cluster = scan.getCluster();
+ final RelTraitSet traitSet =
call.getPlanner().emptyTraitSet().replace(FlinkConventions.BATCH_PHYSICAL());
- BatchPhysicalValues values =
+
+ final BatchPhysicalValues values =
new BatchPhysicalValues(
cluster,
traitSet,
@@ -84,35 +89,37 @@ public void onMatch(RelOptRuleCall call) {
cluster.getTypeFactory()
.createStructType(ImmutableList.of(), ImmutableList.of()));
- BatchPhysicalCorrelate correlate =
- new BatchPhysicalCorrelate(
- cluster,
- traitSet,
- values,
- scan,
- Option.empty(),
- scan.getRowType(),
- JoinRelType.INNER);
- call.transformTo(correlate);
+ final FunctionDefinition function = ShortcutUtils.unwrapFunctionDefinition(scan.getCall());
+ assert function != null;
+ final RelNode replacement;
+ if (function.getKind() == FunctionKind.TABLE) {
+ replacement =
+ new BatchPhysicalCorrelate(
+ cluster,
+ traitSet,
+ values,
+ scan,
+ Option.empty(),
+ scan.getRowType(),
+ JoinRelType.INNER);
+ } else {
+ throw new TableException("Unsupported function for scan: " + function.getKind());
+ }
+
+ call.transformTo(replacement);
}
/** Configuration for {@link BatchPhysicalConstantTableFunctionScanRule}. */
- @Value.Immutable(singleton = false)
+ @Value.Immutable
public interface BatchPhysicalConstantTableFunctionScanRuleConfig extends RelRule.Config {
- BatchPhysicalConstantTableFunctionScanRule.BatchPhysicalConstantTableFunctionScanRuleConfig
- DEFAULT =
- ImmutableBatchPhysicalConstantTableFunctionScanRule
- .BatchPhysicalConstantTableFunctionScanRuleConfig.builder()
- .build()
- .withOperandSupplier(
- b0 ->
- b0.operand(FlinkLogicalTableFunctionScan.class)
- .anyInputs())
- .withDescription("BatchPhysicalConstantTableFunctionScanRule")
- .as(
- BatchPhysicalConstantTableFunctionScanRule
- .BatchPhysicalConstantTableFunctionScanRuleConfig
- .class);
+ BatchPhysicalConstantTableFunctionScanRuleConfig DEFAULT =
+ ImmutableBatchPhysicalConstantTableFunctionScanRule
+ .BatchPhysicalConstantTableFunctionScanRuleConfig.builder()
+ .build()
+ .withOperandSupplier(
+ b0 -> b0.operand(FlinkLogicalTableFunctionScan.class).anyInputs())
+ .withDescription("BatchPhysicalConstantTableFunctionScanRule")
+ .as(BatchPhysicalConstantTableFunctionScanRuleConfig.class);
@Override
default BatchPhysicalConstantTableFunctionScanRule toRule() {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.java
index 385b34e045757c..df177f7bdbce84 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.java
@@ -18,16 +18,22 @@
package org.apache.flink.table.planner.plan.rules.physical.stream;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionKind;
import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCorrelate;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalProcessTableFunction;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalValues;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rex.RexUtil;
import org.immutables.value.Value;
@@ -35,20 +41,26 @@
import scala.Option;
/**
- * Converts {@link FlinkLogicalTableFunctionScan} with constant RexCall. To
+ * Converts a {@link FlinkLogicalTableFunctionScan} with constant parameters. This rule enables
+ * selecting from a UDF directly, e.g. {@code SELECT * FROM func() as T(c)}.
+ *
+ * <p>For {@link org.apache.flink.table.functions.FunctionKind#TABLE}:
*
* <pre>
- * {@link StreamPhysicalCorrelate}
- * / \
- * empty {@link StreamPhysicalValues} {@link FlinkLogicalTableFunctionScan}
+ * empty {@link StreamPhysicalValues} -> {@link StreamPhysicalCorrelate}
* </pre>
*
- * Add the rule to support select from a UDF directly, such as the following SQL: {@code SELECT *
- * FROM LATERAL TABLE(func()) as T(c)}
+ * <p>{@link StreamPhysicalCorrelateRule} powers queries such as {@code SELECT * FROM T, LATERAL
+ * TABLE(func()) as T(c)} or {@code SELECT a, c FROM T, LATERAL TABLE(func(a)) as T(c)}.
+ *
+ * <p>For {@link org.apache.flink.table.functions.FunctionKind#PROCESS_TABLE}:
+ *
+ * <pre>
+ * empty {@link StreamPhysicalValues} -> {@link StreamPhysicalProcessTableFunction}
+ * </pre>
*
- * Note: @{link StreamPhysicalCorrelateRule} is responsible for converting a reasonable physical
- * plan for the normal correlate query, such as the following SQL: example1: {@code SELECT * FROM T,
- * LATERAL TABLE(func()) as T(c) example2: SELECT a, c FROM T, LATERAL TABLE(func(a)) as T(c)}
+ * <p>{@link StreamPhysicalProcessTableFunction} powers queries such as {@code SELECT * FROM func(t
+ * => TABLE T)} or {@code SELECT * FROM func(t => TABLE T PARTITION BY k)}.
*/
@Value.Enclosing
public class StreamPhysicalConstantTableFunctionScanRule
@@ -67,18 +79,17 @@ protected StreamPhysicalConstantTableFunctionScanRule(
}
public boolean matches(RelOptRuleCall call) {
- FlinkLogicalTableFunctionScan scan = call.rel(0);
+ final FlinkLogicalTableFunctionScan scan = call.rel(0);
return !RexUtil.containsInputRef(scan.getCall()) && scan.getInputs().isEmpty();
}
public void onMatch(RelOptRuleCall call) {
- FlinkLogicalTableFunctionScan scan = call.rel(0);
-
- // create correlate left
- RelOptCluster cluster = scan.getCluster();
- RelTraitSet traitSet =
+ final FlinkLogicalTableFunctionScan scan = call.rel(0);
+ final RelOptCluster cluster = scan.getCluster();
+ final RelTraitSet traitSet =
call.getPlanner().emptyTraitSet().replace(FlinkConventions.STREAM_PHYSICAL());
- StreamPhysicalValues values =
+
+ final StreamPhysicalValues values =
new StreamPhysicalValues(
cluster,
traitSet,
@@ -86,20 +97,32 @@ public void onMatch(RelOptRuleCall call) {
cluster.getTypeFactory()
.createStructType(ImmutableList.of(), ImmutableList.of()));
- StreamPhysicalCorrelate correlate =
- new StreamPhysicalCorrelate(
- cluster,
- traitSet,
- values,
- scan,
- Option.empty(),
- scan.getRowType(),
- JoinRelType.INNER);
- call.transformTo(correlate);
+ final FunctionDefinition function = ShortcutUtils.unwrapFunctionDefinition(scan.getCall());
+ assert function != null;
+ final RelNode replacement;
+ if (function.getKind() == FunctionKind.TABLE) {
+ replacement =
+ new StreamPhysicalCorrelate(
+ cluster,
+ traitSet,
+ values,
+ scan,
+ Option.empty(),
+ scan.getRowType(),
+ JoinRelType.INNER);
+ } else if (function.getKind() == FunctionKind.PROCESS_TABLE) {
+ replacement =
+ new StreamPhysicalProcessTableFunction(
+ cluster, traitSet, values, scan, scan.getRowType());
+ } else {
+ throw new TableException("Unsupported function for scan: " + function.getKind());
+ }
+
+ call.transformTo(replacement);
}
/** Configuration for {@link StreamPhysicalConstantTableFunctionScanRule}. */
- @Value.Immutable(singleton = false)
+ @Value.Immutable
public interface StreamPhysicalConstantTableFunctionScanRuleConfig extends RelRule.Config {
StreamPhysicalConstantTableFunctionScanRule
.StreamPhysicalConstantTableFunctionScanRuleConfig
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index a19fc0dbaf96f5..484d0883be1b60 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -76,6 +76,7 @@
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMultipleInput;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecProcessTableFunction;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCalc;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCorrelate;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupAggregate;
@@ -124,7 +125,7 @@ private ExecNodeMetadataUtil() {
}
private static final Set<Class<? extends ExecNode<?>>> EXEC_NODES =
- new HashSet<Class<? extends ExecNode<?>>>() {
+ new HashSet<>() {
{
add(StreamExecCalc.class);
add(StreamExecChangelogNormalize.class);
@@ -164,6 +165,7 @@ private ExecNodeMetadataUtil() {
add(StreamExecWindowTableFunction.class);
add(StreamExecPythonCalc.class);
add(StreamExecAsyncCalc.class);
+ add(StreamExecProcessTableFunction.class);
add(StreamExecPythonCorrelate.class);
add(StreamExecPythonGroupAggregate.class);
add(StreamExecPythonGroupWindowAggregate.class);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java
index 522b37319199da..a025eacf3be05b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java
@@ -29,6 +29,7 @@
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.expressions.RexNodeExpression;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
+import org.apache.flink.table.planner.functions.utils.TableSqlFunction;
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.RelOptCluster;
@@ -140,6 +141,10 @@ public static ClassLoader unwrapClassLoader(RelNode relNode) {
}
final RexCall call = (RexCall) rexNode;
if (!(call.getOperator() instanceof BridgingSqlFunction)) {
+ // legacy
+ if (call.getOperator() instanceof TableSqlFunction) {
+ return ((TableSqlFunction) call.getOperator()).udtf();
+ }
return null;
}
return ((BridgingSqlFunction) call.getOperator()).getDefinition();
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.java b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.java
new file mode 100644
index 00000000000000..44bf210b9d2039
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.logical;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionKind;
+import org.apache.flink.table.functions.TemporalTableFunction;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableFunctionScan;
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
+import org.apache.calcite.rel.metadata.RelColumnMapping;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Subclass of {@link TableFunctionScan} that is a relational expression which calls a {@link
+ * FunctionKind#TABLE} or {@link FunctionKind#PROCESS_TABLE} function in Flink.
+ */
+@Internal
+public class FlinkLogicalTableFunctionScan extends TableFunctionScan implements FlinkLogicalRel {
+
+ public static Converter CONVERTER =
+ new Converter(
+ ConverterRule.Config.INSTANCE.withConversion(
+ LogicalTableFunctionScan.class,
+ Convention.NONE,
+ FlinkConventions.LOGICAL(),
+ "FlinkLogicalTableFunctionScanConverter"));
+
+ public FlinkLogicalTableFunctionScan(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ List<RelNode> inputs,
+ RexNode rexCall,
+ @Nullable Type elementType,
+ RelDataType rowType,
+ @Nullable Set<RelColumnMapping> columnMappings) {
+ super(cluster, traitSet, inputs, rexCall, elementType, rowType, columnMappings);
+ }
+
+ @Override
+ public TableFunctionScan copy(
+ RelTraitSet traitSet,
+ List<RelNode> inputs,
+ RexNode rexCall,
+ @Nullable Type elementType,
+ RelDataType rowType,
+ @Nullable Set<RelColumnMapping> columnMappings) {
+ return new FlinkLogicalTableFunctionScan(
+ getCluster(), traitSet, inputs, rexCall, elementType, rowType, columnMappings);
+ }
+
+ @Internal
+ public static class Converter extends ConverterRule {
+
+ protected Converter(Config config) {
+ super(config);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final LogicalTableFunctionScan functionScan = call.rel(0);
+ final FunctionDefinition functionDefinition =
+ ShortcutUtils.unwrapFunctionDefinition(functionScan.getCall());
+ if (functionDefinition == null) {
+ // For Calcite stack functions
+ return true;
+ }
+ final boolean isTableFunction =
+ functionDefinition.getKind() == FunctionKind.TABLE
+ || functionDefinition.getKind() == FunctionKind.PROCESS_TABLE;
+ return isTableFunction && !(functionDefinition instanceof TemporalTableFunction);
+ }
+
+ @Override
+ public @Nullable RelNode convert(RelNode rel) {
+ final LogicalTableFunctionScan functionScan = (LogicalTableFunctionScan) rel;
+ final RelTraitSet traitSet =
+ rel.getTraitSet().replace(FlinkConventions.LOGICAL()).simplify();
+ final List<RelNode> newInputs =
+ functionScan.getInputs().stream()
+ .map(input -> RelOptRule.convert(input, FlinkConventions.LOGICAL()))
+ .collect(Collectors.toList());
+ final RexCall rexCall = (RexCall) functionScan.getCall();
+ return new FlinkLogicalTableFunctionScan(
+ functionScan.getCluster(),
+ traitSet,
+ newInputs,
+ rexCall,
+ functionScan.getElementType(),
+ functionScan.getRowType(),
+ functionScan.getColumnMappings());
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
deleted file mode 100644
index ca664d7b63e685..00000000000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.nodes.logical
-
-import org.apache.flink.table.functions.TemporalTableFunction
-import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
-import org.apache.flink.table.planner.functions.utils.TableSqlFunction
-import org.apache.flink.table.planner.plan.nodes.FlinkConventions
-
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.convert.ConverterRule.Config
-import org.apache.calcite.rel.core.TableFunctionScan
-import org.apache.calcite.rel.logical.LogicalTableFunctionScan
-import org.apache.calcite.rel.metadata.RelColumnMapping
-import org.apache.calcite.rex.{RexCall, RexNode}
-
-import java.lang.reflect.Type
-import java.util
-
-import scala.collection.JavaConversions._
-
-/**
- * Sub-class of [[TableFunctionScan]] that is a relational expression which calls a table-valued
- * function in Flink.
- */
-class FlinkLogicalTableFunctionScan(
- cluster: RelOptCluster,
- traitSet: RelTraitSet,
- inputs: util.List[RelNode],
- rexCall: RexNode,
- elementType: Type,
- rowType: RelDataType,
- columnMappings: util.Set[RelColumnMapping])
- extends TableFunctionScan(
- cluster,
- traitSet,
- inputs,
- rexCall,
- elementType,
- rowType,
- columnMappings)
- with FlinkLogicalRel {
-
- override def copy(
- traitSet: RelTraitSet,
- inputs: util.List[RelNode],
- rexCall: RexNode,
- elementType: Type,
- rowType: RelDataType,
- columnMappings: util.Set[RelColumnMapping]): TableFunctionScan = {
-
- new FlinkLogicalTableFunctionScan(
- cluster,
- traitSet,
- inputs,
- rexCall,
- elementType,
- rowType,
- columnMappings)
- }
-
-}
-
-class FlinkLogicalTableFunctionScanConverter(config: Config) extends ConverterRule(config) {
-
- override def matches(call: RelOptRuleCall): Boolean = {
- val logicalTableFunction: LogicalTableFunctionScan = call.rel(0)
-
- !isTemporalTableFunctionCall(logicalTableFunction)
- }
-
- private def isTemporalTableFunctionCall(
- logicalTableFunction: LogicalTableFunctionScan): Boolean = {
-
- if (!logicalTableFunction.getCall.isInstanceOf[RexCall]) {
- return false
- }
- val rexCall = logicalTableFunction.getCall.asInstanceOf[RexCall]
- val functionDefinition = rexCall.getOperator match {
- case tsf: TableSqlFunction => tsf.udtf
- case bsf: BridgingSqlFunction => bsf.getDefinition
- case _ => return false
- }
- functionDefinition.isInstanceOf[TemporalTableFunction]
- }
-
- def convert(rel: RelNode): RelNode = {
- val scan = rel.asInstanceOf[LogicalTableFunctionScan]
- val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
- val newInputs = scan.getInputs.map(input => RelOptRule.convert(input, FlinkConventions.LOGICAL))
- val rexCall = scan.getCall.asInstanceOf[RexCall];
- val builder = rel.getCluster.getRexBuilder
- // When rexCall uses NamedArguments, RexCall is not inferred with the correct type.
- // We just use the type of scan as the type of RexCall.
- val newCall = rexCall.clone(rel.getRowType, rexCall.getOperands)
-
- new FlinkLogicalTableFunctionScan(
- scan.getCluster,
- traitSet,
- newInputs,
- newCall,
- scan.getElementType,
- scan.getRowType,
- scan.getColumnMappings
- )
- }
-
-}
-
-object FlinkLogicalTableFunctionScan {
- val CONVERTER = new FlinkLogicalTableFunctionScanConverter(
- Config.INSTANCE.withConversion(
- classOf[LogicalTableFunctionScan],
- Convention.NONE,
- FlinkConventions.LOGICAL,
- "FlinkLogicalTableFunctionScanConverter"))
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
new file mode 100644
index 00000000000000..8910400b162d13
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.stream;
+
+import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig;
+
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ContextResolvedFunction;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.calcite.RexTableArgCall;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecProcessTableFunction;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.types.inference.StaticArgument;
+import org.apache.flink.table.types.inference.StaticArgumentTrait;
+import org.apache.flink.table.types.inference.SystemTypeInference;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * {@link StreamPhysicalRel} node for {@link ProcessTableFunction}.
+ *
+ * <p>A process table function (PTF) maps zero, one, or multiple tables to zero, one, or multiple
+ * rows. PTFs enable implementing user-defined operators that can be as feature-rich as built-in
+ * operations. PTFs have access to Flink's managed state, event-time and timer services, underlying
+ * table changelogs, and can take multiple ordered and/or partitioned tables to produce a new table.
+ */
+public class StreamPhysicalProcessTableFunction extends AbstractRelNode
+ implements StreamPhysicalRel {
+
+ private final FlinkLogicalTableFunctionScan scan;
+ private final String uid;
+
+ private List<RelNode> inputs;
+
+ public StreamPhysicalProcessTableFunction(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ List<RelNode> inputs,
+ FlinkLogicalTableFunctionScan scan,
+ RelDataType rowType) {
+ super(cluster, traitSet);
+ this.inputs = inputs;
+ this.rowType = rowType;
+ this.scan = scan;
+ this.uid = deriveUniqueIdentifier(scan);
+ }
+
+ public StreamPhysicalProcessTableFunction(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ RelNode input,
+ FlinkLogicalTableFunctionScan scan,
+ RelDataType rowType) {
+ this(cluster, traitSet, List.of(input), scan, rowType);
+ }
+
+ @Override
+ public boolean requireWatermark() {
+ // Even if there is no time attribute in the inputs, PTFs can work with event-time by taking
+ // the watermark value as timestamp.
+ return true;
+ }
+
+ @Override
+ public List<RelNode> getInputs() {
+ return inputs;
+ }
+
+ @Override
+ public void replaceInput(int ordinalInParent, RelNode p) {
+ final List<RelNode> newInputs = new ArrayList<>(inputs);
+ newInputs.set(ordinalInParent, p);
+ inputs = List.copyOf(newInputs);
+ recomputeDigest();
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new StreamPhysicalProcessTableFunction(
+ getCluster(), traitSet, inputs, scan, getRowType());
+ }
+
+ @Override
+ public @Nullable RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+ final double elementRate = 100.0d * getInputs().size();
+ return planner.getCostFactory().makeCost(elementRate, elementRate, 0);
+ }
+
+ @Override
+ public ExecNode<?> translateToExecNode() {
+ final List<ChangelogMode> inputChangelogModes =
+ getInputs().stream()
+ .map(StreamPhysicalRel.class::cast)
+ .map(ChangelogPlanUtils::getChangelogMode)
+ .map(JavaScalaConversionUtil::toJava)
+ .map(optional -> optional.orElseThrow(IllegalStateException::new))
+ .collect(Collectors.toList());
+ return new StreamExecProcessTableFunction(
+ unwrapTableConfig(this),
+ getInputs().stream().map(i -> InputProperty.DEFAULT).collect(Collectors.toList()),
+ FlinkTypeFactory.toLogicalRowType(rowType),
+ getRelDetailedDescription(),
+ uid,
+ (RexCall) scan.getCall(),
+ inputChangelogModes);
+ }
+
+ @Override
+ public RelWriter explainTerms(RelWriter pw) {
+ super.explainTerms(pw);
+ for (Ord<RelNode> ord : Ord.zip(inputs)) {
+ pw.input("input#" + ord.i, ord.e);
+ }
+ return pw.item("invocation", scan.getCall())
+ .item("uid", uid)
+ .item("select", String.join(",", getRowType().getFieldNames()))
+ .item("rowType", getRowType());
+ }
+
+ @Override
+ protected RelDataType deriveRowType() {
+ return rowType;
+ }
+
+ public List<StaticArgument> getProvidedInputArgs() {
+ final RexCall call = (RexCall) scan.getCall();
+ final List<RexNode> operands = call.getOperands();
+ final BridgingSqlFunction.WithTableFunction function =
+ (BridgingSqlFunction.WithTableFunction) call.getOperator();
+ final List<StaticArgument> declaredArgs =
+ function.getTypeInference()
+ .getStaticArguments()
+ .orElseThrow(IllegalStateException::new);
+ // This logic filters out optional tables for which an input is missing. It returns tables
+ // in the same order as provided inputs of this RelNode.
+ return Ord.zip(declaredArgs).stream()
+ .filter(arg -> arg.e.is(StaticArgumentTrait.TABLE))
+ .filter(arg -> operands.get(arg.i) instanceof RexTableArgCall)
+ .map(arg -> arg.e)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * An important part of {@link ProcessTableFunction} is the mandatory unique identifier. Even if
+ * the PTF has no state entries yet, state or timers might be added later. A PTF should therefore
+ * act as an identifiable black box for the optimizer, which the UID guarantees.
+ *
+ * @see SystemTypeInference
+ */
+ private static String deriveUniqueIdentifier(FlinkLogicalTableFunctionScan scan) {
+ final RexCall rexCall = (RexCall) scan.getCall();
+ final BridgingSqlFunction.WithTableFunction function =
+ (BridgingSqlFunction.WithTableFunction) rexCall.getOperator();
+ final ContextResolvedFunction resolvedFunction = function.getResolvedFunction();
+ final List<RexNode> operands = rexCall.getOperands();
+ // Type inference ensures that uid is always added at the end
+ final RexNode uidRexNode = operands.get(operands.size() - 1);
+ if (uidRexNode.getKind() == SqlKind.DEFAULT) {
+ final String uid =
+ resolvedFunction
+ .getIdentifier()
+ .map(FunctionIdentifier::getFunctionName)
+ .orElse("");
+ if (!SystemTypeInference.isValidUidForProcessTableFunction(uid)) {
+ throw new ValidationException(
+ String.format(
+ "Could not derive a unique identifier for process table function '%s'. "
+ + "The function's name does not qualify for a UID. Please provide "
+ + "a custom identifier using the implicit `uid` argument. "
+ + "For example: myFunction(..., uid => 'my-id')",
+ resolvedFunction.asSummaryString()));
+ }
+ return uid;
+ }
+ // Otherwise, the UID is valid as it has already been checked by SystemTypeInference.
+ return RexLiteral.stringValue(uidRexNode);
+ }
+}
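The UID contract described above is enforced against a fixed pattern. Below is a minimal, self-contained sketch of that rule; the helper class and method names are hypothetical, the pattern itself is quoted from the validation message exercised in ProcessTableFunctionTest further down, and the real check lives in SystemTypeInference.isValidUidForProcessTableFunction.

```java
import java.util.regex.Pattern;

class UidRuleSketch {
    // Pattern quoted from the PTF validation error message in this PR.
    private static final Pattern UID_PATTERN = Pattern.compile("[a-zA-Z_][a-zA-Z-_0-9]*");

    static boolean isValidUid(String uid) {
        return UID_PATTERN.matcher(uid).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValidUid("my-ptf")); // true: usable as a derived or explicit UID
        System.out.println(isValidUid("f*"));     // false: forces an explicit uid => '...' argument
    }
}
```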
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunctionRule.java b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunctionRule.java
new file mode 100644
index 00000000000000..c2fbb1363b3c14
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunctionRule.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.stream;
+
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.TableCharacteristic;
+import org.apache.calcite.sql.TableCharacteristic.Semantics;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionKind;
+import org.apache.flink.table.planner.calcite.RexTableArgCall;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Rule to convert a {@link FlinkLogicalTableFunctionScan} with table arguments into a {@link
+ * StreamPhysicalProcessTableFunction}.
+ */
+public class StreamPhysicalProcessTableFunctionRule extends ConverterRule {
+
+ public static final StreamPhysicalProcessTableFunctionRule INSTANCE =
+ new StreamPhysicalProcessTableFunctionRule(
+ Config.INSTANCE.withConversion(
+ FlinkLogicalTableFunctionScan.class,
+ FlinkConventions.LOGICAL(),
+ FlinkConventions.STREAM_PHYSICAL(),
+ "StreamPhysicalProcessTableFunctionRule"));
+
+ private StreamPhysicalProcessTableFunctionRule(Config config) {
+ super(config);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final FlinkLogicalTableFunctionScan scan = call.rel(0);
+ if (scan.getInputs().isEmpty()) {
+ // Let StreamPhysicalConstantTableFunctionScanRule take over
+ return false;
+ }
+ final RexCall rexCall = (RexCall) scan.getCall();
+ final FunctionDefinition definition = ShortcutUtils.unwrapFunctionDefinition(rexCall);
+ return definition != null && definition.getKind() == FunctionKind.PROCESS_TABLE;
+ }
+
+ @Override
+ public @Nullable RelNode convert(RelNode rel) {
+ final FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) rel;
+ final RexCall rexCall = (RexCall) scan.getCall();
+ final BridgingSqlFunction.WithTableFunction function =
+ (BridgingSqlFunction.WithTableFunction) rexCall.getOperator();
+ final List<RexNode> operands = rexCall.getOperands();
+ final List<RelNode> newInputs =
+ applyDistributionOnInputs(function, operands, rel.getInputs());
+ final RelTraitSet providedTraitSet =
+ rel.getTraitSet().replace(FlinkConventions.STREAM_PHYSICAL());
+ return new StreamPhysicalProcessTableFunction(
+ scan.getCluster(), providedTraitSet, newInputs, scan, scan.getRowType());
+ }
+
+ private static List<RelNode> applyDistributionOnInputs(
+ BridgingSqlFunction.WithTableFunction function,
+ List<RexNode> operands,
+ List<RelNode> inputs) {
+ return Ord.zip(operands).stream()
+ .filter(operand -> operand.e instanceof RexTableArgCall)
+ .map(
+ tableOperand -> {
+ final int pos = tableOperand.i;
+ final RexTableArgCall tableArgCall = (RexTableArgCall) tableOperand.e;
+ final TableCharacteristic tableCharacteristic =
+ function.tableCharacteristic(pos);
+ assert tableCharacteristic != null;
+ return applyDistributionOnInput(
+ tableArgCall,
+ tableCharacteristic,
+ inputs.get(tableArgCall.getInputIndex()));
+ })
+ .collect(Collectors.toList());
+ }
+
+ private static RelNode applyDistributionOnInput(
+ RexTableArgCall tableOperand, TableCharacteristic tableCharacteristic, RelNode input) {
+ final FlinkRelDistribution requiredDistribution =
+ deriveDistribution(tableOperand, tableCharacteristic);
+ final RelTraitSet requiredTraitSet =
+ input.getCluster()
+ .getPlanner()
+ .emptyTraitSet()
+ .replace(requiredDistribution)
+ .replace(FlinkConventions.STREAM_PHYSICAL());
+ return RelOptRule.convert(input, requiredTraitSet);
+ }
+
+ private static FlinkRelDistribution deriveDistribution(
+ RexTableArgCall tableOperand, TableCharacteristic tableCharacteristic) {
+ if (tableCharacteristic.semantics == Semantics.SET) {
+ final int[] partitionKeys = tableOperand.getPartitionKeys();
+ if (partitionKeys.length == 0) {
+ return FlinkRelDistribution.SINGLETON();
+ }
+ return FlinkRelDistribution.hash(partitionKeys, true);
+ }
+ return FlinkRelDistribution.DEFAULT();
+ }
+}
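To make the distribution logic concrete: the following sketch mirrors the set-semantics testing functions added later in this PR. Under deriveDistribution, a PARTITION BY clause on a TABLE_AS_SET argument is translated into a hash distribution over the partition key columns, while an argument declared with OPTIONAL_PARTITION_BY and called without PARTITION BY falls back to a singleton distribution.

```java
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.ArgumentTrait;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

// Set-semantics table argument: PARTITION BY is mandatory and becomes
// FlinkRelDistribution.hash over the partition key columns.
class TableAsSetSketch extends ProcessTableFunction<String> {
    public void eval(@ArgumentHint(ArgumentTrait.TABLE_AS_SET) Row r, Integer i) {}
}

// SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1)
//   -> input hashed on [name]
// SELECT * FROM f(r => TABLE t, i => 1)
//   -> only valid with OPTIONAL_PARTITION_BY; the input is then a singleton
```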
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index 681dee6e7660cf..e6d2892eab6415 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -31,6 +31,7 @@ import org.apache.flink.table.planner.plan.utils.RankProcessStrategy.{AppendFast
import org.apache.flink.table.planner.sinks.DataStreamTableSink
import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
import org.apache.flink.table.runtime.operators.join.FlinkJoinType
+import org.apache.flink.table.types.inference.{StaticArgument, StaticArgumentTrait}
import org.apache.flink.types.RowKind
import org.apache.calcite.rel.RelNode
@@ -329,6 +330,29 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
val providedTrait = new ModifyKindSetTrait(scan.intermediateTable.modifyKindSet)
createNewNode(scan, List(), providedTrait, requiredTrait, requester)
+ case process: StreamPhysicalProcessTableFunction =>
+ // Accepted changes depend on the declaration of the input arguments
+ val requiredChildrenTraits = process.getProvidedInputArgs
+ .map(
+ arg =>
+ if (arg.is(StaticArgumentTrait.SUPPORT_UPDATES)) {
+ ModifyKindSetTrait.ALL_CHANGES
+ } else {
+ ModifyKindSetTrait.INSERT_ONLY
+ })
+ .toList
+
+ val children = if (requiredChildrenTraits.isEmpty) {
+ // Constant function has a single StreamPhysicalValues input
+ visitChildren(process, ModifyKindSetTrait.INSERT_ONLY)
+ } else {
+ visitChildren(process, requiredChildrenTraits)
+ }
+
+ // Currently, PTFs only produce insert-only changes
+ val providedTrait = ModifyKindSetTrait.INSERT_ONLY
+ createNewNode(process, children, providedTrait, requiredTrait, requester)
+
case _ =>
throw new UnsupportedOperationException(
s"Unsupported visit for ${rel.getClass.getSimpleName}")
@@ -350,6 +374,16 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
newChildren.toList
}
+ private def visitChildren(
+ parent: StreamPhysicalRel,
+ requiredChildrenTraits: List[ModifyKindSetTrait]): List[StreamPhysicalRel] = {
+ val requester = getNodeName(parent)
+ val newChildren = for (i <- 0 until parent.getInputs.size()) yield {
+ visitChild(parent, i, requiredChildrenTraits(i), requester)
+ }
+ newChildren.toList
+ }
+
private def visitChild(
parent: StreamPhysicalRel,
childOrdinal: Int,
@@ -676,6 +710,20 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
createNewNode(rel, Some(List()), providedTrait)
}
+ case process: StreamPhysicalProcessTableFunction =>
+ // ProcessTableFunction currently only consumes insert-only or retract changes
+ val children = process.getInputs.map {
+ case child: StreamPhysicalRel =>
+ val childModifyKindSet = getModifyKindSet(child)
+ val requiredChildTrait = if (childModifyKindSet.isInsertOnly) {
+ UpdateKindTrait.NONE
+ } else {
+ UpdateKindTrait.BEFORE_AND_AFTER
+ }
+ this.visit(child, requiredChildTrait)
+ }.toList
+ createNewNode(rel, Some(children.flatten), UpdateKindTrait.NONE)
+
case _ =>
throw new UnsupportedOperationException(
s"Unsupported visit for ${rel.getClass.getSimpleName}")
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index fb1fec0a1b6d9a..b5d30afa20fc59 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.rules
import org.apache.flink.table.planner.plan.nodes.logical._
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalProcessTableFunctionRule
import org.apache.flink.table.planner.plan.rules.logical._
import org.apache.flink.table.planner.plan.rules.physical.FlinkExpandConversionRule
import org.apache.flink.table.planner.plan.rules.physical.stream._
@@ -465,6 +466,8 @@ object FlinkStreamRuleSets {
ExpandWindowTableFunctionTransposeRule.INSTANCE,
StreamPhysicalWindowRankRule.INSTANCE,
StreamPhysicalWindowDeduplicateRule.INSTANCE,
+ // process table function
+ StreamPhysicalProcessTableFunctionRule.INSTANCE,
// join
StreamPhysicalJoinRule.INSTANCE,
StreamPhysicalIntervalJoinRule.INSTANCE,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
index 489f10a6123c47..e4baf0e0375ac6 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
@@ -35,6 +35,7 @@
import org.apache.flink.table.module.Module;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.calcite.FlinkTypeSystem;
+import org.apache.flink.table.planner.calcite.RexTableArgCall;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils;
@@ -684,7 +685,17 @@ private static Stream<RexNode> testRexNodeSerde() {
FlinkSqlOperatorTable.HASH_CODE,
rexBuilder.makeInputRef(FACTORY.createSqlType(SqlTypeName.INTEGER), 1)),
rexBuilder.makePatternFieldRef(
- "test", FACTORY.createSqlType(SqlTypeName.INTEGER), 0));
+ "test", FACTORY.createSqlType(SqlTypeName.INTEGER), 0),
+ new RexTableArgCall(
+ FACTORY.createStructType(
+ StructKind.PEEK_FIELDS_NO_EXPAND,
+ Arrays.asList(
+ FACTORY.createSqlType(SqlTypeName.VARCHAR),
+ FACTORY.createSqlType(SqlTypeName.INTEGER)),
+ Arrays.asList("f1", "f2")),
+ 0,
+ new int[] {1},
+ new int[] {0}));
}
// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java
index 8aeee6ca5c0b7e..96b4e514775ca6 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java
@@ -46,6 +46,7 @@
import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
import static org.apache.flink.table.annotation.ArgumentTrait.OPTIONAL_PARTITION_BY;
import static org.apache.flink.table.annotation.ArgumentTrait.PASS_COLUMNS_THROUGH;
+import static org.apache.flink.table.annotation.ArgumentTrait.SUPPORT_UPDATES;
import static org.apache.flink.table.annotation.ArgumentTrait.TABLE_AS_ROW;
import static org.apache.flink.table.annotation.ArgumentTrait.TABLE_AS_SET;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -58,9 +59,18 @@ public class ProcessTableFunctionTest extends TableTestBase {
@BeforeEach
void setup() {
util = streamTestUtil(TableConfig.getDefault());
- util.tableEnv().executeSql("CREATE VIEW t1 AS SELECT 'Bob' AS name, 12 AS score");
- util.tableEnv().executeSql("CREATE VIEW t2 AS SELECT 'Bob' AS name, 12 AS different");
- util.tableEnv().executeSql("CREATE VIEW t3 AS SELECT 'Bob' AS name, TRUE AS isValid");
+ util.tableEnv()
+ .executeSql(
+ "CREATE VIEW t AS SELECT * FROM (VALUES ('Bob', 12), ('Alice', 42)) AS T(name, score)");
+ util.tableEnv()
+ .executeSql("CREATE VIEW t_name_diff AS SELECT 'Bob' AS name, 12 AS different");
+ util.tableEnv()
+ .executeSql("CREATE VIEW t_type_diff AS SELECT 'Bob' AS name, TRUE AS isValid");
+ util.tableEnv()
+ .executeSql("CREATE VIEW t_updating AS SELECT name, COUNT(*) FROM t GROUP BY name");
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE t_sink (name STRING, data STRING) WITH ('connector' = 'blackhole')");
}
@Test
@@ -86,13 +96,13 @@ void testUnknownScalarArg() {
@Test
void testTableAsRow() {
util.addTemporarySystemFunction("f", TableAsRowFunction.class);
- assertReachesOptimizer("SELECT * FROM f(r => TABLE t1, i => 1)");
+ util.verifyRelPlan("SELECT * FROM f(r => TABLE t, i => 1)");
}
@Test
void testTypedTableAsRow() {
util.addTemporarySystemFunction("f", TypedTableAsRowFunction.class);
- assertReachesOptimizer("SELECT * FROM f(u => TABLE t1, i => 1)");
+ util.verifyRelPlan("SELECT * FROM f(u => TABLE t, i => 1)");
}
@Test
@@ -100,25 +110,25 @@ void testTypedTableAsRowIgnoringColumnNames() {
util.addTemporarySystemFunction("f", TypedTableAsRowFunction.class);
// function expects <STRING name, INT score>
// but table is <STRING name, INT different>
- assertReachesOptimizer("SELECT * FROM f(u => TABLE t2, i => 1)");
+ util.verifyRelPlan("SELECT * FROM f(u => TABLE t_name_diff, i => 1)");
}
@Test
void testTableAsSet() {
util.addTemporarySystemFunction("f", TableAsSetFunction.class);
- assertReachesOptimizer("SELECT * FROM f(r => TABLE t1 PARTITION BY name, i => 1)");
+ util.verifyRelPlan("SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1)");
}
@Test
void testTableAsSetOptionalPartitionBy() {
util.addTemporarySystemFunction("f", TableAsSetOptionalPartitionFunction.class);
- assertReachesOptimizer("SELECT * FROM f(r => TABLE t1, i => 1)");
+ util.verifyRelPlan("SELECT * FROM f(r => TABLE t, i => 1)");
}
@Test
void testTypedTableAsSet() {
util.addTemporarySystemFunction("f", TypedTableAsSetFunction.class);
- assertReachesOptimizer("SELECT * FROM f(u => TABLE t1 PARTITION BY name, i => 1)");
+ util.verifyRelPlan("SELECT * FROM f(u => TABLE t PARTITION BY name, i => 1)");
}
@Test
@@ -131,27 +141,83 @@ void testEmptyArgs() {
void testPojoArgs() {
util.addTemporarySystemFunction("f", PojoArgsFunction.class);
util.addTemporarySystemFunction("pojoCreator", PojoCreatingFunction.class);
- assertReachesOptimizer(
- "SELECT * FROM f(input => TABLE t1, scalar => pojoCreator('Bob', 12), uid => 'my-ptf')");
+ util.verifyRelPlan(
+ "SELECT * FROM f(input => TABLE t, scalar => pojoCreator('Bob', 12), uid => 'my-ptf')");
}
@Test
void testTableAsSetPassThroughColumns() {
util.addTemporarySystemFunction("f", TableAsSetPassThroughFunction.class);
- assertReachesOptimizer("SELECT * FROM f(r => TABLE t1 PARTITION BY name, i => 1)");
+ util.verifyRelPlan("SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1)");
}
@Test
void testTableAsRowPassThroughColumns() {
util.addTemporarySystemFunction("f", TableAsRowPassThroughFunction.class);
- assertReachesOptimizer("SELECT * FROM f(r => TABLE t1, i => 1)");
+ util.verifyRelPlan("SELECT * FROM f(r => TABLE t, i => 1)");
+ }
+
+ @Test
+ void testUpdatingInput() {
+ util.addTemporarySystemFunction("f", UpdatingArgFunction.class);
+ util.verifyRelPlan("SELECT * FROM f(r => TABLE t_updating PARTITION BY name, i => 1)");
+ }
+
+ @Test
+ void testMissingUid() {
+ // Function name contains special characters and can thus not be used as UID
+ util.addTemporarySystemFunction("f*", ScalarArgsFunction.class);
+ assertThatThrownBy(() -> util.verifyRelPlan("SELECT * FROM `f*`(42, true)"))
+ .satisfies(
+ anyCauseMatches(
+ "Could not derive a unique identifier for process table function 'f*'. "
+ + "The function's name does not qualify for a UID. "
+ + "Please provide a custom identifier using the implicit `uid` argument. "
+ + "For example: myFunction(..., uid => 'my-id')"));
+ }
+
+ @Test
+ void testUidPipelineSplitIntoTwoFunctions() {
+ util.addTemporarySystemFunction("f", TableAsSetFunction.class);
+ util.verifyExecPlan(
+ util.tableEnv()
+ .createStatementSet()
+ .addInsertSql(
+ "INSERT INTO t_sink SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1, uid => 'a')")
+ .addInsertSql(
+ "INSERT INTO t_sink SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1, uid => 'b')"));
+ }
+
+ @Test
+ void testUidPipelineMergeIntoOneFunction() {
+ util.addTemporarySystemFunction("f", TableAsSetFunction.class);
+ util.verifyExecPlan(
+ util.tableEnv()
+ .createStatementSet()
+ .addInsertSql(
+ "INSERT INTO t_sink SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1, uid => 'same')")
+ .addInsertSql(
+ "INSERT INTO t_sink SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1, uid => 'same')"));
+ }
+
+ @Test
+ void testUidPipelineMergeWithFanOut() {
+ util.addTemporarySystemFunction("f", TableAsSetFunction.class);
+
+ util.verifyExecPlan(
+ util.tableEnv()
+ .createStatementSet()
+ .addInsertSql(
+ "INSERT INTO t_sink SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1, uid => 'same') WHERE name = 'Bob'")
+ .addInsertSql(
+ "INSERT INTO t_sink SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1, uid => 'same') WHERE name = 'Alice'"));
}
@ParameterizedTest
@MethodSource("errorSpecs")
void testErrorBehavior(ErrorSpec spec) {
util.addTemporarySystemFunction("f", spec.functionClass);
- assertThatThrownBy(() -> util.verifyRelPlan(spec.sql))
+ assertThatThrownBy(() -> util.verifyExecPlan(spec.sql))
.satisfies(anyCauseMatches(spec.errorMessage));
}
@@ -162,31 +228,31 @@ private static Stream<ErrorSpec> errorSpecs() {
ScalarArgsFunction.class,
"SELECT * FROM f(uid => '%', i => 1, b => true)",
"Invalid unique identifier for process table function. "
- + "The 'uid' argument must be a string literal that follows the pattern [a-zA-Z_][a-zA-Z-_0-9]*. "
+ + "The `uid` argument must be a string literal that follows the pattern [a-zA-Z_][a-zA-Z-_0-9]*. "
+ "But found: %"),
ErrorSpec.of(
"typed table as row with invalid input",
TypedTableAsRowFunction.class,
// function expects <STRING name, INT score>
- "SELECT * FROM f(u => TABLE t3, i => 1)",
+ "SELECT * FROM f(u => TABLE t_type_diff, i => 1)",
"No match found for function signature "
+ "f(, , )"),
ErrorSpec.of(
"table as set with missing partition by",
TableAsSetFunction.class,
- "SELECT * FROM f(r => TABLE t1, i => 1)",
+ "SELECT * FROM f(r => TABLE t, i => 1)",
"Table argument 'r' requires a PARTITION BY clause for parallel processing."),
ErrorSpec.of(
"typed table as set with invalid input",
TypedTableAsSetFunction.class,
// function expects <STRING name, INT score>
- "SELECT * FROM f(u => TABLE t3 PARTITION BY name, i => 1)",
+ "SELECT * FROM f(u => TABLE t_type_diff PARTITION BY name, i => 1)",
"No match found for function signature "
+ "f(, , )"),
ErrorSpec.of(
"table function instead of process table function",
NoProcessTableFunction.class,
- "SELECT * FROM f(r => TABLE t1)",
+ "SELECT * FROM f(r => TABLE t)",
"Only scalar arguments are supported at this location. "
+ "But argument 'r' declared the following traits: [TABLE, TABLE_AS_ROW]"),
ErrorSpec.of(
@@ -197,7 +263,7 @@ private static Stream errorSpecs() {
ErrorSpec.of(
"multiple table args",
MultiTableFunction.class,
- "SELECT * FROM f(r1 => TABLE t1, r2 => TABLE t1)",
+ "SELECT * FROM f(r1 => TABLE t, r2 => TABLE t)",
"Currently, only signatures with at most one table argument are supported."),
ErrorSpec.of(
"row instead of table",
@@ -207,25 +273,36 @@ private static Stream errorSpecs() {
ErrorSpec.of(
"table as row partition by",
TableAsRowFunction.class,
- "SELECT * FROM f(r => TABLE t1 PARTITION BY name, i => 1)",
+ "SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1)",
"Only tables with set semantics may be partitioned. "
+ "Invalid PARTITION BY clause in the 0-th operand of table function 'f'"),
ErrorSpec.of(
"invalid partition by clause",
TableAsSetFunction.class,
- "SELECT * FROM f(r => TABLE t1 PARTITION BY invalid, i => 1)",
+ "SELECT * FROM f(r => TABLE t PARTITION BY invalid, i => 1)",
"Invalid column 'invalid' for PARTITION BY clause. Available columns are: [name, score]"),
ErrorSpec.of(
"unsupported order by",
TableAsSetFunction.class,
- "SELECT * FROM f(r => TABLE t1 PARTITION BY name ORDER BY score, i => 1)",
- "ORDER BY clause is currently not supported."));
- }
-
- private void assertReachesOptimizer(String sql) {
- assertThatThrownBy(() -> util.verifyRelPlan(sql))
- .hasMessageContaining(
- "This exception indicates that the query uses an unsupported SQL feature.");
+ "SELECT * FROM f(r => TABLE t PARTITION BY name ORDER BY score, i => 1)",
+ "ORDER BY clause is currently not supported."),
+ ErrorSpec.of(
+ "updates into insert-only table arg",
+ TableAsSetFunction.class,
+ "SELECT * FROM f(r => TABLE t_updating PARTITION BY name, i => 1)",
+ "StreamPhysicalProcessTableFunction doesn't support consuming update changes"),
+ ErrorSpec.of(
+ "updates into POJO table arg",
+ InvalidTypedUpdatingArgFunction.class,
+ "SELECT * FROM f(r => TABLE t_updating, i => 1)",
+ "Table arguments that support updates must use a row type."),
+ ErrorSpec.of(
+ "uid conflict",
+ TableAsSetFunction.class,
+ "SELECT * FROM f(r => TABLE t PARTITION BY name, i => 42, uid => 'same') "
+ + "UNION ALL SELECT * FROM f(r => TABLE t PARTITION BY name, i => 999, uid => 'same')",
+ "Duplicate unique identifier 'same' detected among process table functions. "
+ + "Make sure that all PTF calls have an identifier defined that is globally unique."));
}
/** Testing function. */
@@ -252,6 +329,18 @@ public static class TableAsSetFunction extends ProcessTableFunction<String> {
public void eval(@ArgumentHint(TABLE_AS_SET) Row r, Integer i) {}
}
+ /** Testing function. */
+ public static class UpdatingArgFunction extends ProcessTableFunction<String> {
+ @SuppressWarnings("unused")
+ public void eval(@ArgumentHint({TABLE_AS_SET, SUPPORT_UPDATES}) Row r, Integer i) {}
+ }
+
+ /** Testing function. */
+ public static class InvalidTypedUpdatingArgFunction extends ProcessTableFunction<String> {
+ @SuppressWarnings("unused")
+ public void eval(@ArgumentHint({TABLE_AS_ROW, SUPPORT_UPDATES}) User u, Integer i) {}
+ }
+
/** Testing function. */
public static class MultiTableFunction extends ProcessTableFunction<String> {
@SuppressWarnings("unused")
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
index 5f4f90d18fd223..3ab9cb43979f62 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
@@ -16,6 +16,93 @@ See the License for the specific language governing permissions and
limitations under the License.
-->
+
+
+ 'my-ptf')]]>
+
+
+
+
+
+
+
+
+
+
+ TABLE t, scalar => pojoCreator('Bob', 12), uid => 'my-ptf')]]>
+
+
+
+
+
+
+
+
+
+
+ 1, b => true)]]>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
'my-uid', i => 1, b => true)]]>
@@ -28,48 +115,273 @@ LogicalProject(EXPR$0=[$0])
-
+
- 'my-ptf')]]>
+ TABLE t, i => 1)]]>
-
+
- 1, b => true, invalid => 'invalid')]]>
+ TABLE t, i => 1)]]>
+
+
+
+
+
+
+
+
+
+
+ TABLE t PARTITION BY name, i => 1)]]>
+
+
+
+
+
+
+
+
+
+
+ TABLE t PARTITION BY name, i => 1)]]>
+
+
+
+
+
+
+
+
+
+
+ TABLE t, i => 1)]]>
-
+
- 1, b => true)]]>
+ TABLE t_name_diff, i => 1)]]>
+
+
+
+
+
+
+
+
+
+
+ TABLE t PARTITION BY name, i => 1)]]>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ TABLE t_updating PARTITION BY name, i => 1)]]>
+
+
+
+
+
+
+
+
+
+
+ TABLE t, i => 1)]]>
+
+
+
+
+
+
+
+
+
+
+ 1, b => true, invalid => 'invalid')]]>