diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentHint.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentHint.java index 10671cca6aa7d..87a438c6eb52c 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentHint.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentHint.java @@ -31,17 +31,23 @@ *
An {@code ArgumentHint} can be used to provide hints about the name, optionality, and data * type of argument. * - *
It combines the functionality of {@link FunctionHint#argumentNames()} and {@link DataTypeHint} - * annotations to conveniently group argument-related information together in function declarations. + *
{@code @ArgumentHint(name = "in1", type = @DataTypeHint("STRING"), isOptional = false)} is a + * scalar argument with the data type STRING, named "in1", and cannot be omitted when calling. * - *
{@code @ArgumentHint(name = "in1", type = @DataTypeHint("STRING"), isOptional = false} is an - * argument with the type String, named in1, and cannot be omitted when calling. + * @see FunctionHint */ @PublicEvolving @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER}) public @interface ArgumentHint { + /** + * The kind of the argument. + * + *
Only applies to {@code ProcessTableFunction}s (PTFs). Others can only take scalar values. + */ + ArgumentTrait[] value() default {ArgumentTrait.SCALAR}; + /** * The name of the argument. * diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java new file mode 100644 index 0000000000000..5f6f7bc6ea73e --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.annotation; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.types.inference.StaticArgumentTrait; + +import java.util.Arrays; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Declares traits for {@link ArgumentHint}. They enable basic validation by the framework. + * + *
Some traits have dependencies to other traits, which is why this enum reflects a hierarchy in + * which {@link #SCALAR}, {@link #TABLE_AS_ROW}, and {@link #TABLE_AS_SET} are the top-level roots. + */ +@PublicEvolving +public enum ArgumentTrait { + + /** + * An argument that accepts a scalar value. For example: f(1), f(true), f('Some string'). + * + *
It's the default if no {@link ArgumentHint} is provided. + */ + SCALAR(StaticArgumentTrait.SCALAR), + + /** + * An argument that accepts a table "as row" (i.e. with row semantics). This trait only applies + * to {@code ProcessTableFunction} (PTF). + * + *
For scalability, input tables are distributed into virtual processors. Each virtual + * processor executes a PTF instance and has access only to a share of the entire table. The + * argument declaration decides about the size of the share and co-location of data. + * + *
A table with row semantics assumes that there is no correlation between rows and each row + * can be processed independently. The framework is free in how to distribute rows among virtual + * processors and each virtual processor has access only to the currently processed row. + */ + TABLE_AS_ROW(StaticArgumentTrait.TABLE_AS_ROW), + + /** + * An argument that accepts a table "as set" (i.e. with set semantics). This trait only applies + * to {@code ProcessTableFunction} (PTF). + * + *
For scalability, input tables are distributed into virtual processors. Each virtual + * processor executes a PTF instance and has access only to a share of the entire table. The + * argument declaration decides about the size of the share and co-location of data. + * + *
A table with set semantics assumes that there is a correlation between rows. When calling
+ * the function, the PARTITION BY clause defines the columns for correlation. The framework
+ * ensures that all rows belonging to same set are co-located. A PTF instance is able to access
+ * all rows belonging to the same set. In other words: The virtual processor is scoped under a
+ * key context.
+ */
+ TABLE_AS_SET(StaticArgumentTrait.TABLE_AS_SET),
+
+ /**
+ * Defines that a PARTITION BY clause is optional for {@link #TABLE_AS_SET}. By default, it is
+ * mandatory for improving the parallel execution by distributing the table by key.
+ */
+ OPTIONAL_PARTITION_BY(StaticArgumentTrait.OPTIONAL_PARTITION_BY, TABLE_AS_SET);
+
+ private final StaticArgumentTrait staticTrait;
+ private final Set One or more annotations can be declared on top of a {@link UserDefinedFunction} class or
* individually for each {@code eval()/accumulate()} method for overloading function signatures. All
@@ -42,18 +42,24 @@
* part and let the default extraction do the rest:
*
* Note: Specifying the input arguments manually disables the entire reflection-based
- * extraction around arguments. This means that also {@link #isVarArgs()} and {@link
- * #argumentNames()} need to be specified manually if required.
+ * extraction around arguments. This means that also {@link #isVarArgs()} needs to be specified
+ * manually if required.
+ *
+ * Use {@link #arguments()} for more control about argument names and argument kinds.
*/
DataTypeHint[] input() default @DataTypeHint();
@@ -157,25 +165,14 @@
boolean isVarArgs() default false;
/**
- * Explicitly lists the argument names that a function takes as input.
+ * Explicitly lists the arguments that a function takes as input. Including their names, data
+ * types, kinds, and whether they are optional.
*
- * By default, if {@link #input()} is defined, explicit argument names are undefined and this
- * parameter can be used to provide argument names. If {@link #input()} is not defined, the
- * reflection-based extraction is used, thus, this parameter is ignored.
+ * It is recommended to use this parameter instead of {@link #input()}. Using both {@link
+ * #input()} and this parameter is not allowed. Specifying the list of arguments manually
+ * disables the entire reflection-based extraction around arguments.
*/
- String[] argumentNames() default {""};
-
- /**
- * Explicitly lists the argument that a function takes as input, including their names, types,
- * and whether they are optional.
- *
- * By default, it is recommended to use this parameter instead of {@link #input()}. If the
- * type of argumentHint is not defined, it will be considered an invalid argument and an
- * exception will be thrown. Additionally, both this parameter and {@link #input()} cannot be
- * defined at the same time. If neither argument nor {@link #input()} are defined,
- * reflection-based extraction will be used.
- */
- ArgumentHint[] argument() default {};
+ ArgumentHint[] arguments() default {};
/**
* Explicitly defines the intermediate result type that a function uses as accumulator.
@@ -192,4 +189,32 @@
* used.
*/
DataTypeHint output() default @DataTypeHint();
+
+ // --------------------------------------------------------------------------------------------
+ // Legacy
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Explicitly lists the argument names that a function takes as input.
+ *
+ * By default, if {@link #input()} is defined, explicit argument names are undefined and this
+ * parameter can be used to provide argument names. If {@link #input()} is not defined, the
+ * reflection-based extraction is used, thus, this parameter is ignored.
+ *
+ * @deprecated Use {@link #arguments()} instead.
+ */
+ @Deprecated
+ String[] argumentNames() default {""};
+
+ /**
+ * Explicitly lists the arguments that a function takes as input. Including their names, data
+ * types, kinds, and whether they are optional.
+ *
+ * It is recommended to use this parameter instead of {@link #input()}. Specifying the list
+ * of arguments manually disables the entire reflection-based extraction around arguments.
+ *
+ * @deprecated Use {@link #arguments()} instead.
+ */
+ @Deprecated
+ ArgumentHint[] argument() default {};
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ProcedureHint.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ProcedureHint.java
index 73a57dffcaa5b..dabc1b8a37862 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ProcedureHint.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ProcedureHint.java
@@ -29,32 +29,38 @@
import java.lang.annotation.Target;
/**
- * A hint that influences the reflection-based extraction of input types and output types for
- * constructing the {@link TypeInference} logic of a {@link Procedure}.
+ * A hint that influences the reflection-based extraction of arguments and output for constructing
+ * the {@link TypeInference} logic of a {@link Procedure}.
*
* One or more annotations can be declared on top of a {@link Procedure} class or individually
* for each {@code call()} method for overloading function signatures. All hint parameters are
* optional. If a parameter is not defined, the default reflection-based extraction is used. Hint
* parameters defined on top of a {@link Procedure} class are inherited by all {@code call()}
- * methods. The {@link DataTypeHint} for the output data type of a {@link Procedure} should always
- * hint the component type of the array returned by {@link Procedure}.
+ * methods. The {@link DataTypeHint} for the output data type should always hint the component type
+ * of the array returned by {@link Procedure}.
*
* The following examples show how to explicitly specify procedure signatures as a whole or in
* part and let the default extraction do the rest:
*
* Note: Specifying the input arguments manually disables the entire reflection-based
- * extraction around arguments. This means that also {@link #isVarArgs()} and {@link
- * #argumentNames()} need to be specified manually if required.
+ * extraction around arguments. This means that also {@link #isVarArgs()} needs to be specified
+ * manually if required.
+ *
+ * Use {@link #arguments()} for more control about argument names and argument kinds.
*/
DataTypeHint[] input() default @DataTypeHint();
@@ -147,32 +156,49 @@
*/
boolean isVarArgs() default false;
+ /**
+ * Explicitly lists the arguments that a procedure takes as input. Including their names, data
+ * types, kinds, and whether they are optional.
+ *
+ * It is recommended to use this parameter instead of {@link #input()}. Using both {@link
+ * #input()} and this parameter is not allowed. Specifying the list of arguments manually
+ * disables the entire reflection-based extraction around arguments.
+ */
+ ArgumentHint[] arguments() default {};
+
+ /**
+ * Explicitly defines the result type that a procedure uses as output.
+ *
+ * By default, an explicit output type is undefined and the reflection-based extraction is
+ * used.
+ */
+ DataTypeHint output() default @DataTypeHint();
+
+ // --------------------------------------------------------------------------------------------
+ // Legacy
+ // --------------------------------------------------------------------------------------------
+
/**
* Explicitly lists the argument names that a procedure takes as input.
*
* By default, if {@link #input()} is defined, explicit argument names are undefined and this
* parameter can be used to provide argument names. If {@link #input()} is not defined, the
* reflection-based extraction is used, thus, this parameter is ignored.
+ *
+ * @deprecated Use {@link #arguments()} instead.
*/
+ @Deprecated
String[] argumentNames() default {""};
/**
- * Explicitly lists the argument that a procedure takes as input, including their names, types,
- * and whether they are optional.
+ * Explicitly lists the arguments that a procedure takes as input. Including their names, data
+ * types, kinds, and whether they are optional.
*
- * By default, it is recommended to use this parameter instead of {@link #input()}. If the
- * type of argumentHint is not defined, it will be considered an invalid argument and an
- * exception will be thrown. Additionally, both this parameter and {@link #input()} cannot be
- * defined at the same time. If neither argument nor {@link #input()} are defined,
- * reflection-based extraction will be used.
- */
- ArgumentHint[] argument() default {};
-
- /**
- * Explicitly defines the result type that a procedure uses as output.
+ * It is recommended to use this parameter instead of {@link #input()}. Specifying the list
+ * of arguments manually disables the entire reflection-based extraction around arguments.
*
- * By default, an explicit output type is undefined and the reflection-based extraction is
- * used.
+ * @deprecated Use {@link #arguments()} instead.
*/
- DataTypeHint output() default @DataTypeHint();
+ @Deprecated
+ ArgumentHint[] argument() default {};
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java
index 6069a0f729c17..80d9546c203e0 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.types.extraction;
import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.ArgumentTrait;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
@@ -461,7 +462,17 @@ static Boolean[] extractArgumentOptionals(Method method, int offset) {
return Arrays.stream(method.getParameters())
.skip(offset)
.map(parameter -> parameter.getAnnotation(ArgumentHint.class))
- .map(argumentHint -> argumentHint != null && argumentHint.isOptional())
+ .map(
+ h -> {
+ if (h == null) {
+ return false;
+ }
+ final ArgumentTrait[] traits = h.value();
+ if (traits.length != 1 || traits[0] != ArgumentTrait.SCALAR) {
+ throw extractionError("Only scalar arguments are supported yet.");
+ }
+ return h.isOptional();
+ })
.toArray(Boolean[]::new);
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
index 5968b41fc84d7..0d01e04a08ffc 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
@@ -766,7 +766,7 @@ private AssigningConstructor(Constructor> constructor, List{@code
- * // accepts (INT, STRING) and returns BOOLEAN
+ * // accepts (INT, STRING) and returns BOOLEAN,
+ * // the arguments have names and are optional
* @FunctionHint(
- * argument = [(name = "f1", @DataTypeHint("INT"), isOptional = true),
- * (name = "f2", @DataTypeHint("STRING"), isOptional = true)],
+ * arguments = {
+ * @ArgumentHint(type = @DataTypeHint("INT"), name = "in1", isOptional = true),
+ * @ArgumentHint(type = @DataTypeHint("STRING"), name = "in2", isOptional = true)
+ * },
* output = @DataTypeHint("BOOLEAN")
* )
* class X extends ScalarFunction { ... }
*
- * // accepts (INT, STRING...) and returns BOOLEAN
+ * // accepts (INT, STRING...) and returns BOOLEAN,
+ * // the arguments have names
* @FunctionHint(
- * argument = [(name = "f1", @DataTypeHint("INT"), isOptional = false),
- * (name = "f2", @DataTypeHint("STRING"), isOptional = false)],
+ * arguments = {
+ * @ArgumentHint(type = @DataTypeHint("INT"), name = "in1"),
+ * @ArgumentHint(type = @DataTypeHint("STRING"), name = "in2")
+ * },
* isVarArgs = true,
* output = @DataTypeHint("BOOLEAN")
* )
@@ -122,6 +128,7 @@
* }
*
* @see DataTypeHint
+ * @see ArgumentHint
*/
@PublicEvolving
@Retention(RetentionPolicy.RUNTIME)
@@ -131,8 +138,7 @@
// Note to implementers:
// Because "null" is not supported as an annotation value. Every annotation parameter *must*
- // have
- // some representation for unknown values in order to merge multi-level annotations.
+ // have some representation for unknown values in order to merge multi-level annotations.
/**
* Explicitly lists the argument types that a function takes as input.
@@ -141,8 +147,10 @@
* used.
*
* {@code
- * // accepts (INT, STRING) and returns BOOLEAN
+ * // accepts (INT, STRING) and returns BOOLEAN,
+ * // the arguments have names and are optional
* @ProcedureHint(
- * argument = [(name = "f1", @DataTypeHint("INT"), isOptional = true),
- * (name = "f2", @DataTypeHint("STRING"), isOptional = true)],
+ * arguments = {
+ * @ArgumentHint(type = @DataTypeHint("INT"), name = "in1", isOptional = true),
+ * @ArgumentHint(type = @DataTypeHint("STRING"), name = "in2", isOptional = true)
+ * },
* output = @DataTypeHint("BOOLEAN")
* )
* class X implements Procedure { ... }
*
- * // accepts (INT, STRING...) and returns BOOLEAN
+ * // accepts (INT, STRING...) and returns BOOLEAN,
+ * // the arguments have names
* @ProcedureHint(
- * argument = [(name = "f1", @DataTypeHint("INT"), isOptional = false),
- * (name = "f2", @DataTypeHint("STRING"), isOptional = false)],
+ * arguments = {
+ * @ArgumentHint(type = @DataTypeHint("INT"), name = "in1"),
+ * @ArgumentHint(type = @DataTypeHint("STRING"), name = "in2")
+ * },
* isVarArgs = true,
* output = @DataTypeHint("BOOLEAN")
* )
@@ -114,16 +120,17 @@
* }
*
* @see DataTypeHint
+ * @see ArgumentHint
*/
@PublicEvolving
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@Repeatable(ProcedureHints.class)
public @interface ProcedureHint {
+
// Note to implementers:
// Because "null" is not supported as an annotation value. Every annotation parameter *must*
- // have
- // some representation for unknown values in order to merge multi-level annotations.
+ // have some representation for unknown values in order to merge multi-level annotations.
/**
* Explicitly lists the argument types that a procedure takes as input.
@@ -132,8 +139,10 @@
* used.
*
*