Skip to content

Commit

Permalink
[FLINK-35097][table] Fix 'raw' format deserialization
Browse files Browse the repository at this point in the history
  • Loading branch information
kumar-mallikarjuna authored and twalthr committed Apr 22, 2024
1 parent 87ed9cc commit 3cd6d89
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ public class RawFormatDeserializationSchema implements DeserializationSchema<Row

private final DataLengthValidator validator;

private transient GenericRowData reuse;

public RawFormatDeserializationSchema(
LogicalType deserializedType,
TypeInformation<RowData> producedTypeInfo,
Expand All @@ -76,7 +74,6 @@ public RawFormatDeserializationSchema(

@Override
public void open(InitializationContext context) throws Exception {
reuse = new GenericRowData(1);
converter.open();
}

Expand All @@ -89,8 +86,10 @@ public RowData deserialize(byte[] message) throws IOException {
validator.validate(message);
field = converter.convert(message);
}
reuse.setField(0, field);
return reuse;

GenericRowData rowData = new GenericRowData(1);
rowData.setField(0, field);
return rowData;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

Expand Down Expand Up @@ -67,76 +68,79 @@ public class RawFormatSerDeSchemaTest {
@Parameterized.Parameters(name = "{index}: {0}")
public static List<TestSpec> testData() {
return Arrays.asList(
TestSpec.type(TINYINT()).value(Byte.MAX_VALUE).binary(new byte[] {Byte.MAX_VALUE}),
TestSpec.type(SMALLINT()).value(Short.MAX_VALUE).binary(hexStringToByte("7fff")),
TestSpec.type(TINYINT()).values(Byte.MAX_VALUE).binary(new byte[] {Byte.MAX_VALUE}),
TestSpec.type(SMALLINT()).values(Short.MAX_VALUE).binary(hexStringToByte("7fff")),
TestSpec.type(SMALLINT())
.value(Short.MAX_VALUE)
.values(Short.MAX_VALUE)
.withLittleEndian()
.binary(hexStringToByte("ff7f")),
TestSpec.type(INT()).value(Integer.MAX_VALUE).binary(hexStringToByte("7fffffff")),
TestSpec.type(INT()).values(Integer.MAX_VALUE).binary(hexStringToByte("7fffffff")),
TestSpec.type(INT())
.value(Integer.MAX_VALUE)
.values(Integer.MAX_VALUE)
.withLittleEndian()
.binary(hexStringToByte("ffffff7f")),
TestSpec.type(BIGINT())
.value(Long.MAX_VALUE)
.values(Long.MAX_VALUE)
.binary(hexStringToByte("7fffffffffffffff")),
TestSpec.type(BIGINT())
.value(Long.MAX_VALUE)
.values(Long.MAX_VALUE)
.withLittleEndian()
.binary(hexStringToByte("ffffffffffffff7f")),
TestSpec.type(FLOAT()).value(Float.MAX_VALUE).binary(hexStringToByte("7f7fffff")),
TestSpec.type(FLOAT()).values(Float.MAX_VALUE).binary(hexStringToByte("7f7fffff")),
TestSpec.type(FLOAT())
.value(Float.MAX_VALUE)
.values(Float.MAX_VALUE)
.withLittleEndian()
.binary(hexStringToByte("ffff7f7f")),
TestSpec.type(DOUBLE())
.value(Double.MAX_VALUE)
.values(Double.MAX_VALUE)
.binary(hexStringToByte("7fefffffffffffff")),
TestSpec.type(DOUBLE())
.value(Double.MAX_VALUE)
.values(Double.MAX_VALUE)
.withLittleEndian()
.binary(hexStringToByte("ffffffffffffef7f")),
TestSpec.type(BOOLEAN()).value(true).binary(new byte[] {1}),
TestSpec.type(BOOLEAN()).value(false).binary(new byte[] {0}),
TestSpec.type(STRING()).value("Hello World").binary("Hello World".getBytes()),
TestSpec.type(BOOLEAN()).values(true).binary(new byte[] {1}),
TestSpec.type(BOOLEAN()).values(false).binary(new byte[] {0}),
TestSpec.type(STRING()).values("Hello World").binary("Hello World".getBytes()),
TestSpec.type(STRING())
.value("你好世界,Hello World")
.values("你好世界,Hello World")
.binary("你好世界,Hello World".getBytes()),
TestSpec.type(STRING())
.value("Flink Awesome!")
.values("Flink Awesome!")
.withCharset("UTF-16")
.binary("Flink Awesome!".getBytes(StandardCharsets.UTF_16)),
TestSpec.type(STRING())
.value("Flink 帅哭!")
.values("Flink 帅哭!")
.withCharset("UTF-16")
.binary("Flink 帅哭!".getBytes(StandardCharsets.UTF_16)),
TestSpec.type(STRING()).value("").binary("".getBytes()),
TestSpec.type(VARCHAR(5)).value("HELLO").binary("HELLO".getBytes()),
TestSpec.type(STRING()).values("").binary("".getBytes()),
TestSpec.type(VARCHAR(5)).values("HELLO").binary("HELLO".getBytes()),
TestSpec.type(STRING())
.values("line 1", "line 2", "line 3")
.binary("line 1".getBytes(), "line 2".getBytes(), "line 3".getBytes()),
TestSpec.type(BYTES())
.value(new byte[] {1, 3, 5, 7, 9})
.values(new byte[] {1, 3, 5, 7, 9})
.binary(new byte[] {1, 3, 5, 7, 9}),
TestSpec.type(BYTES()).value(new byte[] {}).binary(new byte[] {}),
TestSpec.type(BINARY(3)).value(new byte[] {1, 3, 5}).binary(new byte[] {1, 3, 5}),
TestSpec.type(BYTES()).values(new byte[] {}).binary(new byte[] {}),
TestSpec.type(BINARY(3)).values(new byte[] {1, 3, 5}).binary(new byte[] {1, 3, 5}),
TestSpec.type(RAW(LocalDateTime.class, new LocalDateTimeSerializer()))
.value(LocalDateTime.parse("2020-11-11T18:08:01.123"))
.values(LocalDateTime.parse("2020-11-11T18:08:01.123"))
.binary(
serializeLocalDateTime(
LocalDateTime.parse("2020-11-11T18:08:01.123"))),

// test nulls
TestSpec.type(TINYINT()).value(null).binary(null),
TestSpec.type(SMALLINT()).value(null).binary(null),
TestSpec.type(INT()).value(null).binary(null),
TestSpec.type(BIGINT()).value(null).binary(null),
TestSpec.type(FLOAT()).value(null).binary(null),
TestSpec.type(DOUBLE()).value(null).binary(null),
TestSpec.type(BOOLEAN()).value(null).binary(null),
TestSpec.type(STRING()).value(null).binary(null),
TestSpec.type(BYTES()).value(null).binary(null),
TestSpec.type(TINYINT()).values((Object) null).binary((byte[]) null),
TestSpec.type(SMALLINT()).values((Object) null).binary((byte[]) null),
TestSpec.type(INT()).values((Object) null).binary((byte[]) null),
TestSpec.type(BIGINT()).values((Object) null).binary((byte[]) null),
TestSpec.type(FLOAT()).values((Object) null).binary((byte[]) null),
TestSpec.type(DOUBLE()).values((Object) null).binary((byte[]) null),
TestSpec.type(BOOLEAN()).values((Object) null).binary((byte[]) null),
TestSpec.type(STRING()).values((Object) null).binary((byte[]) null),
TestSpec.type(BYTES()).values((Object) null).binary((byte[]) null),
TestSpec.type(RAW(LocalDateTime.class, new LocalDateTimeSerializer()))
.value(null)
.binary(null));
.values((Object) null)
.binary((byte[]) null));
}

@Parameterized.Parameter public TestSpec testSpec;
Expand All @@ -155,17 +159,35 @@ public void testSerializationAndDeserialization() throws Exception {
deserializationSchema.open(mock(DeserializationSchema.InitializationContext.class));
serializationSchema.open(mock(SerializationSchema.InitializationContext.class));

Row row = Row.of(testSpec.value);
DataStructureConverter<Object, Object> converter =
DataStructureConverters.getConverter(ROW(FIELD("single", testSpec.type)));
RowData originalRowData = (RowData) converter.toInternal(row);

byte[] serializedBytes = serializationSchema.serialize(originalRowData);
assertThat(serializedBytes).isEqualTo(testSpec.binary);
byte[][] serializedBytesArr = new byte[testSpec.values.length][];
RowData[] deserializedRowDataArr = new RowData[testSpec.values.length];

// The following loops are partitioned to ensure the serialized/deserialized
// values are not copied by reference. (see FLINK-35097)

// Process serialization
for (int i = 0; i < testSpec.values.length; i++) {
Row row = Row.of(testSpec.values[i]);
RowData originalRowData = (RowData) converter.toInternal(row);
serializedBytesArr[i] = serializationSchema.serialize(originalRowData);
}

// Test serialization and process deserialization
for (int i = 0; i < testSpec.values.length; i++) {
assertThat(serializedBytesArr[i]).isEqualTo(testSpec.binary[i]);

RowData deserializeRowData = deserializationSchema.deserialize(serializedBytes);
Row actual = (Row) converter.toExternal(deserializeRowData);
assertThat(actual).isEqualTo(row);
deserializedRowDataArr[i] = deserializationSchema.deserialize(serializedBytesArr[i]);
}

// Test deserialization
for (int i = 0; i < testSpec.values.length; i++) {
Row row = Row.of(testSpec.values[i]);
Row actual = (Row) converter.toExternal(deserializedRowDataArr[i]);
assertThat(actual).isEqualTo(row);
}
}

private static byte[] serializeLocalDateTime(LocalDateTime localDateTime) {
Expand All @@ -183,9 +205,9 @@ private static byte[] serializeLocalDateTime(LocalDateTime localDateTime) {

private static class TestSpec {

private Object value;
private byte[] binary;
private DataType type;
private Object[] values;
private byte[][] binary;
private final DataType type;
private String charsetName = "UTF-8";
private boolean isBigEndian = true;

Expand All @@ -197,12 +219,12 @@ public static TestSpec type(DataType fieldType) {
return new TestSpec(fieldType);
}

public TestSpec value(Object value) {
this.value = value;
public TestSpec values(Object... values) {
this.values = values;
return this;
}

public TestSpec binary(byte[] bytes) {
public TestSpec binary(byte[]... bytes) {
this.binary = bytes;
return this;
}
Expand All @@ -219,12 +241,16 @@ public TestSpec withLittleEndian() {

@Override
public String toString() {
String hex = binary == null ? "null" : "0x" + StringUtils.byteToHexString(binary);
ArrayList<String> hexes = new ArrayList<>();
for (byte[] b : binary) {
hexes.add(b == null ? "" : "0x" + StringUtils.byteToHexString(b));
}

return "TestSpec{"
+ "value="
+ value
+ "values="
+ Arrays.toString(values)
+ ", binary="
+ hex
+ hexes
+ ", type="
+ type
+ ", charsetName='"
Expand Down

0 comments on commit 3cd6d89

Please sign in to comment.