Skip to content

Commit

Permalink
[FLINK-36707][table] Integrate PTF type inference into Calcite
Browse files Browse the repository at this point in the history
  • Loading branch information
twalthr committed Jan 9, 2025
1 parent 42e2593 commit d5568aa
Show file tree
Hide file tree
Showing 33 changed files with 1,460 additions and 626 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.StateTypeStrategy;
import org.apache.flink.table.types.inference.StateTypeStrategyWrapper;
import org.apache.flink.table.types.inference.TypeStrategies;
import org.apache.flink.table.types.inference.TypeStrategy;

Expand Down Expand Up @@ -122,7 +121,7 @@ private void checkSingleStateEntry() {
}

private static StateTypeStrategy createStateTypeStrategy(DataType dataType) {
return StateTypeStrategyWrapper.of(TypeStrategies.explicit(dataType));
return StateTypeStrategy.of(TypeStrategies.explicit(dataType));
}

private static TypeStrategy createTypeStrategy(DataType dataType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.flink.table.types.inference.InputTypeStrategies;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.StateTypeStrategy;
import org.apache.flink.table.types.inference.StateTypeStrategyWrapper;
import org.apache.flink.table.types.inference.StaticArgument;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeStrategies;
Expand Down Expand Up @@ -321,7 +320,7 @@ private static LinkedHashMap<String, StateTypeStrategy> translateStateTypeStrate
e -> e.getKey().toInputTypeStrategy(),
e -> e.getValue().toAccumulatorTypeStrategy()));
final StateTypeStrategy accumulatorStrategy =
StateTypeStrategyWrapper.of(TypeStrategies.mapping(mappings));
StateTypeStrategy.of(TypeStrategies.mapping(mappings));
final Set<String> stateNames =
stateMapping.values().stream()
.map(FunctionStateTemplate::toAccumulatorStateName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.table.functions.TableSemantics;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;

Expand All @@ -44,7 +46,8 @@ public interface CallContext {

/**
* Returns {@code true} if the argument at the given position is a literal and {@code null},
* {@code false} otherwise.
* {@code false} otherwise. If the argument is declared as optional and has no value, true is
* returned.
*
* <p>Use {@link #isArgumentLiteral(int)} before to check if the argument is actually a literal.
*/
Expand All @@ -61,6 +64,17 @@ public interface CallContext {
*/
<T> Optional<T> getArgumentValue(int pos, Class<T> clazz);

/**
* Returns information about the table that has been passed to a table argument.
*
* <p>Semantics are only available for table arguments (i.e. arguments of a {@link
* ProcessTableFunction} that are annotated with {@code @ArgumentHint(TABLE_AS_SET)} or
* {@code @ArgumentHint(TABLE_AS_ROW)}).
*/
default Optional<TableSemantics> getTableSemantics(int pos) {
return Optional.empty();
}

/**
* Returns the function's name usually referencing the function in a catalog.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,24 @@

package org.apache.flink.table.types.inference;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;

import java.util.Objects;
import java.util.Optional;

/** A helper class that wraps a {@link TypeStrategy} into a {@link StateTypeStrategy}. */
@PublicEvolving
public class StateTypeStrategyWrapper implements StateTypeStrategy {
@Internal
class DefaultStateTypeStrategy implements StateTypeStrategy {

private final TypeStrategy typeStrategy;

private StateTypeStrategyWrapper(TypeStrategy typeStrategy) {
DefaultStateTypeStrategy(TypeStrategy typeStrategy) {
this.typeStrategy =
Preconditions.checkNotNull(typeStrategy, "Type strategy must not be null.");
}

public static StateTypeStrategyWrapper of(TypeStrategy typeStrategy) {
return new StateTypeStrategyWrapper(typeStrategy);
}

@Override
public Optional<DataType> inferType(CallContext callContext) {
return typeStrategy.inferType(callContext);
Expand All @@ -50,8 +46,8 @@ public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o instanceof StateTypeStrategyWrapper) {
return Objects.equals(typeStrategy, ((StateTypeStrategyWrapper) o).typeStrategy);
if (o instanceof DefaultStateTypeStrategy) {
return Objects.equals(typeStrategy, ((DefaultStateTypeStrategy) o).typeStrategy);
}
if (o instanceof TypeStrategy) {
return Objects.equals(typeStrategy, o);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,10 @@
/** Strategy for inferring a function call's intermediate result data type (i.e. state entry). */
@PublicEvolving
public interface StateTypeStrategy extends TypeStrategy {

static StateTypeStrategy of(TypeStrategy typeStrategy) {
return new DefaultStateTypeStrategy(typeStrategy);
}

// marker interface which will be filled with additional contracts in the future
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ public EnumSet<StaticArgumentTrait> getTraits() {
return traits;
}

public boolean is(StaticArgumentTrait trait) {
return traits.contains(trait);
}

@Override
public String toString() {
final StringBuilder s = new StringBuilder();
Expand All @@ -181,13 +185,18 @@ public String toString() {
// (myTypedTable ROW<i INT> {TABLE BY ROW})
// (myUntypedTable {TABLE BY ROW})
s.append(name);
s.append(" =>");
if (dataType != null) {
s.append(" ");
s.append(dataType);
}
if (!traits.equals(EnumSet.of(StaticArgumentTrait.SCALAR))) {
s.append(" ");
s.append(traits.stream().map(Enum::name).collect(Collectors.joining(", ", "{", "}")));
s.append(
traits.stream()
.map(Enum::name)
.map(n -> n.replace('_', ' '))
.collect(Collectors.joining(", ", "{", "}")));
}
return s.toString();
}
Expand Down Expand Up @@ -218,7 +227,7 @@ private void checkName() {
throw new ValidationException(
String.format(
"Invalid argument name '%s'. An argument must follow "
+ "the pattern [a-zA-Z_$][a-zA-Z_$0-9].",
+ "the pattern [a-zA-Z_$][a-zA-Z_$0-9]*.",
name));
}
}
Expand Down
Loading

0 comments on commit d5568aa

Please sign in to comment.