Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,8 @@

import org.apache.fluss.lake.source.LakeSplit;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.table.source.DataSplit;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Split for paimon table. */
Expand All @@ -36,9 +33,13 @@ public class PaimonSplit implements LakeSplit {

private final boolean isBucketUnAware;

public PaimonSplit(DataSplit dataSplit, boolean isBucketUnAware) {
// Partition values in Fluss partition-name format
private final List<String> partition;

public PaimonSplit(DataSplit dataSplit, boolean isBucketUnAware, List<String> partition) {
this.dataSplit = dataSplit;
this.isBucketUnAware = isBucketUnAware;
this.partition = partition;
}

@Override
Expand All @@ -52,19 +53,7 @@ public int bucket() {

@Override
public List<String> partition() {
BinaryRow partition = dataSplit.partition();
if (partition.getFieldCount() == 0) {
return Collections.emptyList();
}

List<String> partitions = new ArrayList<>();
for (int i = 0; i < partition.getFieldCount(); i++) {
// Todo Currently, partition column must be String datatype, so we can always use
// consider it as string. Revisit here when
// #489 is finished.
partitions.add(partition.getString(i).toString());
}
return partitions;
return partition;
}
Comment on lines 55 to 57

public DataSplit dataSplit() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import org.apache.fluss.config.Configuration;
import org.apache.fluss.lake.source.Planner;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.types.RowType;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.BucketMode;
Expand All @@ -40,6 +42,8 @@
import java.util.Collections;
import java.util.List;

import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toFlussPartitionValues;
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toFlussRowType;
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;

/** Split panner for paimon table. */
Expand Down Expand Up @@ -69,13 +73,20 @@ public List<PaimonSplit> plan() {
FileStoreTable fileStoreTable = getTable(catalog, tablePath, snapshotId);
InnerTableScan tableScan = fileStoreTable.newScan();
boolean isBucketUnAware = fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE;
RowType flussPartitionType =
toFlussRowType(fileStoreTable.schema().logicalPartitionType());

if (predicate != null) {
tableScan = tableScan.withFilter(predicate);
}
for (Split split : tableScan.plan().splits()) {
DataSplit dataSplit = (DataSplit) split;
splits.add(new PaimonSplit(dataSplit, isBucketUnAware));
BinaryRow partitionRow = dataSplit.partition();
List<String> partition =
partitionRow.getFieldCount() == 0
? Collections.emptyList()
: toFlussPartitionValues(partitionRow, flussPartitionType);
splits.add(new PaimonSplit(dataSplit, isBucketUnAware, partition));
}
}
return splits;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,20 @@
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Serializer for paimon split. */
public class PaimonSplitSerializer implements SimpleVersionedSerializer<PaimonSplit> {

private static final int VERSION_1 = 1;
// VERSION_2 additionally persists the partition values.
private static final int VERSION_2 = 2;

@Override
public int getVersion() {
return VERSION_1;
return VERSION_2;
}

@Override
Expand All @@ -46,6 +51,11 @@ public byte[] serialize(PaimonSplit paimonSplit) throws IOException {
DataSplit dataSplit = paimonSplit.dataSplit();
InstantiationUtil.serializeObject(view, dataSplit);
view.writeBoolean(paimonSplit.isBucketUnAware());
List<String> partition = paimonSplit.partition();
view.writeInt(partition.size());
for (String value : partition) {
view.writeUTF(value);
}
Comment on lines 51 to +58
return out.toByteArray();
}

Expand All @@ -55,11 +65,20 @@ public PaimonSplit deserialize(int version, byte[] serialized) throws IOExceptio
DataSplit dataSplit;
try {
dataSplit = InstantiationUtil.deserializeObject(in, getClass().getClassLoader());

DataInputStream dis = new DataInputStream(in);
boolean isBucketUnAware = dis.readBoolean();
if (version == VERSION_1) {
DataInputStream dis = new DataInputStream(in);
boolean isBucketUnAware = dis.readBoolean();
return new PaimonSplit(dataSplit, isBucketUnAware);
// VERSION_1 payloads do not contain partition values. Keep them readable, but
// don't try to reconstruct partition names from DataSplit because non-string
// partition columns require type information that is not present in the payload.
return new PaimonSplit(dataSplit, isBucketUnAware, Collections.emptyList());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR returns new PaimonSplit(dataSplit, isBucketUnAware, Collections.emptyList()) for old VERSION_1 payloads. However, old payloads still contain DataSplit.partition(). For the previously supported string partition case, the old PaimonSplit.partition() recovered the value via BinaryRow.getString(i). After this change, deserializing the same v1 bytes changes ["A"] into [].

This breaks the expected compatibility contract of SimpleVersionedSerializer. It is reasonable that non-string v1 payloads cannot be reconstructed type-safely, but old string partition splits should not lose their existing behavior.

Suggested fix: in the VERSION_1 branch, preserve the old logic by reading dataSplit.partition() with getString(i) when fields exist. Add a v1 compatibility test that serializes old-format bytes and verifies a string partition still round-trips.

} else if (version == VERSION_2) {
int size = dis.readInt();
List<String> partition = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
partition.add(dis.readUTF());
}
return new PaimonSplit(dataSplit, isBucketUnAware, partition);
} else {
throw new IOException("Unsupported PaimonSplit serialization version: " + version);
}
Comment on lines 70 to 84
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.FileStoreScan;
Expand All @@ -48,7 +47,6 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -601,13 +599,12 @@ private Map<String, Long> getPartitionNameToIdMapping() throws IOException {
* @return partition name string
*/
private String getPartitionNameFromBinaryRow(BinaryRow partition) {
List<String> partitionValues = new ArrayList<>();
for (int i = 0; i < partition.getFieldCount(); i++) {
// todo: consider other partition type
BinaryString binaryString = partition.getString(i);
partitionValues.add(binaryString.toString());
}
return String.join(ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR, partitionValues);
return String.join(
ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR,
PaimonConversions.toFlussPartitionValues(
partition,
PaimonConversions.toFlussRowType(
fileStoreTable.schema().logicalPartitionType())));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.types.DataTypeRoot;
import org.apache.fluss.utils.PartitionUtils;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
Expand Down Expand Up @@ -116,6 +119,30 @@ public static Object toPaimonLiteral(DataType dataType, Object flussLiteral) {
.getFieldOrNull(flussRowAsPaimonRow);
}

public static org.apache.fluss.types.RowType toFlussRowType(RowType paimonRowType) {
org.apache.fluss.types.RowType.Builder builder = org.apache.fluss.types.RowType.builder();
for (DataField field : paimonRowType.getFields()) {
builder.field(
field.name(), field.type().accept(PaimonDataTypeToFlussDataType.INSTANCE));
}
return builder.build();
}

/**
* Renders a Paimon partition row into Fluss partition value strings, in partition-key order.
*/
public static List<String> toFlussPartitionValues(
BinaryRow partition, org.apache.fluss.types.RowType flussPartitionType) {
PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow().replaceRow(partition);
List<String> values = new ArrayList<>(partition.getFieldCount());
for (int i = 0; i < partition.getFieldCount(); i++) {
org.apache.fluss.types.DataType flussType = flussPartitionType.getTypeAt(i);
Object value = InternalRow.createFieldGetter(flussType, i).getFieldOrNull(flussRow);
values.add(PartitionUtils.convertValueOfType(value, flussType.getTypeRoot()));
}
return values;
}

public static List<SchemaChange> toPaimonSchemaChanges(List<TableChange> tableChanges) {
List<SchemaChange> schemaChanges = new ArrayList<>(tableChanges.size());

Expand Down
Loading
Loading