[lake] Fix union read duplicating rows for non-String partition columns#3500
bryndenZh wants to merge 2 commits into
For a partitioned table with table.datalake.enabled=true and a non-String partition column (DATE, INT, TIMESTAMP, etc.), default union read returned each tiered row twice because PaimonSplit#partition() read every partition field via BinaryRow.getString regardless of logical type. The lake-side partition name then never matched the Fluss-side name in LakeSplitGenerator, so the same partition was emitted as both a lake-only split and a Fluss-log split.
Add PaimonPartitionUtils to render partition values in a logical-type-aware way matching PartitionUtils#convertValueOfType, compute them in PaimonSplitPlanner, and apply the same fix to DvTableReadableSnapshotRetriever.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Pull request overview
Fixes duplicated rows in lake/union reads for partitioned tables when partition columns are non-STRING types by rendering Paimon partition values in the same type-aware format Fluss uses for partition names.
Changes:
- Add PaimonPartitionUtils.partitionValues(...) to convert BinaryRow partition fields, in a type-aware way, into Fluss partition-name strings.
- Use the new conversion in split planning and DV-table partition name mapping so lake-side and Fluss-side partition names match.
- Extend tests to cover type parity with PartitionUtils#convertValueOfType, end-to-end DATE partition rendering, and split (de)serialization.
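The type-aware rendering described in the list above can be sketched in plain Java. This is a simplified model, not the PR's PaimonPartitionUtils: the class `PartitionRenderSketch`, the `Kind` enum, and both methods are hypothetical names, and it assumes that DATE fields are stored as days since 1970-01-01 and rendered as ISO dates (consistent with the `["2024-03-01"]` value quoted in the PR's tests).

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of type-aware partition rendering. */
final class PartitionRenderSketch {

    /** Stand-in for the logical type of a partition column. */
    enum Kind { STRING, INT, DATE }

    /** Renders one raw partition field according to its logical type. */
    static String render(Object raw, Kind kind) {
        switch (kind) {
            case STRING:
                return (String) raw;
            case INT:
                return Integer.toString((Integer) raw);
            case DATE:
                // Assumption: DATE is stored as days since 1970-01-01.
                return LocalDate.ofEpochDay((Integer) raw).toString();
            default:
                throw new IllegalArgumentException("Unsupported kind: " + kind);
        }
    }

    /** Renders every field of a partition row, one string per column. */
    static List<String> partitionValues(List<Object> rawFields, List<Kind> kinds) {
        List<String> values = new ArrayList<>(rawFields.size());
        for (int i = 0; i < rawFields.size(); i++) {
            values.add(render(rawFields.get(i), kinds.get(i)));
        }
        return values;
    }
}
```

Reading the DATE field as if it were a string, as the old code did, ignores the `Kind` entirely, which is why the resulting name could not match the one produced on the Fluss side.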
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtils.java | New utility to render Paimon partition rows into Fluss-formatted partition value strings (type-aware). |
| fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java | Use type-aware partition conversion when building Fluss partition names from Paimon BinaryRow. |
| fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitPlanner.java | Attach Fluss-formatted partition values onto planned PaimonSplits. |
| fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java | Store partition values explicitly rather than re-reading via BinaryRow.getString(...). |
| fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializer.java | Serialize/deserialize partition values alongside existing split fields. |
| fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtilsTest.java | New unit tests ensuring Paimon conversion matches Fluss PartitionUtils across supported types. |
| fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitTest.java | Adds DATE-partition end-to-end assertion for planned split partition rendering. |
| fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializerTest.java | Verifies partition values round-trip through the split serializer. |
| DataSplit dataSplit = paimonSplit.dataSplit(); | ||
| InstantiationUtil.serializeObject(view, dataSplit); | ||
| view.writeBoolean(paimonSplit.isBucketUnAware()); | ||
| List<String> partition = paimonSplit.partition(); | ||
| view.writeInt(partition.size()); | ||
| for (String value : partition) { | ||
| view.writeUTF(value); | ||
| } |
| if (version == VERSION_1) { | ||
| DataInputStream dis = new DataInputStream(in); | ||
| boolean isBucketUnAware = dis.readBoolean(); | ||
| return new PaimonSplit(dataSplit, isBucketUnAware); | ||
| int size = dis.readInt(); | ||
| List<String> partition = new ArrayList<>(size); | ||
| for (int i = 0; i < size; i++) { | ||
| partition.add(dis.readUTF()); | ||
| } | ||
| return new PaimonSplit(dataSplit, isBucketUnAware, partition); | ||
| } else { | ||
| throw new IOException("Unsupported PaimonSplit serialization version: " + version); | ||
| } |
| public List<String> partition() { | ||
| BinaryRow partition = dataSplit.partition(); | ||
| if (partition.getFieldCount() == 0) { | ||
| return Collections.emptyList(); | ||
| } | ||
|
|
||
| List<String> partitions = new ArrayList<>(); | ||
| for (int i = 0; i < partition.getFieldCount(); i++) { | ||
| // Todo Currently, partition column must be String datatype, so we can always use | ||
| // consider it as string. Revisit here when | ||
| // #489 is finished. | ||
| partitions.add(partition.getString(i).toString()); | ||
| } | ||
| return partitions; | ||
| return partition; | ||
| } |
|
@bryndenZh Thanks for the PR. Nice catch on the root cause. One thing worth tightening before merge:
We can delegate formatting to the single authoritative
public static List<String> toFlussPartitionValues(
BinaryRow partition, org.apache.fluss.types.RowType flussPartitionType) {
PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow().replaceRow(partition);
List<String> values = new ArrayList<>(partition.getFieldCount());
for (int i = 0; i < partition.getFieldCount(); i++) {
DataType flussType = flussPartitionType.getTypeAt(i);
Object value = InternalRow.createFieldGetter(flussType, i).getFieldOrNull(flussRow);
values.add(PartitionUtils.convertValueOfType(value, flussType.getTypeRoot()));
}
return values;
}
This needs a Paimon→Fluss type converter, which doesn't exist yet (we only have the reverse
A few smaller points:
@bryndenZh Hi, any progress on this PR?
c9b9638 to
f6955b6
Compare
Thanks for the suggestions. I’ve updated the PR accordingly. PTAL.
| // VERSION_1 payloads do not contain partition values. Keep them readable, but | ||
| // don't try to reconstruct partition names from DataSplit because non-string | ||
| // partition columns require type information that is not present in the payload. | ||
| return new PaimonSplit(dataSplit, isBucketUnAware, Collections.emptyList()); |
The PR returns new PaimonSplit(dataSplit, isBucketUnAware, Collections.emptyList()) for old VERSION_1 payloads. However, old payloads still contain DataSplit.partition(). For the previously supported string partition case, the old PaimonSplit.partition() recovered the value via BinaryRow.getString(i). After this change, deserializing the same v1 bytes changes ["A"] into [].
This breaks the expected compatibility contract of SimpleVersionedSerializer. It is reasonable that non-string v1 payloads cannot be reconstructed type-safely, but old string partition splits should not lose their existing behavior.
Suggested fix: in the VERSION_1 branch, preserve the old logic by reading dataSplit.partition() with getString(i) when fields exist. Add a v1 compatibility test that serializes old-format bytes and verifies a string partition still round-trips.
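A minimal sketch of the suggested compatibility behavior, under stated assumptions: `VersionedPartitionCodecSketch`, `VERSION_2`, and the payload layout are hypothetical, and a length-prefixed string list stands in for the serialized DataSplit and its partition row. It only illustrates the idea that a v1 payload keeps its old string-based result while a newer payload reads the explicitly stored values.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a versioned split payload; not the PR's serializer. */
final class VersionedPartitionCodecSketch {
    static final int VERSION_1 = 1; // legacy: no explicit partition values
    static final int VERSION_2 = 2; // assumed: partition values appended

    private static void writeStrings(DataOutputStream out, List<String> values)
            throws IOException {
        out.writeInt(values.size());
        for (String value : values) {
            out.writeUTF(value);
        }
    }

    private static List<String> readStrings(DataInputStream in) throws IOException {
        int size = in.readInt();
        List<String> values = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            values.add(in.readUTF());
        }
        return values;
    }

    /** legacyFields stands in for the string fields inside the DataSplit partition row. */
    static byte[] serialize(int version, List<String> legacyFields, List<String> partition) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeStrings(out, legacyFields); // stand-in for the serialized DataSplit
            if (version == VERSION_2) {
                writeStrings(out, partition); // appended after the existing fields
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static List<String> deserializePartition(int version, byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            List<String> legacyFields = readStrings(in);
            if (version == VERSION_1) {
                return legacyFields; // preserve the old string-partition behavior
            }
            if (version == VERSION_2) {
                return readStrings(in); // use the explicitly stored values
            }
            throw new IOException("Unsupported version: " + version);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A compatibility test in this model would serialize `["A"]` as a v1 payload and check that deserialization still returns `["A"]` rather than an empty list.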
luoyuxia
left a comment
@bryndenZh Thanks for the update. LGTM overall. Only one minor comment.
Purpose
Related issue: #3490
For a partitioned table with table.datalake.enabled=true and a non-STRING partition column (DATE, INT, TIMESTAMP, ...), union read returns each tiered row twice.
PaimonSplit#partition() read every field via BinaryRow.getString regardless of type, so for non-STRING columns the lake-side partition name was garbage and never matched the Fluss-side name in LakeSplitGenerator; the partition was then emitted as both a lake split and a Fluss-log split.
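To make the duplication mechanism concrete, here is a deliberately simplified model of name-based split matching. It is not LakeSplitGenerator's actual logic: `UnionReadPlanSketch`, `plan`, and the split labels are hypothetical, and the only assumption carried over from the description is that lake and Fluss partitions are paired by name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical model of pairing lake and Fluss partitions by name. */
final class UnionReadPlanSketch {

    static List<String> plan(Set<String> lakePartitions, Set<String> flussPartitions) {
        List<String> splits = new ArrayList<>();
        for (String lake : lakePartitions) {
            if (flussPartitions.contains(lake)) {
                // Names match: read the lake snapshot, then only the log tail.
                splits.add("lake+log:" + lake);
            } else {
                // No Fluss partition with this name: lake-only split.
                splits.add("lake-only:" + lake);
            }
        }
        for (String fluss : flussPartitions) {
            if (!lakePartitions.contains(fluss)) {
                // No lake partition with this name: read the whole log,
                // including rows that were already tiered to the lake.
                splits.add("log-from-start:" + fluss);
            }
        }
        return splits;
    }
}
```

With matching names the partition yields one combined split; with a mismatched lake-side name the same data is planned twice, once from the lake and once from the log.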
Brief change log
- PaimonPartitionUtils#partitionValues to render partition values type-aware, matching the Fluss-side format (PartitionUtils#convertValueOfType).
Tests
- PaimonPartitionUtilsTest: every partition type's output equals PartitionUtils#convertValueOfType for the same value; plus string and multi-column cases.
- PaimonSplitTest#testPaimonSplitWithDatePartition: a DATE-partitioned table yields ["2024-03-01"] end-to-end through the planner (garbage before).
- PaimonSplitSerializerTest: partition values round-trip through serialization.
API and Format
PaimonSplit's serialized form now includes the partition values (appended after the existing fields).
Documentation
No user-facing documentation change.