From 706be4366eedbf2658a502a225d8fc12a6adb926 Mon Sep 17 00:00:00 2001 From: Timo Walther Date: Thu, 9 Jan 2025 12:13:09 +0100 Subject: [PATCH] Feedback addressed --- .../types/inference/SystemTypeInference.java | 72 +++--- .../stream/sql/ProcessTableFunctionTest.java | 229 ++++++++++-------- 2 files changed, 154 insertions(+), 147 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java index 453b4210f11a30..ba89b5e79666d8 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java @@ -63,35 +63,21 @@ public class SystemTypeInference { public static TypeInference of(FunctionKind functionKind, TypeInference origin) { final TypeInference.Builder builder = TypeInference.newBuilder(); - final List defaultArgs = - applyDefaultArgs(builder, functionKind, origin.getStaticArguments().orElse(null)); - builder.inputTypeStrategy(origin.getInputTypeStrategy()); + final List systemArgs = + deriveSystemArgs(functionKind, origin.getStaticArguments().orElse(null)); + if (systemArgs != null) { + builder.staticArguments(systemArgs); + } + builder.inputTypeStrategy( + deriveSystemInputStrategy(functionKind, systemArgs, origin.getInputTypeStrategy())); builder.stateTypeStrategies(origin.getStateTypeStrategies()); - builder.outputTypeStrategy(origin.getOutputTypeStrategy()); - - final List systemArgs = applySystemArgs(builder, functionKind, defaultArgs); - applySystemInputStrategy(builder, functionKind, systemArgs, origin); - applySystemOutputStrategy(builder, functionKind, origin); - + builder.outputTypeStrategy( + deriveSystemOutputStrategy(functionKind, origin.getOutputTypeStrategy())); return builder.build(); } // -------------------------------------------------------------------------------------------- - private static @Nullable List applyDefaultArgs( - TypeInference.Builder builder, - FunctionKind functionKind, - @Nullable List defaultArgs) { - if (defaultArgs == null) { - return null; - } - if (functionKind != FunctionKind.PROCESS_TABLE) { - checkScalarArgsOnly(defaultArgs); - } - builder.staticArguments(defaultArgs); - return defaultArgs; - } - private static void checkScalarArgsOnly(List defaultArgs) { defaultArgs.forEach( arg -> { @@ -105,23 +91,23 @@ private static void checkScalarArgsOnly(List defaultArgs) { }); } - private static @Nullable List applySystemArgs( - TypeInference.Builder builder, - FunctionKind functionKind, - @Nullable List defaultArgs) { + private static @Nullable List deriveSystemArgs( + FunctionKind functionKind, @Nullable List declaredArgs) { if (functionKind != FunctionKind.PROCESS_TABLE) { - return defaultArgs; + if (declaredArgs != null) { + checkScalarArgsOnly(declaredArgs); + } + return declaredArgs; } - if (defaultArgs == null) { + if (declaredArgs == null) { throw new ValidationException( "Function requires a static signature that is not overloaded and doesn't contain varargs."); } - checkReservedArgs(defaultArgs); + checkReservedArgs(declaredArgs); - final List newStaticArgs = new ArrayList<>(defaultArgs); + final List newStaticArgs = new ArrayList<>(declaredArgs); newStaticArgs.addAll(PROCESS_TABLE_FUNCTION_SYSTEM_ARGS); - builder.staticArguments(newStaticArgs); return newStaticArgs; } @@ -140,24 +126,22 @@ private static void checkReservedArgs(List staticArgs) { } } - private static void applySystemInputStrategy( - TypeInference.Builder builder, + private static InputTypeStrategy deriveSystemInputStrategy( FunctionKind functionKind, @Nullable List staticArgs, - TypeInference origin) { + InputTypeStrategy inputStrategy) { if (functionKind != FunctionKind.PROCESS_TABLE) { - return; + return inputStrategy; } - builder.inputTypeStrategy( - new SystemInputStrategy(staticArgs, origin.getInputTypeStrategy())); + return new SystemInputStrategy(staticArgs, inputStrategy); } - private static void applySystemOutputStrategy( - TypeInference.Builder builder, FunctionKind functionKind, TypeInference origin) { + private static TypeStrategy deriveSystemOutputStrategy( + FunctionKind functionKind, TypeStrategy outputStrategy) { if (functionKind != FunctionKind.TABLE && functionKind != FunctionKind.PROCESS_TABLE) { - return; + return outputStrategy; } - builder.outputTypeStrategy(new SystemOutputStrategy(origin.getOutputTypeStrategy())); + return new SystemOutputStrategy(outputStrategy); } private static class SystemOutputStrategy implements TypeStrategy { @@ -210,7 +194,9 @@ private SystemInputStrategy(List staticArgs, InputTypeStrategy o @Override public ArgumentCount getArgumentCount() { - // Static arguments declare the count + // Static arguments take precedence. Thus, the input strategy only serves as a + // validation layer. Since the count is already validated we don't need to validate it a + // second time. return InputTypeStrategies.WILDCARD.getArgumentCount(); } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java index 78e0f4bebc00e2..c5039da5730269 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.flink.table.planner.plan.stream.sql; import org.apache.flink.table.annotation.ArgumentHint; @@ -8,6 +26,7 @@ import org.apache.flink.table.functions.ProcessTableFunction; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.table.functions.UserDefinedFunction; import org.apache.flink.table.planner.utils.TableTestBase; import org.apache.flink.table.planner.utils.TableTestUtil; import org.apache.flink.table.types.inference.StaticArgument; @@ -17,10 +36,14 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import java.util.EnumSet; import java.util.Optional; +import java.util.stream.Stream; +import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; import static org.apache.flink.table.annotation.ArgumentTrait.OPTIONAL_PARTITION_BY; import static org.apache.flink.table.annotation.ArgumentTrait.TABLE_AS_ROW; import static org.apache.flink.table.annotation.ArgumentTrait.TABLE_AS_SET; @@ -59,17 +82,6 @@ void testUnknownScalarArg() { util.verifyRelPlan("SELECT * FROM f(i => 1, b => true, invalid => 'invalid')"); } - @Test - void testInvalidUid() { - util.addTemporarySystemFunction("f", ScalarArgsFunction.class); - assertThatThrownBy( - () -> util.verifyRelPlan("SELECT * FROM f(uid => '%', i => 1, b => true)")) - .hasRootCauseMessage( - "Invalid unique identifier for process table function. " - + "The 'uid' argument must be a string literal that follows the pattern [a-zA-Z_][a-zA-Z-_0-9]*. " - + "But found: %"); - } - @Test void testTableAsRow() { util.addTemporarySystemFunction("f", TableAsRowFunction.class); @@ -90,30 +102,12 @@ void testTypedTableAsRowIgnoringColumnNames() { assertReachesOptimizer("SELECT * FROM f(u => TABLE t2, i => 1)"); } - @Test - void testTypedTableAsRowWithInvalidInput() { - util.addTemporarySystemFunction("f", TypedTableAsRowFunction.class); - // function expects - assertThatThrownBy(() -> util.verifyRelPlan("SELECT * FROM f(u => TABLE t3, i => 1)")) - .hasMessageContaining( - "No match found for function signature " - + "f(, , )"); - } - @Test void testTableAsSet() { util.addTemporarySystemFunction("f", TableAsSetFunction.class); assertReachesOptimizer("SELECT * FROM f(r => TABLE t1 PARTITION BY name, i => 1)"); } - @Test - void testTableAsSetWithInvalidPartitionBy() { - util.addTemporarySystemFunction("f", TableAsSetFunction.class); - assertThatThrownBy(() -> util.verifyRelPlan("SELECT * FROM f(r => TABLE t1, i => 1)")) - .hasRootCauseMessage( - "Table argument 'r' requires a PARTITION BY clause for parallel processing."); - } - @Test void testTableAsSetOptionalPartitionBy() { util.addTemporarySystemFunction("f", TableAsSetOptionalPartitionFunction.class); @@ -126,19 +120,6 @@ void testTypedTableAsSet() { assertReachesOptimizer("SELECT * FROM f(u => TABLE t1 PARTITION BY name, i => 1)"); } - @Test - void testTypedTableAsSetWithInvalidInput() { - util.addTemporarySystemFunction("f", TypedTableAsSetFunction.class); - // function expects - assertThatThrownBy( - () -> - util.verifyRelPlan( - "SELECT * FROM f(u => TABLE t3 PARTITION BY name, i => 1)")) - .hasMessageContaining( - "No match found for function signature " - + "f(, , )"); - } - @Test void testEmptyArgs() { util.addTemporarySystemFunction("f", EmptyArgFunction.class); @@ -153,71 +134,79 @@ void testPojoArgs() { "SELECT * FROM f(input => TABLE t1, scalar => pojoCreator('Bob', 12), uid => 'my-ptf')"); } - @Test - void testInvalidTableFunction() { - util.addTemporarySystemFunction("f", NoProcessTableFunction.class); - assertThatThrownBy(() -> util.verifyRelPlan("SELECT * FROM f(r => TABLE t1)")) - .hasRootCauseMessage( - "Only scalar arguments are supported at this location. " - + "But argument 'r' declared the following traits: [TABLE, TABLE_AS_ROW]"); + @ParameterizedTest + @MethodSource("errorSpecs") + void testErrorBehavior(ErrorSpec spec) { + util.addTemporarySystemFunction("f", spec.functionClass); + assertThatThrownBy(() -> util.verifyRelPlan(spec.sql)) + .satisfies(anyCauseMatches(spec.errorMessage)); } - @Test - void testReservedArg() { - util.addTemporarySystemFunction("f", ReservedArgFunction.class); - assertThatThrownBy(() -> util.verifyRelPlan("SELECT * FROM f(uid => 'my-ptf')")) - .hasRootCauseMessage( - "Function signature must not declare system arguments. Reserved argument names are: [uid]"); - } - - @Test - void testInvalidMultiTable() { - util.addTemporarySystemFunction("f", MultiTableFunction.class); - assertThatThrownBy( - () -> util.verifyRelPlan("SELECT * FROM f(r1 => TABLE t1, r2 => TABLE t1)")) - .hasRootCauseMessage( - "Currently, only signatures with at most one table argument are supported."); - } - - @Test - void testInvalidRowInsteadOfTable() { - util.addTemporarySystemFunction("f", TableAsRowFunction.class); - assertThatThrownBy(() -> util.verifyRelPlan("SELECT * FROM f(r => ROW(42), i => 1)")) - .hasRootCauseMessage( - "Invalid argument value. Argument 'r' expects a table to be passed."); - } - - @Test - void testInvalidSetSemantics() { - util.addTemporarySystemFunction("f", TableAsRowFunction.class); - assertThatThrownBy( - () -> - util.verifyRelPlan( - "SELECT * FROM f(r => TABLE t1 PARTITION BY name, i => 1)")) - .hasRootCauseMessage( + private static Stream errorSpecs() { + return Stream.of( + ErrorSpec.of( + "invalid uid", + ScalarArgsFunction.class, + "SELECT * FROM f(uid => '%', i => 1, b => true)", + "Invalid unique identifier for process table function. " + + "The 'uid' argument must be a string literal that follows the pattern [a-zA-Z_][a-zA-Z-_0-9]*. " + + "But found: %"), + ErrorSpec.of( + "typed table as row with invalid input", + TypedTableAsRowFunction.class, + // function expects + "SELECT * FROM f(u => TABLE t3, i => 1)", + "No match found for function signature " + + "f(, , )"), + ErrorSpec.of( + "table as set with missing partition by", + TableAsSetFunction.class, + "SELECT * FROM f(r => TABLE t1, i => 1)", + "Table argument 'r' requires a PARTITION BY clause for parallel processing."), + ErrorSpec.of( + "typed table as set with invalid input", + TypedTableAsSetFunction.class, + // function expects + "SELECT * FROM f(u => TABLE t3 PARTITION BY name, i => 1)", + "No match found for function signature " + + "f(, , )"), + ErrorSpec.of( + "table function instead of process table function", + NoProcessTableFunction.class, + "SELECT * FROM f(r => TABLE t1)", + "Only scalar arguments are supported at this location. " + + "But argument 'r' declared the following traits: [TABLE, TABLE_AS_ROW]"), + ErrorSpec.of( + "reserved args", + ReservedArgFunction.class, + "SELECT * FROM f(uid => 'my-ptf')", + "Function signature must not declare system arguments. Reserved argument names are: [uid]"), + ErrorSpec.of( + "multiple table args", + MultiTableFunction.class, + "SELECT * FROM f(r1 => TABLE t1, r2 => TABLE t1)", + "Currently, only signatures with at most one table argument are supported."), + ErrorSpec.of( + "row instead of table", + TableAsRowFunction.class, + "SELECT * FROM f(r => ROW(42), i => 1)", + "Invalid argument value. Argument 'r' expects a table to be passed."), + ErrorSpec.of( + "table as row partition by", + TableAsRowFunction.class, + "SELECT * FROM f(r => TABLE t1 PARTITION BY name, i => 1)", "Only tables with set semantics may be partitioned. " - + "Invalid PARTITION BY clause in the 0-th operand of table function 'f'"); - } - - @Test - void testInvalidPartitionByClause() { - util.addTemporarySystemFunction("f", TableAsSetFunction.class); - assertThatThrownBy( - () -> - util.verifyRelPlan( - "SELECT * FROM f(r => TABLE t1 PARTITION BY invalid, i => 1)")) - .hasRootCauseMessage( - "Invalid column 'invalid' for PARTITION BY clause. Available columns are: [name, score]"); - } - - @Test - void testUnsupportedOrderByClause() { - util.addTemporarySystemFunction("f", TableAsSetFunction.class); - assertThatThrownBy( - () -> - util.verifyRelPlan( - "SELECT * FROM f(r => TABLE t1 PARTITION BY name ORDER BY score, i => 1)")) - .hasRootCauseMessage("ORDER BY clause is currently not supported."); + + "Invalid PARTITION BY clause in the 0-th operand of table function 'f'"), + ErrorSpec.of( + "invalid partition by clause", + TableAsSetFunction.class, + "SELECT * FROM f(r => TABLE t1 PARTITION BY invalid, i => 1)", + "Invalid column 'invalid' for PARTITION BY clause. Available columns are: [name, score]"), + ErrorSpec.of( + "unsupported order by", + TableAsSetFunction.class, + "SELECT * FROM f(r => TABLE t1 PARTITION BY name ORDER BY score, i => 1)", + "ORDER BY clause is currently not supported.")); } private void assertReachesOptimizer(String sql) { @@ -304,6 +293,7 @@ public void eval() {} /** Testing function. */ public static class PojoArgsFunction extends ProcessTableFunction { + @SuppressWarnings("unused") public void eval(@ArgumentHint(TABLE_AS_ROW) User input, User scalar) {} } @@ -323,4 +313,35 @@ public User(String s, Integer i) { this.i = i; } } + + private static class ErrorSpec { + private final String description; + private final Class functionClass; + private final String sql; + private final String errorMessage; + + private ErrorSpec( + String description, + Class functionClass, + String sql, + String errorMessage) { + this.description = description; + this.functionClass = functionClass; + this.sql = sql; + this.errorMessage = errorMessage; + } + + static ErrorSpec of( + String description, + Class functionClass, + String sql, + String errorMessage) { + return new ErrorSpec(description, functionClass, sql, errorMessage); + } + + @Override + public String toString() { + return description; + } + } }