diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java index daa3d508857af3..94d1cb30a8e786 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java @@ -59,8 +59,6 @@ public class RawFormatDeserializationSchema implements DeserializationSchema producedTypeInfo, @@ -76,7 +74,6 @@ public RawFormatDeserializationSchema( @Override public void open(InitializationContext context) throws Exception { - reuse = new GenericRowData(1); converter.open(); } @@ -89,8 +86,10 @@ public RowData deserialize(byte[] message) throws IOException { validator.validate(message); field = converter.convert(message); } - reuse.setField(0, field); - return reuse; + + GenericRowData rowData = new GenericRowData(1); + rowData.setField(0, field); + return rowData; } @Override diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatSerDeSchemaTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatSerDeSchemaTest.java index 10c97497cd39c3..fd78c8c6638a9f 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatSerDeSchemaTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatSerDeSchemaTest.java @@ -39,6 +39,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -67,76 +68,79 @@ public class RawFormatSerDeSchemaTest { @Parameterized.Parameters(name = "{index}: {0}") public static List testData() { return Arrays.asList( - TestSpec.type(TINYINT()).value(Byte.MAX_VALUE).binary(new byte[] {Byte.MAX_VALUE}), - TestSpec.type(SMALLINT()).value(Short.MAX_VALUE).binary(hexStringToByte("7fff")), + TestSpec.type(TINYINT()).values(Byte.MAX_VALUE).binary(new byte[] {Byte.MAX_VALUE}), + TestSpec.type(SMALLINT()).values(Short.MAX_VALUE).binary(hexStringToByte("7fff")), TestSpec.type(SMALLINT()) - .value(Short.MAX_VALUE) + .values(Short.MAX_VALUE) .withLittleEndian() .binary(hexStringToByte("ff7f")), - TestSpec.type(INT()).value(Integer.MAX_VALUE).binary(hexStringToByte("7fffffff")), + TestSpec.type(INT()).values(Integer.MAX_VALUE).binary(hexStringToByte("7fffffff")), TestSpec.type(INT()) - .value(Integer.MAX_VALUE) + .values(Integer.MAX_VALUE) .withLittleEndian() .binary(hexStringToByte("ffffff7f")), TestSpec.type(BIGINT()) - .value(Long.MAX_VALUE) + .values(Long.MAX_VALUE) .binary(hexStringToByte("7fffffffffffffff")), TestSpec.type(BIGINT()) - .value(Long.MAX_VALUE) + .values(Long.MAX_VALUE) .withLittleEndian() .binary(hexStringToByte("ffffffffffffff7f")), - TestSpec.type(FLOAT()).value(Float.MAX_VALUE).binary(hexStringToByte("7f7fffff")), + TestSpec.type(FLOAT()).values(Float.MAX_VALUE).binary(hexStringToByte("7f7fffff")), TestSpec.type(FLOAT()) - .value(Float.MAX_VALUE) + .values(Float.MAX_VALUE) .withLittleEndian() .binary(hexStringToByte("ffff7f7f")), TestSpec.type(DOUBLE()) - .value(Double.MAX_VALUE) + .values(Double.MAX_VALUE) .binary(hexStringToByte("7fefffffffffffff")), TestSpec.type(DOUBLE()) - .value(Double.MAX_VALUE) + .values(Double.MAX_VALUE) .withLittleEndian() .binary(hexStringToByte("ffffffffffffef7f")), - TestSpec.type(BOOLEAN()).value(true).binary(new byte[] {1}), - TestSpec.type(BOOLEAN()).value(false).binary(new byte[] {0}), - TestSpec.type(STRING()).value("Hello World").binary("Hello World".getBytes()), + TestSpec.type(BOOLEAN()).values(true).binary(new byte[] {1}), + TestSpec.type(BOOLEAN()).values(false).binary(new byte[] {0}), + TestSpec.type(STRING()).values("Hello World").binary("Hello World".getBytes()), TestSpec.type(STRING()) - .value("你好世界,Hello World") + .values("你好世界,Hello World") .binary("你好世界,Hello World".getBytes()), TestSpec.type(STRING()) - .value("Flink Awesome!") + .values("Flink Awesome!") .withCharset("UTF-16") .binary("Flink Awesome!".getBytes(StandardCharsets.UTF_16)), TestSpec.type(STRING()) - .value("Flink 帅哭!") + .values("Flink 帅哭!") .withCharset("UTF-16") .binary("Flink 帅哭!".getBytes(StandardCharsets.UTF_16)), - TestSpec.type(STRING()).value("").binary("".getBytes()), - TestSpec.type(VARCHAR(5)).value("HELLO").binary("HELLO".getBytes()), + TestSpec.type(STRING()).values("").binary("".getBytes()), + TestSpec.type(VARCHAR(5)).values("HELLO").binary("HELLO".getBytes()), + TestSpec.type(STRING()) + .values("line 1", "line 2", "line 3") + .binary("line 1".getBytes(), "line 2".getBytes(), "line 3".getBytes()), TestSpec.type(BYTES()) - .value(new byte[] {1, 3, 5, 7, 9}) + .values(new byte[] {1, 3, 5, 7, 9}) .binary(new byte[] {1, 3, 5, 7, 9}), - TestSpec.type(BYTES()).value(new byte[] {}).binary(new byte[] {}), - TestSpec.type(BINARY(3)).value(new byte[] {1, 3, 5}).binary(new byte[] {1, 3, 5}), + TestSpec.type(BYTES()).values(new byte[] {}).binary(new byte[] {}), + TestSpec.type(BINARY(3)).values(new byte[] {1, 3, 5}).binary(new byte[] {1, 3, 5}), TestSpec.type(RAW(LocalDateTime.class, new LocalDateTimeSerializer())) - .value(LocalDateTime.parse("2020-11-11T18:08:01.123")) + .values(LocalDateTime.parse("2020-11-11T18:08:01.123")) .binary( serializeLocalDateTime( LocalDateTime.parse("2020-11-11T18:08:01.123"))), // test nulls - TestSpec.type(TINYINT()).value(null).binary(null), - TestSpec.type(SMALLINT()).value(null).binary(null), - TestSpec.type(INT()).value(null).binary(null), - TestSpec.type(BIGINT()).value(null).binary(null), - TestSpec.type(FLOAT()).value(null).binary(null), - TestSpec.type(DOUBLE()).value(null).binary(null), - TestSpec.type(BOOLEAN()).value(null).binary(null), - TestSpec.type(STRING()).value(null).binary(null), - TestSpec.type(BYTES()).value(null).binary(null), + TestSpec.type(TINYINT()).values((Object) null).binary((byte[]) null), + TestSpec.type(SMALLINT()).values((Object) null).binary((byte[]) null), + TestSpec.type(INT()).values((Object) null).binary((byte[]) null), + TestSpec.type(BIGINT()).values((Object) null).binary((byte[]) null), + TestSpec.type(FLOAT()).values((Object) null).binary((byte[]) null), + TestSpec.type(DOUBLE()).values((Object) null).binary((byte[]) null), + TestSpec.type(BOOLEAN()).values((Object) null).binary((byte[]) null), + TestSpec.type(STRING()).values((Object) null).binary((byte[]) null), + TestSpec.type(BYTES()).values((Object) null).binary((byte[]) null), TestSpec.type(RAW(LocalDateTime.class, new LocalDateTimeSerializer())) - .value(null) - .binary(null)); + .values((Object) null) + .binary((byte[]) null)); } @Parameterized.Parameter public TestSpec testSpec; @@ -155,17 +159,35 @@ public void testSerializationAndDeserialization() throws Exception { deserializationSchema.open(mock(DeserializationSchema.InitializationContext.class)); serializationSchema.open(mock(SerializationSchema.InitializationContext.class)); - Row row = Row.of(testSpec.value); DataStructureConverter converter = DataStructureConverters.getConverter(ROW(FIELD("single", testSpec.type))); - RowData originalRowData = (RowData) converter.toInternal(row); - byte[] serializedBytes = serializationSchema.serialize(originalRowData); - assertThat(serializedBytes).isEqualTo(testSpec.binary); + byte[][] serializedBytesArr = new byte[testSpec.values.length][]; + RowData[] deserializedRowDataArr = new RowData[testSpec.values.length]; + + // The following loops are partitioned to ensure the serialized/deserialized + // values are not copied by reference. (see FLINK-35097) + + // Process serialization + for (int i = 0; i < testSpec.values.length; i++) { + Row row = Row.of(testSpec.values[i]); + RowData originalRowData = (RowData) converter.toInternal(row); + serializedBytesArr[i] = serializationSchema.serialize(originalRowData); + } + + // Test serialization and process deserialization + for (int i = 0; i < testSpec.values.length; i++) { + assertThat(serializedBytesArr[i]).isEqualTo(testSpec.binary[i]); - RowData deserializeRowData = deserializationSchema.deserialize(serializedBytes); - Row actual = (Row) converter.toExternal(deserializeRowData); - assertThat(actual).isEqualTo(row); + deserializedRowDataArr[i] = deserializationSchema.deserialize(serializedBytesArr[i]); + } + + // Test deserialization + for (int i = 0; i < testSpec.values.length; i++) { + Row row = Row.of(testSpec.values[i]); + Row actual = (Row) converter.toExternal(deserializedRowDataArr[i]); + assertThat(actual).isEqualTo(row); + } } private static byte[] serializeLocalDateTime(LocalDateTime localDateTime) { @@ -183,9 +205,9 @@ private static byte[] serializeLocalDateTime(LocalDateTime localDateTime) { private static class TestSpec { - private Object value; - private byte[] binary; - private DataType type; + private Object[] values; + private byte[][] binary; + private final DataType type; private String charsetName = "UTF-8"; private boolean isBigEndian = true; @@ -197,12 +219,12 @@ public static TestSpec type(DataType fieldType) { return new TestSpec(fieldType); } - public TestSpec value(Object value) { - this.value = value; + public TestSpec values(Object... values) { + this.values = values; return this; } - public TestSpec binary(byte[] bytes) { + public TestSpec binary(byte[]... bytes) { this.binary = bytes; return this; } @@ -219,12 +241,16 @@ public TestSpec withLittleEndian() { @Override public String toString() { - String hex = binary == null ? "null" : "0x" + StringUtils.byteToHexString(binary); + ArrayList hexes = new ArrayList<>(); + for (byte[] b : binary) { + hexes.add(b == null ? "" : "0x" + StringUtils.byteToHexString(b)); + } + return "TestSpec{" - + "value=" - + value + + "values=" + + Arrays.toString(values) + ", binary=" - + hex + + hexes + ", type=" + type + ", charsetName='"