Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,20 @@

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericMap;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimeType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.utils.Preconditions;
Expand All @@ -35,6 +45,10 @@
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Conversions between Java object and bytes.
Expand Down Expand Up @@ -112,6 +126,12 @@ public static ByteBuffer toByteBuffer(DataType type, Object value) {
(Timestamp) value, ((LocalZonedTimestampType) type).getPrecision());
case TIME_WITHOUT_TIME_ZONE:
return timeToByteBuffer((Integer) value, ((TimeType) type).getPrecision());
case ARRAY:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This writes a Paimon-private encoding into Iceberg lower/upper bounds for nested types. Iceberg's binary single-value serialization only has implementations for primitive values (Iceberg Conversions throws for list/map/struct), so an Iceberg reader that sees these bounds will try to decode them as Iceberg metadata and fail or misinterpret them. If we cannot encode these values using Iceberg's standard format, we should omit lower/upper bounds for ARRAY/MAP/ROW instead of serializing a custom round-trip format here.

return arrayToByteBuffer((ArrayType) type, (InternalArray) value);
case MAP:
return mapToByteBuffer((MapType) type, (InternalMap) value);
case ROW:
return rowToByteBuffer((RowType) type, (InternalRow) value);
default:
throw new UnsupportedOperationException("Cannot serialize type: " + type);
}
Expand Down Expand Up @@ -143,6 +163,158 @@ private static ByteBuffer timeToByteBuffer(int millisOfDay, int precision) {
.putLong(0, millisOfDay * 1000L);
}

private static ByteBuffer arrayToByteBuffer(ArrayType arrayType, InternalArray array) {
DataType elementType = arrayType.getElementType();
InternalArray.ElementGetter getter = InternalArray.createElementGetter(elementType);
List<ByteBuffer> elementBufs = new ArrayList<>();
int dataSize = 0;
for (int i = 0; i < array.size(); i++) {
if (array.isNullAt(i)) {
elementBufs.add(null);
} else {
Object element = getter.getElementOrNull(array, i);
ByteBuffer buf = toByteBuffer(elementType, element);
elementBufs.add(buf);
dataSize += buf.limit();
}
}
ByteBuffer result =
ByteBuffer.allocate(4 + array.size() * 4 + dataSize).order(ByteOrder.LITTLE_ENDIAN);
result.putInt(array.size());
for (ByteBuffer buf : elementBufs) {
if (buf == null) {
result.putInt(-1);
} else {
result.putInt(buf.limit());
result.put(buf);
}
}
result.flip();
return result;
}

private static ByteBuffer mapToByteBuffer(MapType mapType, InternalMap map) {
InternalArray keys = map.keyArray();
InternalArray values = map.valueArray();
DataType keyType = mapType.getKeyType();
DataType valueType = mapType.getValueType();
InternalArray.ElementGetter keyGetter = InternalArray.createElementGetter(keyType);
InternalArray.ElementGetter valueGetter = InternalArray.createElementGetter(valueType);
List<ByteBuffer> keyBufs = new ArrayList<>();
List<ByteBuffer> valueBufs = new ArrayList<>();
int dataSize = 0;
for (int i = 0; i < map.size(); i++) {
ByteBuffer keyBuf;
if (keys.isNullAt(i)) {
keyBuf = null;
} else {
keyBuf = toByteBuffer(keyType, keyGetter.getElementOrNull(keys, i));
dataSize += keyBuf.limit();
}
keyBufs.add(keyBuf);
ByteBuffer valueBuf;
if (values.isNullAt(i)) {
valueBuf = null;
} else {
valueBuf = toByteBuffer(valueType, valueGetter.getElementOrNull(values, i));
dataSize += valueBuf.limit();
}
valueBufs.add(valueBuf);
}
ByteBuffer result =
ByteBuffer.allocate(4 + map.size() * 8 + dataSize).order(ByteOrder.LITTLE_ENDIAN);
result.putInt(map.size());
for (int i = 0; i < map.size(); i++) {
ByteBuffer keyBuf = keyBufs.get(i);
if (keyBuf == null) {
result.putInt(-1);
} else {
result.putInt(keyBuf.limit());
result.put(keyBuf);
}
ByteBuffer valueBuf = valueBufs.get(i);
if (valueBuf == null) {
result.putInt(-1);
} else {
result.putInt(valueBuf.limit());
result.put(valueBuf);
}
}
result.flip();
return result;
}

private static ByteBuffer rowToByteBuffer(RowType rowType, InternalRow row) {
List<DataField> fields = rowType.getFields();
List<ByteBuffer> fieldBufs = new ArrayList<>();
int dataSize = 0;
for (int i = 0; i < fields.size(); i++) {
if (row.isNullAt(i)) {
fieldBufs.add(null);
} else {
Object fieldValue = getFieldValue(row, fields.get(i).type(), i);
ByteBuffer buf = toByteBuffer(fields.get(i).type(), fieldValue);
fieldBufs.add(buf);
dataSize += buf.limit();
}
}
ByteBuffer result =
ByteBuffer.allocate(fields.size() * 4 + dataSize).order(ByteOrder.LITTLE_ENDIAN);
for (ByteBuffer buf : fieldBufs) {
if (buf == null) {
result.putInt(-1);
} else {
result.putInt(buf.limit());
result.put(buf);
}
}
result.flip();
return result;
}

private static Object getFieldValue(InternalRow row, DataType fieldType, int pos) {
switch (fieldType.getTypeRoot()) {
case BOOLEAN:
return row.getBoolean(pos);
case TINYINT:
return row.getByte(pos);
case SMALLINT:
return row.getShort(pos);
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
return row.getInt(pos);
case BIGINT:
return row.getLong(pos);
case FLOAT:
return row.getFloat(pos);
case DOUBLE:
return row.getDouble(pos);
case CHAR:
case VARCHAR:
return row.getString(pos);
case BINARY:
case VARBINARY:
return row.getBinary(pos);
case DECIMAL:
DecimalType dt = (DecimalType) fieldType;
return row.getDecimal(pos, dt.getPrecision(), dt.getScale());
case TIMESTAMP_WITHOUT_TIME_ZONE:
return row.getTimestamp(pos, ((TimestampType) fieldType).getPrecision());
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return row.getTimestamp(pos, ((LocalZonedTimestampType) fieldType).getPrecision());
case ARRAY:
return row.getArray(pos);
case MAP:
return row.getMap(pos);
case ROW:
return row.getRow(pos, ((RowType) fieldType).getFieldCount());
default:
throw new UnsupportedOperationException(
"Cannot get field value for type: " + fieldType);
}
}

public static Object toPaimonObject(DataType type, byte[] bytes) {
switch (type.getTypeRoot()) {
case BOOLEAN:
Expand Down Expand Up @@ -188,8 +360,57 @@ public static Object toPaimonObject(DataType type, byte[] bytes) {
"Paimon Iceberg compatibility only support time type with precision from 0 to 3.");
long timeMicros = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getLong();
return (int) (timeMicros / 1000L);
case ARRAY:
return arrayFromBytes((ArrayType) type, bytes);
case MAP:
return mapFromBytes((MapType) type, bytes);
case ROW:
return rowFromBytes((RowType) type, bytes);
default:
throw new UnsupportedOperationException("Cannot deserialize type: " + type);
}
}

private static Object arrayFromBytes(ArrayType arrayType, byte[] bytes) {
ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
int count = buf.getInt();
DataType elementType = arrayType.getElementType();
Object[] elements = new Object[count];
for (int i = 0; i < count; i++) {
elements[i] = readElement(elementType, buf);
}
return new GenericArray(elements);
}

private static Object mapFromBytes(MapType mapType, byte[] bytes) {
ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
int count = buf.getInt();
Map<Object, Object> map = new HashMap<>();
for (int i = 0; i < count; i++) {
Object key = readElement(mapType.getKeyType(), buf);
Object value = readElement(mapType.getValueType(), buf);
map.put(key, value);
}
return new GenericMap(map);
}

private static Object rowFromBytes(RowType rowType, byte[] bytes) {
ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
List<DataField> fields = rowType.getFields();
GenericRow row = new GenericRow(fields.size());
for (int i = 0; i < fields.size(); i++) {
row.setField(i, readElement(fields.get(i).type(), buf));
}
return row;
}

private static Object readElement(DataType type, ByteBuffer buf) {
int length = buf.getInt();
if (length == -1) {
return null;
}
byte[] elementBytes = new byte[length];
buf.get(elementBytes);
return toPaimonObject(type, elementBytes);
}
}
Loading