Skip to content

Commit bffc43a

Browse files
[FLINK-20539][table] Type mismatch when useing computed ROW column
Co-authored-by: xuyang <[email protected]>
1 parent 5156147 commit bffc43a

File tree

10 files changed

+309
-29
lines changed

10 files changed

+309
-29
lines changed

flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.calcite.rel.type.RelDataType;
2222
import org.apache.calcite.rel.type.RelDataTypeFactory;
23+
import org.apache.calcite.rel.type.StructKind;
2324
import org.apache.calcite.sql.SqlCharStringLiteral;
2425
import org.apache.calcite.sql.SqlDataTypeSpec;
2526
import org.apache.calcite.sql.SqlIdentifier;
@@ -156,6 +157,7 @@ public boolean equalsDeep(SqlTypeNameSpec spec, Litmus litmus) {
156157
public RelDataType deriveType(SqlValidator sqlValidator) {
157158
final RelDataTypeFactory typeFactory = sqlValidator.getTypeFactory();
158159
return typeFactory.createStructType(
160+
StructKind.PEEK_FIELDS_NO_EXPAND,
159161
fieldTypes.stream()
160162
.map(dt -> dt.deriveType(sqlValidator))
161163
.collect(Collectors.toList()),

flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/Fixture.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.sql.parser;
2020

2121
import org.apache.calcite.rel.type.RelDataType;
22+
import org.apache.calcite.rel.type.StructKind;
2223
import org.apache.calcite.sql.type.SqlTypeName;
2324

2425
import java.util.List;
@@ -113,6 +114,11 @@ public RelDataType createStructType(List<RelDataType> keyTypes, List<String> nam
113114
return typeFactory.createStructType(keyTypes, names);
114115
}
115116

117+
public RelDataType createStructType(
118+
StructKind structKind, List<RelDataType> keyTypes, List<String> names) {
119+
return typeFactory.createStructType(structKind, keyTypes, names);
120+
}
121+
116122
public RelDataType createStructuredType(
117123
String className, List<RelDataType> typeList, List<String> fieldNameList) {
118124
return typeFactory.createStructuredType(className, typeList, fieldNameList);

flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.calcite.rel.type.DelegatingTypeSystem;
2828
import org.apache.calcite.rel.type.RelDataType;
2929
import org.apache.calcite.rel.type.RelDataTypeSystem;
30+
import org.apache.calcite.rel.type.StructKind;
3031
import org.apache.calcite.sql.SqlDataTypeSpec;
3132
import org.apache.calcite.sql.SqlDialect;
3233
import org.apache.calcite.sql.SqlNode;
@@ -211,6 +212,7 @@ static Stream<Arguments> testData() {
211212
"ROW<f0 INT NOT NULL, f1 BOOLEAN>",
212213
nullable(
213214
FIXTURE.createStructType(
215+
StructKind.PEEK_FIELDS_NO_EXPAND,
214216
Arrays.asList(
215217
FIXTURE.intType, nullable(FIXTURE.booleanType)),
216218
Arrays.asList("f0", "f1"))),
@@ -219,6 +221,7 @@ FIXTURE.intType, nullable(FIXTURE.booleanType)),
219221
"ROW(f0 INT NOT NULL, f1 BOOLEAN)",
220222
nullable(
221223
FIXTURE.createStructType(
224+
StructKind.PEEK_FIELDS_NO_EXPAND,
222225
Arrays.asList(
223226
FIXTURE.intType, nullable(FIXTURE.booleanType)),
224227
Arrays.asList("f0", "f1"))),
@@ -227,33 +230,40 @@ FIXTURE.intType, nullable(FIXTURE.booleanType)),
227230
"ROW<`f0` INT>",
228231
nullable(
229232
FIXTURE.createStructType(
233+
StructKind.PEEK_FIELDS_NO_EXPAND,
230234
Collections.singletonList(nullable(FIXTURE.intType)),
231235
Collections.singletonList("f0"))),
232236
"ROW< `f0` INTEGER >"),
233237
createArgumentsTestItem(
234238
"ROW(`f0` INT)",
235239
nullable(
236240
FIXTURE.createStructType(
241+
StructKind.PEEK_FIELDS_NO_EXPAND,
237242
Collections.singletonList(nullable(FIXTURE.intType)),
238243
Collections.singletonList("f0"))),
239244
"ROW(`f0` INTEGER)"),
240245
createArgumentsTestItem(
241246
"ROW<>",
242247
nullable(
243248
FIXTURE.createStructType(
244-
Collections.emptyList(), Collections.emptyList())),
249+
StructKind.PEEK_FIELDS_NO_EXPAND,
250+
Collections.emptyList(),
251+
Collections.emptyList())),
245252
"ROW<>"),
246253
createArgumentsTestItem(
247254
"ROW()",
248255
nullable(
249256
FIXTURE.createStructType(
250-
Collections.emptyList(), Collections.emptyList())),
257+
StructKind.PEEK_FIELDS_NO_EXPAND,
258+
Collections.emptyList(),
259+
Collections.emptyList())),
251260
"ROW()"),
252261
createArgumentsTestItem(
253262
"ROW<f0 INT NOT NULL 'This is a comment.', "
254263
+ "f1 BOOLEAN 'This as well.'>",
255264
nullable(
256265
FIXTURE.createStructType(
266+
StructKind.PEEK_FIELDS_NO_EXPAND,
257267
Arrays.asList(
258268
FIXTURE.intType, nullable(FIXTURE.booleanType)),
259269
Arrays.asList("f0", "f1"))),

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRexBuilder.java

Lines changed: 62 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,13 @@
2121
import org.apache.calcite.rel.type.RelDataType;
2222
import org.apache.calcite.rel.type.RelDataTypeFactory;
2323
import org.apache.calcite.rex.RexBuilder;
24+
import org.apache.calcite.rex.RexCall;
25+
import org.apache.calcite.rex.RexFieldAccess;
2426
import org.apache.calcite.rex.RexLiteral;
2527
import org.apache.calcite.rex.RexNode;
28+
import org.apache.calcite.rex.RexShuttle;
29+
import org.apache.calcite.rex.RexUtil;
30+
import org.apache.calcite.sql.SqlKind;
2631
import org.apache.calcite.util.TimestampString;
2732

2833
/** A slim extension over a {@link RexBuilder}. See the overridden methods for more explanation. */
@@ -40,16 +45,8 @@ public FlinkRexBuilder(RelDataTypeFactory typeFactory) {
4045
*/
4146
@Override
4247
public RexNode makeFieldAccess(RexNode expr, String fieldName, boolean caseSensitive) {
43-
RexNode field = super.makeFieldAccess(expr, fieldName, caseSensitive);
44-
if (expr.getType().isNullable() && !field.getType().isNullable()) {
45-
return makeCast(
46-
typeFactory.createTypeWithNullability(field.getType(), true),
47-
field,
48-
true,
49-
false);
50-
}
51-
52-
return field;
48+
final RexNode field = super.makeFieldAccess(expr, fieldName, caseSensitive);
49+
return makeFieldAccess(expr, field);
5350
}
5451

5552
/**
@@ -61,16 +58,8 @@ public RexNode makeFieldAccess(RexNode expr, String fieldName, boolean caseSensi
6158
*/
6259
@Override
6360
public RexNode makeFieldAccess(RexNode expr, int i) {
64-
RexNode field = super.makeFieldAccess(expr, i);
65-
if (expr.getType().isNullable() && !field.getType().isNullable()) {
66-
return makeCast(
67-
typeFactory.createTypeWithNullability(field.getType(), true),
68-
field,
69-
true,
70-
false);
71-
}
72-
73-
return field;
61+
final RexNode field = super.makeFieldAccess(expr, i);
62+
return makeFieldAccess(expr, field);
7463
}
7564

7665
/**
@@ -102,4 +91,57 @@ public RexLiteral makeZeroLiteral(RelDataType type) {
10291
return super.makeZeroLiteral(type);
10392
}
10493
}
94+
95+
private RexNode makeFieldAccess(RexNode expr, RexNode field) {
96+
final RexNode fieldWithRemovedCast = removeCastNullableFromFieldAccess(field);
97+
if (field.getType().isNullable() != fieldWithRemovedCast.getType().isNullable()
98+
|| expr.getType().isNullable() && !field.getType().isNullable()) {
99+
return makeCast(
100+
typeFactory.createTypeWithNullability(field.getType(), true),
101+
fieldWithRemovedCast,
102+
true,
103+
false);
104+
}
105+
106+
return expr.getType().isNullable() && fieldWithRemovedCast.getType().isNullable()
107+
? fieldWithRemovedCast
108+
: field;
109+
}
110+
111+
/**
112+
* {@link FlinkRexBuilder#makeFieldAccess} will adjust nullability based on nullability of the
113+
* enclosing type. However, it might be a deeply nested column and for every step {@link
114+
* FlinkRexBuilder#makeFieldAccess} will try to insert a cast. This method will remove previous
115+
* cast in order to keep only one.
116+
*/
117+
private RexNode removeCastNullableFromFieldAccess(RexNode rexFieldAccess) {
118+
if (!(rexFieldAccess instanceof RexFieldAccess)) {
119+
return rexFieldAccess;
120+
}
121+
RexNode rexNode = rexFieldAccess;
122+
while (rexNode instanceof RexFieldAccess) {
123+
rexNode = ((RexFieldAccess) rexNode).getReferenceExpr();
124+
}
125+
if (rexNode.getKind() != SqlKind.CAST) {
126+
return rexFieldAccess;
127+
}
128+
RexShuttle visitor =
129+
new RexShuttle() {
130+
@Override
131+
public RexNode visitCall(final RexCall call) {
132+
if (call.getKind() == SqlKind.CAST
133+
&& !call.operands.get(0).getType().isNullable()
134+
&& call.getType().isNullable()
135+
&& call.getOperands()
136+
.get(0)
137+
.getType()
138+
.getFieldList()
139+
.equals(call.getType().getFieldList())) {
140+
return RexUtil.removeCast(call);
141+
}
142+
return call;
143+
}
144+
};
145+
return RexUtil.apply(visitor, new RexNode[] {rexFieldAccess})[0];
146+
}
105147
}

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,9 @@ Stream<TestSetSpec> getTestSetSpecs() {
9494
FIELD(
9595
"r",
9696
ROW(
97-
FIELD("s", STRING()),
97+
FIELD(
98+
"s",
99+
STRING().notNull()),
98100
FIELD("b", BOOLEAN()),
99101
FIELD("i", INT()))),
100102
FIELD("s", STRING()))),
@@ -103,7 +105,8 @@ Stream<TestSetSpec> getTestSetSpecs() {
103105
// the inner NOT NULL is ignored in SQL because the outer ROW is
104106
// nullable and the cast does not allow setting the outer
105107
// nullability but derives it from the source operand
106-
DataTypes.of("ROW<r ROW<s STRING, b BOOLEAN, i INT>, s STRING>")),
108+
DataTypes.of(
109+
"ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>")),
107110
TestSetSpec.forFunction(
108111
BuiltInFunctionDefinitions.CAST,
109112
"explicit with nested rows and explicit nullability change")

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -880,13 +880,13 @@ void testCreateTableWithFullDataTypes() {
880880
createTestItem(
881881
"ROW<f0 INT NOT NULL, f1 BOOLEAN>",
882882
DataTypes.ROW(
883-
DataTypes.FIELD("f0", DataTypes.INT()),
883+
DataTypes.FIELD("f0", DataTypes.INT().notNull()),
884884
DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
885885
// Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>.
886886
createTestItem(
887887
"ROW(f0 INT NOT NULL, f1 BOOLEAN)",
888888
DataTypes.ROW(
889-
DataTypes.FIELD("f0", DataTypes.INT()),
889+
DataTypes.FIELD("f0", DataTypes.INT().notNull()),
890890
DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
891891
createTestItem(
892892
"ROW<`f0` INT>",
@@ -901,7 +901,7 @@ void testCreateTableWithFullDataTypes() {
901901
"ROW<f0 INT NOT NULL 'This is a comment.',"
902902
+ " f1 BOOLEAN 'This as well.'>",
903903
DataTypes.ROW(
904-
DataTypes.FIELD("f0", DataTypes.INT()),
904+
DataTypes.FIELD("f0", DataTypes.INT().notNull()),
905905
DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
906906
createTestItem(
907907
"ARRAY<ROW<f0 INT, f1 BOOLEAN>>",

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,13 +192,13 @@ LogicalProject(EXPR$0=[ITEM($0, $2).value], EXPR$1=[ITEM($1, _UTF-16LE'item').va
192192
</Resource>
193193
<Resource name="ast">
194194
<![CDATA[
195-
LogicalProject(EXPR$0=[CAST(CAST(ITEM($5.result, 1).meta):RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL symbol).symbol):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
195+
LogicalProject(EXPR$0=[CAST(ITEM(CAST($5.result):RecordType:peek_no_expand(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL symbol) NOT NULL meta) NOT NULL ARRAY, 1).meta.symbol):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
196196
+- LogicalTableScan(table=[[default_catalog, default_database, ItemTable]])
197197
]]>
198198
</Resource>
199199
<Resource name="optimized rel plan">
200200
<![CDATA[
201-
LogicalProject(EXPR$0=[CAST(CAST(ITEM($0.result, 1).meta):RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL symbol).symbol):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
201+
LogicalProject(EXPR$0=[CAST(ITEM(CAST($0.result):RecordType:peek_no_expand(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL symbol) NOT NULL meta) NOT NULL ARRAY, 1).meta.symbol):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
202202
+- LogicalTableScan(table=[[default_catalog, default_database, ItemTable, project=[chart], metadata=[]]])
203203
]]>
204204
</Resource>

0 commit comments

Comments
 (0)