diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java index 925377067e..ed47361c3c 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java @@ -20,11 +20,8 @@ import org.apache.fluss.lake.source.LakeSplit; -import org.apache.paimon.data.BinaryRow; import org.apache.paimon.table.source.DataSplit; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; /** Split for paimon table. */ @@ -36,9 +33,13 @@ public class PaimonSplit implements LakeSplit { private final boolean isBucketUnAware; - public PaimonSplit(DataSplit dataSplit, boolean isBucketUnAware) { + // Partition values in Fluss partition-name format + private final List partition; + + public PaimonSplit(DataSplit dataSplit, boolean isBucketUnAware, List partition) { this.dataSplit = dataSplit; this.isBucketUnAware = isBucketUnAware; + this.partition = partition; } @Override @@ -52,19 +53,7 @@ public int bucket() { @Override public List partition() { - BinaryRow partition = dataSplit.partition(); - if (partition.getFieldCount() == 0) { - return Collections.emptyList(); - } - - List partitions = new ArrayList<>(); - for (int i = 0; i < partition.getFieldCount(); i++) { - // Todo Currently, partition column must be String datatype, so we can always use - // consider it as string. Revisit here when - // #489 is finished. - partitions.add(partition.getString(i).toString()); - } - return partitions; + return partition; } public DataSplit dataSplit() { diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitPlanner.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitPlanner.java index db723f581f..e20e3fe8b3 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitPlanner.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitPlanner.java @@ -21,11 +21,13 @@ import org.apache.fluss.config.Configuration; import org.apache.fluss.lake.source.Planner; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.types.RowType; import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.table.BucketMode; @@ -40,6 +42,8 @@ import java.util.Collections; import java.util.List; +import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toFlussPartitionValues; +import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toFlussRowType; import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon; /** Split panner for paimon table. */ @@ -69,13 +73,20 @@ public List plan() { FileStoreTable fileStoreTable = getTable(catalog, tablePath, snapshotId); InnerTableScan tableScan = fileStoreTable.newScan(); boolean isBucketUnAware = fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE; + RowType flussPartitionType = + toFlussRowType(fileStoreTable.schema().logicalPartitionType()); if (predicate != null) { tableScan = tableScan.withFilter(predicate); } for (Split split : tableScan.plan().splits()) { DataSplit dataSplit = (DataSplit) split; - splits.add(new PaimonSplit(dataSplit, isBucketUnAware)); + BinaryRow partitionRow = dataSplit.partition(); + List partition = + partitionRow.getFieldCount() == 0 + ? Collections.emptyList() + : toFlussPartitionValues(partitionRow, flussPartitionType); + splits.add(new PaimonSplit(dataSplit, isBucketUnAware, partition)); } } return splits; diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializer.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializer.java index c5e4d38d1d..f761efd50e 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializer.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializer.java @@ -28,15 +28,20 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; /** Serializer for paimon split. */ public class PaimonSplitSerializer implements SimpleVersionedSerializer { private static final int VERSION_1 = 1; + // VERSION_2 additionally persists the partition values. + private static final int VERSION_2 = 2; @Override public int getVersion() { - return VERSION_1; + return VERSION_2; } @Override @@ -46,6 +51,11 @@ public byte[] serialize(PaimonSplit paimonSplit) throws IOException { DataSplit dataSplit = paimonSplit.dataSplit(); InstantiationUtil.serializeObject(view, dataSplit); view.writeBoolean(paimonSplit.isBucketUnAware()); + List partition = paimonSplit.partition(); + view.writeInt(partition.size()); + for (String value : partition) { + view.writeUTF(value); + } return out.toByteArray(); } @@ -55,11 +65,20 @@ public PaimonSplit deserialize(int version, byte[] serialized) throws IOExceptio DataSplit dataSplit; try { dataSplit = InstantiationUtil.deserializeObject(in, getClass().getClassLoader()); - + DataInputStream dis = new DataInputStream(in); + boolean isBucketUnAware = dis.readBoolean(); if (version == VERSION_1) { - DataInputStream dis = new DataInputStream(in); - boolean isBucketUnAware = dis.readBoolean(); - return new PaimonSplit(dataSplit, isBucketUnAware); + // VERSION_1 payloads do not contain partition values. Keep them readable, but + // don't try to reconstruct partition names from DataSplit because non-string + // partition columns require type information that is not present in the payload. + return new PaimonSplit(dataSplit, isBucketUnAware, Collections.emptyList()); + } else if (version == VERSION_2) { + int size = dis.readInt(); + List partition = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + partition.add(dis.readUTF()); + } + return new PaimonSplit(dataSplit, isBucketUnAware, partition); } else { throw new IOException("Unsupported PaimonSplit serialization version: " + version); } diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java index 54c3c7f948..0d55edc484 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java @@ -35,7 +35,6 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.data.BinaryString; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.operation.FileStoreScan; @@ -48,7 +47,6 @@ import javax.annotation.Nullable; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -601,13 +599,12 @@ private Map getPartitionNameToIdMapping() throws IOException { * @return partition name string */ private String getPartitionNameFromBinaryRow(BinaryRow partition) { - List partitionValues = new ArrayList<>(); - for (int i = 0; i < partition.getFieldCount(); i++) { - // todo: consider other partition type - BinaryString binaryString = partition.getString(i); - partitionValues.add(binaryString.toString()); - } - return String.join(ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR, partitionValues); + return String.join( + ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR, + PaimonConversions.toFlussPartitionValues( + partition, + PaimonConversions.toFlussRowType( + fileStoreTable.schema().logicalPartitionType()))); } @Override diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java index 94166e2d05..4bb12a0f42 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java @@ -28,12 +28,15 @@ import org.apache.fluss.row.GenericRow; import org.apache.fluss.row.InternalRow; import org.apache.fluss.types.DataTypeRoot; +import org.apache.fluss.utils.PartitionUtils; import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; @@ -116,6 +119,30 @@ public static Object toPaimonLiteral(DataType dataType, Object flussLiteral) { .getFieldOrNull(flussRowAsPaimonRow); } + public static org.apache.fluss.types.RowType toFlussRowType(RowType paimonRowType) { + org.apache.fluss.types.RowType.Builder builder = org.apache.fluss.types.RowType.builder(); + for (DataField field : paimonRowType.getFields()) { + builder.field( + field.name(), field.type().accept(PaimonDataTypeToFlussDataType.INSTANCE)); + } + return builder.build(); + } + + /** + * Renders a Paimon partition row into Fluss partition value strings, in partition-key order. + */ + public static List toFlussPartitionValues( + BinaryRow partition, org.apache.fluss.types.RowType flussPartitionType) { + PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow().replaceRow(partition); + List values = new ArrayList<>(partition.getFieldCount()); + for (int i = 0; i < partition.getFieldCount(); i++) { + org.apache.fluss.types.DataType flussType = flussPartitionType.getTypeAt(i); + Object value = InternalRow.createFieldGetter(flussType, i).getFieldOrNull(flussRow); + values.add(PartitionUtils.convertValueOfType(value, flussType.getTypeRoot())); + } + return values; + } + public static List toPaimonSchemaChanges(List tableChanges) { List schemaChanges = new ArrayList<>(tableChanges.size()); diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonDataTypeToFlussDataType.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonDataTypeToFlussDataType.java new file mode 100644 index 0000000000..36f4678f1b --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonDataTypeToFlussDataType.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.paimon.utils; + +import org.apache.fluss.types.DataTypes; + +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BooleanType; +import org.apache.paimon.types.CharType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeDefaultVisitor; +import org.apache.paimon.types.DateType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.DoubleType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.LocalZonedTimestampType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.SmallIntType; +import org.apache.paimon.types.TimeType; +import org.apache.paimon.types.TimestampType; +import org.apache.paimon.types.TinyIntType; +import org.apache.paimon.types.VarBinaryType; +import org.apache.paimon.types.VarCharType; + +/** + * Convert from Paimon's data type to Fluss's data type (inverse of {@link + * FlussDataTypeToPaimonDataType}). + */ +public class PaimonDataTypeToFlussDataType + extends DataTypeDefaultVisitor { + + public static final PaimonDataTypeToFlussDataType INSTANCE = + new PaimonDataTypeToFlussDataType(); + + @Override + public org.apache.fluss.types.DataType visit(CharType charType) { + return withNullability(DataTypes.CHAR(charType.getLength()), charType); + } + + @Override + public org.apache.fluss.types.DataType visit(VarCharType varCharType) { + return withNullability(DataTypes.STRING(), varCharType); + } + + @Override + public org.apache.fluss.types.DataType visit(BooleanType booleanType) { + return withNullability(DataTypes.BOOLEAN(), booleanType); + } + + @Override + public org.apache.fluss.types.DataType visit(BinaryType binaryType) { + return withNullability(DataTypes.BINARY(binaryType.getLength()), binaryType); + } + + @Override + public org.apache.fluss.types.DataType visit(VarBinaryType varBinaryType) { + return withNullability(DataTypes.BYTES(), varBinaryType); + } + + @Override + public org.apache.fluss.types.DataType visit(DecimalType decimalType) { + return withNullability( + DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale()), decimalType); + } + + @Override + public org.apache.fluss.types.DataType visit(TinyIntType tinyIntType) { + return withNullability(DataTypes.TINYINT(), tinyIntType); + } + + @Override + public org.apache.fluss.types.DataType visit(SmallIntType smallIntType) { + return withNullability(DataTypes.SMALLINT(), smallIntType); + } + + @Override + public org.apache.fluss.types.DataType visit(IntType intType) { + return withNullability(DataTypes.INT(), intType); + } + + @Override + public org.apache.fluss.types.DataType visit(BigIntType bigIntType) { + return withNullability(DataTypes.BIGINT(), bigIntType); + } + + @Override + public org.apache.fluss.types.DataType visit(FloatType floatType) { + return withNullability(DataTypes.FLOAT(), floatType); + } + + @Override + public org.apache.fluss.types.DataType visit(DoubleType doubleType) { + return withNullability(DataTypes.DOUBLE(), doubleType); + } + + @Override + public org.apache.fluss.types.DataType visit(DateType dateType) { + return withNullability(DataTypes.DATE(), dateType); + } + + @Override + public org.apache.fluss.types.DataType visit(TimeType timeType) { + return withNullability(DataTypes.TIME(timeType.getPrecision()), timeType); + } + + @Override + public org.apache.fluss.types.DataType visit(TimestampType timestampType) { + return withNullability(DataTypes.TIMESTAMP(timestampType.getPrecision()), timestampType); + } + + @Override + public org.apache.fluss.types.DataType visit(LocalZonedTimestampType localZonedTimestampType) { + return withNullability( + DataTypes.TIMESTAMP_LTZ(localZonedTimestampType.getPrecision()), + localZonedTimestampType); + } + + @Override + public org.apache.fluss.types.DataType visit(ArrayType arrayType) { + return withNullability(DataTypes.ARRAY(arrayType.getElementType().accept(this)), arrayType); + } + + @Override + public org.apache.fluss.types.DataType visit(MapType mapType) { + return withNullability( + DataTypes.MAP( + mapType.getKeyType().accept(this), mapType.getValueType().accept(this)), + mapType); + } + + @Override + public org.apache.fluss.types.DataType visit(RowType rowType) { + org.apache.fluss.types.RowType.Builder builder = org.apache.fluss.types.RowType.builder(); + for (DataField field : rowType.getFields()) { + org.apache.fluss.types.DataType fieldType = field.type().accept(this); + if (field.description() == null) { + builder.field(field.name(), fieldType); + } else { + builder.field(field.name(), fieldType, field.description()); + } + } + return withNullability(builder.build(), rowType); + } + + @Override + protected org.apache.fluss.types.DataType defaultMethod(DataType dataType) { + throw new UnsupportedOperationException( + "Unsupported data type to convert to Fluss: " + dataType.getTypeRoot()); + } + + private static org.apache.fluss.types.DataType withNullability( + org.apache.fluss.types.DataType flussType, DataType paimonType) { + return flussType.copy(paimonType.isNullable()); + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializerTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializerTest.java index 3f72cea558..dc75f34e44 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializerTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializerTest.java @@ -77,6 +77,7 @@ void testSerializeAndDeserialize() throws Exception { assertThat(deserialized.dataSplit()).isEqualTo(originalPaimonSplit.dataSplit()); assertThat(deserialized.isBucketUnAware()).isEqualTo(originalPaimonSplit.isBucketUnAware()); + assertThat(deserialized.partition()).isEqualTo(originalPaimonSplit.partition()); } @Test diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitTest.java index a81800374d..9d5b5ee5de 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitTest.java @@ -35,6 +35,7 @@ import org.apache.paimon.types.DataTypes; import org.junit.jupiter.api.Test; +import java.time.LocalDate; import java.util.Collections; import java.util.List; @@ -77,4 +78,33 @@ void testPaimonSplit() throws Exception { assertThat(actualSplit).isEqualTo(paimonSplit.dataSplit()); assertThat(((DataSplit) actualSplit).bucket()).isEqualTo(paimonSplit.bucket()); } + + @Test + void testPaimonSplitWithDatePartition() throws Exception { + int bucketNum = 1; + TablePath tablePath = TablePath.of(DEFAULT_DB, "non_string_partition_table"); + Schema.Builder builder = + Schema.newBuilder() + .column("c1", DataTypes.INT()) + .column("c2", DataTypes.STRING()) + .column("dt", DataTypes.DATE()); + builder.partitionKeys("dt"); + builder.primaryKey("c1", "dt"); + builder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum)); + createTable(tablePath, builder.build()); + Table table = getTable(tablePath); + + int epochDay = (int) LocalDate.of(2024, 3, 1).toEpochDay(); + GenericRow record1 = GenericRow.of(12, BinaryString.fromString("a"), epochDay); + writeRecord(tablePath, Collections.singletonList(record1)); + Snapshot snapshot = table.latestSnapshot().get(); + + LakeSource lakeSource = lakeStorage.createLakeSource(tablePath); + List paimonSplits = lakeSource.createPlanner(snapshot::id).plan(); + + // The DATE partition must be rendered in Fluss partition-name format ("2024-03-01"), + // not read blindly via BinaryRow.getString which yields garbage for non-String columns. + PaimonSplit paimonSplit = paimonSplits.get(0); + assertThat(paimonSplit.partition()).isEqualTo(Collections.singletonList("2024-03-01")); + } } diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonConversionsTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonConversionsTest.java new file mode 100644 index 0000000000..45188b261c --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonConversionsTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.paimon.utils; + +import org.apache.fluss.types.DataTypeRoot; +import org.apache.fluss.utils.PartitionUtils; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryRowWriter; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; +import org.junit.jupiter.api.Test; + +import java.time.LocalDate; +import java.util.function.Consumer; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for partition value conversion in {@link PaimonConversions}. */ +class PaimonConversionsTest { + + /** Each type's lake-side string must equal the Fluss-side name ({@code convertValueOfType}). */ + @Test + void testTypesMatchFlussName() { + int epochDay = (int) LocalDate.of(2024, 3, 1).toEpochDay(); + int milliOfDay = ((12 * 60 + 34) * 60 + 56) * 1000 + 789; + long ms = 1709294096123L; + int nanos = 456000; // multiple of 1000 to survive microsecond precision + byte[] bytes = {1, 2, 3, (byte) 0xAB}; + + assertMatches( + DataTypes.BOOLEAN(), w -> w.writeBoolean(0, true), true, DataTypeRoot.BOOLEAN); + assertMatches( + DataTypes.TINYINT(), w -> w.writeByte(0, (byte) 7), (byte) 7, DataTypeRoot.TINYINT); + assertMatches( + DataTypes.SMALLINT(), + w -> w.writeShort(0, (short) 300), + (short) 300, + DataTypeRoot.SMALLINT); + assertMatches(DataTypes.INT(), w -> w.writeInt(0, 42), 42, DataTypeRoot.INTEGER); + assertMatches(DataTypes.BIGINT(), w -> w.writeLong(0, 123L), 123L, DataTypeRoot.BIGINT); + assertMatches(DataTypes.FLOAT(), w -> w.writeFloat(0, 1.5f), 1.5f, DataTypeRoot.FLOAT); + assertMatches(DataTypes.DOUBLE(), w -> w.writeDouble(0, 2.5d), 2.5d, DataTypeRoot.DOUBLE); + assertMatches(DataTypes.DATE(), w -> w.writeInt(0, epochDay), epochDay, DataTypeRoot.DATE); + assertMatches( + DataTypes.TIME(), + w -> w.writeInt(0, milliOfDay), + milliOfDay, + DataTypeRoot.TIME_WITHOUT_TIME_ZONE); + assertMatches(DataTypes.BYTES(), w -> w.writeBinary(0, bytes), bytes, DataTypeRoot.BYTES); + assertMatches( + DataTypes.TIMESTAMP(6), + w -> w.writeTimestamp(0, Timestamp.fromEpochMillis(ms, nanos), 6), + org.apache.fluss.row.TimestampNtz.fromMillis(ms, nanos), + DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE); + assertMatches( + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6), + w -> w.writeTimestamp(0, Timestamp.fromEpochMillis(ms, nanos), 6), + org.apache.fluss.row.TimestampLtz.fromEpochMillis(ms, nanos), + DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE); + } + + @Test + void testStringPartition() { + BinaryRow partition = new BinaryRow(1); + BinaryRowWriter writer = new BinaryRowWriter(partition); + writer.writeString(0, BinaryString.fromString("a")); + writer.complete(); + + assertThat(flussPartitionNames(partition, RowType.of(DataTypes.STRING()))) + .containsExactly("a"); + } + + @Test + void testMultiColumnPartition() { + int epochDay = (int) LocalDate.of(2024, 3, 1).toEpochDay(); + BinaryRow partition = new BinaryRow(2); + BinaryRowWriter writer = new BinaryRowWriter(partition); + writer.writeInt(0, epochDay); + writer.writeInt(1, 42); + writer.complete(); + + assertThat(flussPartitionNames(partition, RowType.of(DataTypes.DATE(), DataTypes.INT()))) + .containsExactly("2024-03-01", "42"); + } + + @Test + void testNestedTypeConversion() { + RowType paimon = + RowType.of( + DataTypes.ARRAY(DataTypes.INT()), + DataTypes.MAP(DataTypes.STRING(), DataTypes.BIGINT()), + RowType.of(DataTypes.INT(), DataTypes.STRING())); + org.apache.fluss.types.RowType fluss = PaimonConversions.toFlussRowType(paimon); + + assertThat(fluss.getTypeAt(0).getTypeRoot()).isEqualTo(DataTypeRoot.ARRAY); + assertThat( + ((org.apache.fluss.types.ArrayType) fluss.getTypeAt(0)) + .getElementType() + .getTypeRoot()) + .isEqualTo(DataTypeRoot.INTEGER); + assertThat(fluss.getTypeAt(1).getTypeRoot()).isEqualTo(DataTypeRoot.MAP); + assertThat(fluss.getTypeAt(2).getTypeRoot()).isEqualTo(DataTypeRoot.ROW); + } + + /** Test helper: takes a Paimon partition type and runs the full Paimon -> Fluss conversion. */ + private static java.util.List flussPartitionNames( + BinaryRow partition, RowType paimonPartitionType) { + return PaimonConversions.toFlussPartitionValues( + partition, PaimonConversions.toFlussRowType(paimonPartitionType)); + } + + private static void assertMatches( + DataType paimonType, + Consumer writeValue, + Object flussValue, + DataTypeRoot flussRoot) { + BinaryRow partition = new BinaryRow(1); + BinaryRowWriter writer = new BinaryRowWriter(partition); + writeValue.accept(writer); + writer.complete(); + assertThat(flussPartitionNames(partition, RowType.of(paimonType))) + .as("type %s", flussRoot) + .containsExactly(PartitionUtils.convertValueOfType(flussValue, flussRoot)); + } +}