Skip to content

Commit c39aff1

Browse files
committed
[hotfix] Introduce ProjectedRowData.isNullAtNonProjected
1 parent 32849a2 commit c39aff1

File tree

2 files changed

+35
-1
lines changed

2 files changed

+35
-1
lines changed

flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ProjectedRowData.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,17 @@
4545
public class ProjectedRowData implements RowData {
4646

4747
private final int[] indexMapping;
48+
private final boolean isNullAtNonProjected;
4849

4950
private RowData row;
5051

5152
private ProjectedRowData(int[] indexMapping) {
53+
this(indexMapping, false);
54+
}
55+
56+
protected ProjectedRowData(int[] indexMapping, boolean isNullAtNonProjected) {
5257
this.indexMapping = indexMapping;
58+
this.isNullAtNonProjected = isNullAtNonProjected;
5359
}
5460

5561
/**
@@ -82,7 +88,8 @@ public void setRowKind(RowKind kind) {
8288

8389
@Override
8490
public boolean isNullAt(int pos) {
85-
return row.isNullAt(indexMapping[pos]);
91+
return (pos >= indexMapping.length && isNullAtNonProjected)
92+
|| row.isNullAt(indexMapping[pos]);
8693
}
8794

8895
@Override
@@ -226,6 +233,21 @@ public static ProjectedRowData from(int[] projection) {
226233
return new ProjectedRowData(projection);
227234
}
228235

236+
/**
237+
* Create an empty {@link ProjectedRowData} starting from a {@code projection} array with nulls
238+
* allowed for non-projected fields.
239+
*
240+
* <p>The array represents the mapping of the fields of the original {@link DataType}. For
241+
* example, {@code [0, 2, 1]} specifies to include in the following order the 1st field, the 3rd
242+
* field and the 2nd field of the row.
243+
*
244+
* @see Projection
245+
* @see ProjectedRowData
246+
*/
247+
public static ProjectedRowData from(int[] projection, boolean isNullAtNonProjected) {
248+
return new ProjectedRowData(projection, isNullAtNonProjected);
249+
}
250+
229251
/**
230252
* Create an empty {@link ProjectedRowData} starting from a {@link Projection}.
231253
*

flink-table/flink-table-common/src/test/java/org/apache/flink/table/data/utils/ProjectedRowDataTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727

2828
import static org.apache.flink.table.test.TableAssertions.assertThat;
2929
import static org.assertj.core.api.Assertions.assertThatThrownBy;
30+
import static org.junit.jupiter.api.Assertions.assertFalse;
31+
import static org.junit.jupiter.api.Assertions.assertTrue;
3032

3133
/** Tests for {@link ProjectedRowData}. */
3234
class ProjectedRowDataTest {
@@ -67,4 +69,14 @@ void testProjectedRowsDoesntSupportNestedProjections() {
6769
}))
6870
.isInstanceOf(IllegalArgumentException.class);
6971
}
72+
73+
@Test
74+
void testIsNullAtNonProjected() {
75+
RowData initialRow = GenericRowData.of(-1L, -1L, 3L, -1L, 5L);
76+
RowData projected = ProjectedRowData.from(new int[] {2, 4}, true).replaceRow(initialRow);
77+
assertFalse(projected.isNullAt(0)); // 3L, projected
78+
assertFalse(projected.isNullAt(1)); // 5L, projected
79+
assertTrue(projected.isNullAt(2)); // not projected
80+
assertTrue(projected.isNullAt(3)); // not projected and doesn't exist in the original
81+
}
7082
}

0 commit comments

Comments
 (0)