Skip to content

Commit 208c58d

Browse files
committed
comments
1 parent abf8b08 commit 208c58d

File tree

18 files changed

+416
-158
lines changed

18 files changed

+416
-158
lines changed

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Model.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.table.api;
2020

2121
import org.apache.flink.annotation.PublicEvolving;
22+
import org.apache.flink.table.api.config.MLPredictRuntimeConfigOptions;
2223
import org.apache.flink.table.catalog.ResolvedSchema;
2324
import org.apache.flink.types.ColumnList;
2425

@@ -104,13 +105,7 @@ public interface Model {
104105
* runtime configuration options such as max-concurrent-operations, timeout, and execution mode
105106
* settings.
106107
*
107-
* <p>Common runtime options include:
108-
*
109-
* <ul>
110-
* <li>"max-concurrent-operations" - Number of records to process in each batch
111-
* <li>"timeout" - Maximum time to wait for model inference
112-
* <li>"async" - Whether to enable asynchronous execution
113-
* </ul>
108+
* <p>For Common runtime options, see {@link MLPredictRuntimeConfigOptions}.
114109
*
115110
* <p>Example:
116111
*

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ModelImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,11 @@ public Table predict(Table table, ColumnList inputColumns) {
7777

7878
@Override
7979
public Table predict(Table table, ColumnList inputColumns, Map<String, String> options) {
80-
// Use Expressions.map() instead of Expression.lit() to create a MAP literal since
80+
// Use Expressions.map() instead of Expressions.lit() to create a MAP literal since
8181
// lit() is not serializable to sql.
8282
if (options.isEmpty()) {
8383
return tableEnvironment.fromCall(
84-
"ML_PREDICT",
84+
BuiltInFunctionDefinitions.ML_PREDICT.getName(),
8585
table.asArgument("INPUT"),
8686
this.asArgument("MODEL"),
8787
Expressions.descriptor(inputColumns).asArgument("ARGS"));
@@ -93,7 +93,7 @@ public Table predict(Table table, ColumnList inputColumns, Map<String, String> o
9393
configKVs.add(v);
9494
});
9595
return tableEnvironment.fromCall(
96-
"ML_PREDICT",
96+
BuiltInFunctionDefinitions.ML_PREDICT.getName(),
9797
table.asArgument("INPUT"),
9898
this.asArgument("MODEL"),
9999
Expressions.descriptor(inputColumns).asArgument("ARGS"),

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionVisitor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ public final R visit(Expression other) {
2929
return visit((UnresolvedReferenceExpression) other);
3030
} else if (other instanceof TableReferenceExpression) {
3131
return visit((TableReferenceExpression) other);
32+
} else if (other instanceof ModelReferenceExpression) {
33+
return visit((ModelReferenceExpression) other);
3234
} else if (other instanceof LocalReferenceExpression) {
3335
return visit((LocalReferenceExpression) other);
3436
} else if (other instanceof LookupCallExpression) {

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ModelReferenceExpression.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.table.api.Model;
2323
import org.apache.flink.table.api.TableEnvironment;
24+
import org.apache.flink.table.api.ValidationException;
2425
import org.apache.flink.table.catalog.ContextResolvedModel;
2526
import org.apache.flink.table.types.DataType;
2627
import org.apache.flink.table.types.utils.DataTypeUtils;
@@ -103,6 +104,10 @@ public List<ResolvedExpression> getResolvedChildren() {
103104

104105
@Override
105106
public String asSerializableString(SqlFactory sqlFactory) {
107+
if (model.isAnonymous()) {
108+
throw new ValidationException("Anonymous models cannot be serialized.");
109+
}
110+
106111
return "MODEL " + model.getIdentifier().asSerializableString();
107112
}
108113

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ResolvedExpressionVisitor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ public abstract class ResolvedExpressionVisitor<R> implements ExpressionVisitor<
3232
public final R visit(Expression other) {
3333
if (other instanceof TableReferenceExpression) {
3434
return visit((TableReferenceExpression) other);
35+
} else if (other instanceof ModelReferenceExpression) {
36+
return visit((ModelReferenceExpression) other);
3537
} else if (other instanceof LocalReferenceExpression) {
3638
return visit((LocalReferenceExpression) other);
3739
} else if (other instanceof ResolvedExpression) {
@@ -42,6 +44,8 @@ public final R visit(Expression other) {
4244

4345
public abstract R visit(TableReferenceExpression tableReference);
4446

47+
public abstract R visit(ModelReferenceExpression modelReferenceExpression);
48+
4549
public abstract R visit(LocalReferenceExpression localReference);
4650

4751
/** For resolved expressions created by the planner. */

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ResolvedExpressionDefaultVisitor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.table.expressions.CallExpression;
2323
import org.apache.flink.table.expressions.FieldReferenceExpression;
2424
import org.apache.flink.table.expressions.LocalReferenceExpression;
25+
import org.apache.flink.table.expressions.ModelReferenceExpression;
2526
import org.apache.flink.table.expressions.NestedFieldReferenceExpression;
2627
import org.apache.flink.table.expressions.ResolvedExpression;
2728
import org.apache.flink.table.expressions.ResolvedExpressionVisitor;
@@ -41,6 +42,10 @@ public T visit(TableReferenceExpression tableReference) {
4142
return defaultMethod(tableReference);
4243
}
4344

45+
public T visit(ModelReferenceExpression modelReferenceExpression) {
46+
return defaultMethod(modelReferenceExpression);
47+
}
48+
4449
@Override
4550
public T visit(LocalReferenceExpression localReference) {
4651
return defaultMethod(localReference);
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.test.program;
20+
21+
import org.apache.flink.table.api.Table;
22+
import org.apache.flink.table.api.TableEnvironment;
23+
import org.apache.flink.table.api.TableRuntimeException;
24+
import org.apache.flink.table.api.ValidationException;
25+
import org.apache.flink.table.test.program.TableApiTestStep.TableEnvAccessor;
26+
import org.apache.flink.util.Preconditions;
27+
28+
import java.util.function.Function;
29+
30+
import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
31+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
32+
33+
/**
34+
* Test step for executing Table API query that will fail eventually with either {@link
35+
* ValidationException} (during planning time) or {@link TableRuntimeException} (during execution
36+
* time).
37+
*
38+
* <p>Similar to {@link FailingSqlTestStep} but uses Table API instead of SQL.
39+
*/
40+
public final class FailingTableApiTestStep implements TestStep {
41+
42+
private final Function<TableEnvAccessor, Table> tableQuery;
43+
private final String sinkName;
44+
public final Class<? extends Exception> expectedException;
45+
public final String expectedErrorMessage;
46+
47+
FailingTableApiTestStep(
48+
Function<TableEnvAccessor, Table> tableQuery,
49+
String sinkName,
50+
Class<? extends Exception> expectedException,
51+
String expectedErrorMessage) {
52+
Preconditions.checkArgument(
53+
expectedException == ValidationException.class
54+
|| expectedException == TableRuntimeException.class,
55+
"Usually a Table API query should fail with either validation or runtime exception. "
56+
+ "Otherwise this might require an update to the exception design.");
57+
this.tableQuery = tableQuery;
58+
this.sinkName = sinkName;
59+
this.expectedException = expectedException;
60+
this.expectedErrorMessage = expectedErrorMessage;
61+
}
62+
63+
@Override
64+
public TestKind getKind() {
65+
return TestKind.FAILING_TABLE_API;
66+
}
67+
68+
public Table toTable(TableEnvironment env) {
69+
return tableQuery.apply(
70+
new TableEnvAccessor() {
71+
@Override
72+
public Table from(String path) {
73+
return env.from(path);
74+
}
75+
76+
@Override
77+
public Table fromCall(String path, Object... arguments) {
78+
return env.fromCall(path, arguments);
79+
}
80+
81+
@Override
82+
public Table fromCall(
83+
Class<? extends org.apache.flink.table.functions.UserDefinedFunction>
84+
function,
85+
Object... arguments) {
86+
return env.fromCall(function, arguments);
87+
}
88+
89+
@Override
90+
public Table fromValues(Object... values) {
91+
return env.fromValues(values);
92+
}
93+
94+
@Override
95+
public Table fromValues(
96+
org.apache.flink.table.types.AbstractDataType<?> dataType,
97+
Object... values) {
98+
return env.fromValues(dataType, values);
99+
}
100+
101+
@Override
102+
public Table sqlQuery(String query) {
103+
return env.sqlQuery(query);
104+
}
105+
106+
@Override
107+
public org.apache.flink.table.api.Model fromModel(String modelPath) {
108+
return env.fromModelPath(modelPath);
109+
}
110+
111+
@Override
112+
public org.apache.flink.table.api.Model from(
113+
org.apache.flink.table.api.ModelDescriptor modelDescriptor) {
114+
return env.from(modelDescriptor);
115+
}
116+
});
117+
}
118+
119+
public void apply(TableEnvironment env) {
120+
assertThatThrownBy(
121+
() -> {
122+
final Table table = toTable(env);
123+
table.executeInsert(sinkName).await();
124+
})
125+
.satisfies(anyCauseMatches(expectedException, expectedErrorMessage));
126+
}
127+
128+
public void applyAsSql(TableEnvironment env) {
129+
assertThatThrownBy(
130+
() -> {
131+
final Table table = toTable(env);
132+
final String query =
133+
table.getQueryOperation()
134+
.asSerializableString(
135+
org.apache.flink.table.expressions
136+
.DefaultSqlFactory.INSTANCE);
137+
env.executeSql(String.format("INSERT INTO %s %s", sinkName, query))
138+
.await();
139+
})
140+
.satisfies(anyCauseMatches(expectedException, expectedErrorMessage));
141+
}
142+
}

flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableApiTestStep.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.table.test.program;
2020

2121
import org.apache.flink.table.api.Model;
22+
import org.apache.flink.table.api.ModelDescriptor;
2223
import org.apache.flink.table.api.Table;
2324
import org.apache.flink.table.api.TableEnvironment;
2425
import org.apache.flink.table.api.TableResult;
@@ -81,6 +82,11 @@ public Table sqlQuery(String query) {
8182
public Model fromModel(String modelPath) {
8283
return env.fromModelPath(modelPath);
8384
}
85+
86+
@Override
87+
public Model from(ModelDescriptor modelDescriptor) {
88+
return env.from(modelDescriptor);
89+
}
8490
});
8591
}
8692

@@ -120,5 +126,8 @@ public interface TableEnvAccessor {
120126

121127
/** See {@link TableEnvironment#fromModelPath(String)}. */
122128
Model fromModel(String modelPath);
129+
130+
/** See {@link TableEnvironment#from(ModelDescriptor)}. */
131+
Model from(ModelDescriptor modelDescriptor);
123132
}
124133
}

flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.configuration.ConfigOption;
2222
import org.apache.flink.table.api.Table;
2323
import org.apache.flink.table.api.TableRuntimeException;
24+
import org.apache.flink.table.api.ValidationException;
2425
import org.apache.flink.table.expressions.Expression;
2526
import org.apache.flink.table.functions.UserDefinedFunction;
2627
import org.apache.flink.table.test.program.FunctionTestStep.FunctionBehavior;
@@ -355,6 +356,22 @@ public Builder runFailingSql(
355356
return this;
356357
}
357358

359+
/**
360+
* Run step for executing a Table API query that will fail eventually with either {@link
361+
* ValidationException} (during planning time) or {@link TableRuntimeException} (during
362+
* execution time).
363+
*/
364+
public Builder runFailingTableApi(
365+
Function<TableEnvAccessor, Table> toTable,
366+
String sinkName,
367+
Class<? extends Exception> expectedException,
368+
String expectedErrorMessage) {
369+
this.runSteps.add(
370+
new FailingTableApiTestStep(
371+
toTable, sinkName, expectedException, expectedErrorMessage));
372+
return this;
373+
}
374+
358375
public Builder runTableApi(Function<TableEnvAccessor, Table> toTable, String sinkName) {
359376
this.runSteps.add(new TableApiTestStep(toTable, sinkName));
360377
return this;

flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TestStep.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ enum TestKind {
5151
SINK_WITHOUT_DATA,
5252
SINK_WITH_DATA,
5353
SINK_WITH_RESTORE_DATA,
54-
FAILING_SQL
54+
FAILING_SQL,
55+
FAILING_TABLE_API
5556
}
5657

5758
TestKind getKind();

0 commit comments

Comments
 (0)