diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java index 53766440ff01..ee512fe2d888 100644 --- a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java @@ -89,6 +89,15 @@ public RecordIterator readBatch() throws IOException { iterators[i] = batch; } } + // Expose file path and position when possible so callers that need per-row file + // metadata (e.g. Spark metadata columns and copy-on-write group filtering) can treat + // the assembled row as coming from one deterministic member file of the row-id group. + for (RecordIterator iterator : iterators) { + if (iterator instanceof FileRecordIterator) { + return new DataEvolutionFileRecordIterator( + row, iterators, (FileRecordIterator) iterator); + } + } return new DataEvolutionIterator(row, iterators); } diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileRecordIterator.java b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileRecordIterator.java new file mode 100644 index 000000000000..ec460a5223d9 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileRecordIterator.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.reader; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fs.Path; +import org.apache.paimon.reader.RecordReader.RecordIterator; + +/** + * A {@link DataEvolutionIterator} that is also a {@link FileRecordIterator}. The file path and + * returned position are delegated to one designated inner iterator, so a row assembled from + * multiple files reports one deterministic member file of its row-id group and its position within + * that group (all inner iterators are row-aligned). + */ +public class DataEvolutionFileRecordIterator extends DataEvolutionIterator + implements FileRecordIterator { + + private final FileRecordIterator designated; + + public DataEvolutionFileRecordIterator( + DataEvolutionRow row, + RecordIterator[] iterators, + FileRecordIterator designated) { + super(row, iterators); + this.designated = designated; + } + + @Override + public long returnedPosition() { + return designated.returnedPosition(); + } + + @Override + public Path filePath() { + return designated.filePath(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/append/ForceSingleBatchReader.java b/paimon-core/src/main/java/org/apache/paimon/append/ForceSingleBatchReader.java index f5a5f7f2e221..7043405a8b82 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/ForceSingleBatchReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/ForceSingleBatchReader.java @@ -19,11 +19,15 @@ package org.apache.paimon.append; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fs.Path; +import org.apache.paimon.reader.FileRecordIterator; +import org.apache.paimon.reader.FileRecordReader; import org.apache.paimon.reader.RecordReader; import javax.annotation.Nullable; import java.io.IOException; +import java.io.UncheckedIOException; /** * A record reader that merges all batches from a multi-batch reader into a single concatenated @@ -53,7 +57,13 @@ public class ForceSingleBatchReader implements RecordReader { public ForceSingleBatchReader(RecordReader multiBatchReader) { this.multiBatchReader = multiBatchReader; - this.batch = new ConcatBatch(multiBatchReader); + // Preserve the file-record capability of the wrapped reader: callers like the + // data-evolution union read rely on per-row file path and position (e.g. Spark's + // __paimon_file_path metadata column and copy-on-write group filtering). + this.batch = + multiBatchReader instanceof FileRecordReader + ? new FileConcatBatch(multiBatchReader) + : new ConcatBatch(multiBatchReader); } @Override @@ -71,8 +81,8 @@ public void close() throws IOException { private static class ConcatBatch implements RecordIterator { - private final RecordReader reader; - private RecordIterator currentBatch; + protected final RecordReader reader; + protected RecordIterator currentBatch; private ConcatBatch(RecordReader reader) { this.reader = reader; @@ -107,4 +117,42 @@ public void releaseBatch() { } } } + + /** + * A {@link ConcatBatch} over a {@link FileRecordReader}, exposing the file path and returned + * position of the batch that produced the current row. Callers may ask for the file path before + * the first {@link #next()}, so the first underlying batch is loaded on demand. + */ + private static class FileConcatBatch extends ConcatBatch + implements FileRecordIterator { + + private FileConcatBatch(RecordReader reader) { + super(reader); + } + + @Override + public long returnedPosition() { + return currentFileBatch().returnedPosition(); + } + + @Override + public Path filePath() { + return currentFileBatch().filePath(); + } + + private FileRecordIterator currentFileBatch() { + if (currentBatch == null) { + try { + currentBatch = reader.readBatch(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + if (currentBatch == null) { + throw new IllegalStateException( + "The file batch is exhausted, file path and position are unavailable."); + } + } + return (FileRecordIterator) currentBatch; + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java index 39bbca3338da..572946f0ede4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java @@ -643,30 +643,81 @@ Optional checkRowIdExistence( .collect(Collectors.toList()); RowRangeIndex existingIndex = RowRangeIndex.create(existingRanges, false); + // Row-id ranges removed by this same commit, keyed by partition and bucket. A + // copy-on-write update on a data-evolution table deletes whole row-id groups and re-adds + // rewritten rows with their original row ids. File rolling may make an added file cover + // only a sub-range of a deleted group and not mirror an existing file exactly; it is still + // consistent as long as its range is fully covered by ranges deleted in this commit + // (concurrent rewrites of those files are caught by the regular deleted-file conflict + // checks). + Map, List> deletedRanges = new HashMap<>(); + for (SimpleFileEntry entry : deltaEntries) { + if (entry.kind() == FileKind.DELETE && entry.firstRowId() != null) { + deletedRanges + .computeIfAbsent( + Pair.of(entry.partition(), entry.bucket()), k -> new ArrayList<>()) + .add( + new Range( + entry.firstRowId(), + entry.firstRowId() + entry.rowCount() - 1)); + } + } + for (SimpleFileEntry entry : filesToCheck) { Range rowRange = entry.nonNullRowIdRange(); boolean exists = dedicatedStorageFile(entry.fileName()) ? existingIndex.contains(rowRange) : existingIndex.containsExactly(rowRange); - if (!exists) { - return Optional.of( - new RuntimeException( - String.format( - "Row ID existence conflict: file '%s' references " - + "firstRowId=%d, rowCount=%d in bucket %d, " - + "but no matching file exists in the current snapshot. " - + "The referenced file may have been rewritten by a " - + "concurrent compaction or removed by an overwrite.", - entry.fileName(), - entry.firstRowId(), - entry.rowCount(), - entry.bucket()))); + if (exists) { + continue; } + List deleted = deletedRanges.get(Pair.of(entry.partition(), entry.bucket())); + if (coveredByRanges( + deleted, entry.firstRowId(), entry.firstRowId() + entry.rowCount() - 1)) { + continue; + } + return Optional.of( + new RuntimeException( + String.format( + "Row ID existence conflict: file '%s' references " + + "firstRowId=%d, rowCount=%d in bucket %d, " + + "but no matching file exists in the current snapshot. " + + "The referenced file may have been rewritten by a " + + "concurrent compaction or removed by an overwrite.", + entry.fileName(), + entry.firstRowId(), + entry.rowCount(), + entry.bucket()))); } return Optional.empty(); } + /** Whether {@code [from, to]} is fully covered by the union of the given ranges. */ + private static boolean coveredByRanges(@Nullable List ranges, long from, long to) { + if (ranges == null || ranges.isEmpty()) { + return false; + } + // Sort a copy: the per-bucket lists are held in the caller's deletedRanges map and may be + // probed by more than one file, so an existence check must not reorder them in place. + List sorted = new ArrayList<>(ranges); + sorted.sort(Comparator.comparingLong(range -> range.from)); + long cursor = from; + for (Range range : sorted) { + if (range.to < cursor) { + continue; + } + if (range.from > cursor) { + return false; + } + cursor = range.to + 1; + if (cursor > to) { + return true; + } + } + return cursor > to; + } + private static boolean dedicatedStorageFile(String fileName) { return isBlobFile(fileName) || isVectorStoreFile(fileName); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java index c3371e1c19b9..f1417f8243e2 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java @@ -533,6 +533,43 @@ void testCheckRowIdExistenceDedicatedFileIgnoresBaseDedicatedFiles() { assertThat(result.get().getMessage()).contains("Row ID existence conflict"); } + @Test + void testCheckRowIdExistenceAcceptsSubRangeCoveredBySameCommitDeletes() { + ConflictDetection detection = createConflictDetection(); + + List baseEntries = new ArrayList<>(); + baseEntries.add(createFileEntryWithRowId("f1", ADD, 0L, 100L)); + + // Copy-on-write update: the whole group [0, 100) is deleted and the rewritten rows are + // re-added as sub-range files [0, 40) and [50, 100) with their original row ids. + List deltaEntries = new ArrayList<>(); + deltaEntries.add(createFileEntryWithRowId("f1", DELETE, 0L, 100L)); + deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 40L)); + deltaEntries.add(createFileEntryWithRowId("p2", ADD, 50L, 50L)); + + assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries, 100L)).isEmpty(); + } + + @Test + void testCheckRowIdExistenceRejectsSubRangeNotCoveredBySameCommitDeletes() { + ConflictDetection detection = createConflictDetection(); + + List baseEntries = new ArrayList<>(); + baseEntries.add(createFileEntryWithRowId("f1", ADD, 0L, 100L)); + baseEntries.add(createFileEntryWithRowId("f2", ADD, 100L, 100L)); + + // The added file [50, 150) spills past the deleted group [0, 100): rows of f2 are + // re-added without deleting f2, which would duplicate row ids. + List deltaEntries = new ArrayList<>(); + deltaEntries.add(createFileEntryWithRowId("f1", DELETE, 0L, 100L)); + deltaEntries.add(createFileEntryWithRowId("p1", ADD, 50L, 100L)); + + Optional result = + detection.checkRowIdExistence(baseEntries, deltaEntries, 200L); + assertThat(result).isPresent(); + assertThat(result.get().getMessage()).contains("Row ID existence conflict"); + } + @Test void testCheckRowIdExistenceSkipsNewlyAppendedFiles() { ConflictDetection detection = createConflictDetection(); diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala index 60bfd244b2de..1bffafce80a8 100644 --- a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala +++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala @@ -35,4 +35,27 @@ class RowTrackingTest extends RowTrackingTestBase { } } } + + test("Data Evolution: Spark 3.5 keeps data-evolution tables off the V2 row-level path") { + withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") { + withTable("t") { + sql( + "CREATE TABLE t (id INT, data INT) TBLPROPERTIES " + + "('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") + assert(!SparkTable.of(loadTable("t")).isInstanceOf[SupportsRowLevelOperations]) + + sql("INSERT INTO t VALUES (1, 1), (2, 2)") + assert( + intercept[RuntimeException] { + sql("DELETE FROM t WHERE id = 2") + }.getMessage + .contains("Delete operation is not supported when data evolution is enabled yet.")) + assert( + intercept[RuntimeException] { + sql("UPDATE t SET data = 20 WHERE id = 2") + }.getMessage + .contains("Update operation is not supported when data evolution is enabled yet.")) + } + } + } } diff --git a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala index 9f96840a7788..b74a6653c18d 100644 --- a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala +++ b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala @@ -18,4 +18,64 @@ package org.apache.paimon.spark.sql -class RowTrackingTest extends RowTrackingTestBase {} +import org.apache.spark.sql.Row + +class RowTrackingTest extends RowTrackingTestBase { + + test("Data Evolution: Spark 4.0 uses V2 copy-on-write for UPDATE") { + withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") { + withTable("t") { + sql( + "CREATE TABLE t (id INT, data INT) TBLPROPERTIES " + + "('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") + sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ * FROM VALUES (1, 1), (2, 2)") + sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ * FROM VALUES (3, 3), (4, 4)") + + sql("UPDATE t SET data = 30 WHERE id = 3") + checkAnswer( + sql("SELECT *, _ROW_ID FROM t ORDER BY id"), + Seq(Row(1, 1, 0), Row(2, 2, 1), Row(3, 30, 2), Row(4, 4, 3)) + ) + } + } + } + + test("Data Evolution: DELETE remains unsupported") { + withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") { + withTable("t") { + sql( + "CREATE TABLE t (id INT, data INT) TBLPROPERTIES " + + "('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") + sql("INSERT INTO t VALUES (1, 1), (2, 2)") + + assert( + intercept[Exception] { + sql("DELETE FROM t WHERE id = 2") + }.getMessage + .contains("Delete operation is not supported when data evolution is enabled yet")) + } + } + } + + test("Data Evolution: partition column update is rejected") { + withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") { + withTable("t") { + sql(""" + |CREATE TABLE t (id INT, data INT, dt STRING) + |PARTITIONED BY (dt) + |TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true') + |""".stripMargin) + sql("INSERT INTO t VALUES (1, 1, 'p1'), (2, 2, 'p2')") + + assert( + intercept[Exception] { + sql("UPDATE t SET dt = 'p3' WHERE id = 1") + }.getMessage + .contains("Update to partition columns is not supported for data evolution tables")) + + sql("UPDATE t SET data = 10 WHERE id = 1") + checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(1, 10, "p1"), Row(2, 2, "p2"))) + } + } + } +} diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala index 382aa1e77880..14fde5f9d13c 100644 --- a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala +++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala @@ -25,8 +25,14 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations import org.apache.spark.sql.types.Metadata +import scala.concurrent.{Await, Future} +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.DurationInt + class RowTrackingTest extends RowTrackingTestBase { + import testImplicits._ + test("Row Tracking: metadata columns expose Spark preserve flags") { val rowIdMetadata = Metadata.fromJson(PaimonMetadataColumn.ROW_ID.metadataInJSON()) assert(rowIdMetadata.getBoolean("__preserve_on_delete")) @@ -117,6 +123,236 @@ class RowTrackingTest extends RowTrackingTestBase { } } + test("Data Evolution: Spark 4.1 uses V2 copy-on-write for UPDATE") { + withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") { + withTable("t") { + sql( + "CREATE TABLE t (id INT, data INT) TBLPROPERTIES " + + "('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") + // One file (= one row-id group) per INSERT, so the UPDATE below rewrites a group that + // still contains carryover rows. + sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ * FROM VALUES (1, 1), (2, 2)") + sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ * FROM VALUES (3, 3), (4, 4)") + + assertPlanContains("UPDATE t SET data = 30 WHERE id = 3", "ReplaceData") + sql("UPDATE t SET data = 30 WHERE id = 3") + // Group [2, 3] is rewritten: both the updated row and the carryover row keep their + // row ids and get the new snapshot's sequence number. + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq(Row(1, 1, 0, 1), Row(2, 2, 1, 1), Row(3, 30, 2, 3), Row(4, 4, 3, 3)) + ) + } + } + } + + test("Data Evolution: DELETE remains unsupported") { + withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") { + withTable("t") { + sql(""" + |CREATE TABLE t (id INT, data INT, dt STRING) + |PARTITIONED BY (dt) + |TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true') + |""".stripMargin) + sql("INSERT INTO t VALUES (1, 1, 'p1'), (2, 2, 'p1'), (3, 3, 'p2')") + + assert( + intercept[Exception] { + sql("DELETE FROM t WHERE id = 2") + }.getMessage + .contains("Delete operation is not supported when data evolution is enabled yet")) + assert( + intercept[Exception] { + sql("DELETE FROM t WHERE dt = 'p1'") + }.getMessage + .contains("Delete operation is not supported when data evolution is enabled yet")) + } + } + } + + test("Data Evolution: V2 update rewrites whole row-id groups with patch files") { + withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") { + withTable("s", "t") { + sql( + "CREATE TABLE t (id INT, data INT, name STRING) TBLPROPERTIES " + + "('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") + sql("INSERT INTO t VALUES (1, 1, 'a'), (2, 2, 'b'), (3, 3, 'c')") + + // V1 data-evolution MERGE writes a partial-column patch file, so the row-id group now + // spans multiple files. + sql("CREATE TABLE s (id INT, data INT)") + sql("INSERT INTO s VALUES (2, 20), (3, 30)") + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET data = s.data + |""".stripMargin) + + // Multi-file groups must still serve __paimon_file_path: the assembled row reports one + // member file of its row-id group (via the FileRecordIterator pass-through in + // ForceSingleBatchReader / DataEvolutionFileReader). + val filePaths = sql("SELECT __paimon_file_path FROM t").collect().map(_.getString(0)) + assert(filePaths.length == 3 && filePaths.forall(_ != null), filePaths.mkString(", ")) + + // The predicate spans a patched column (data) and a base column (name), so the + // matching scan must merge-read the group while also producing the file path used for + // runtime group filtering. + sql("UPDATE t SET name = 'bb' WHERE data = 20 AND name = 'b'") + checkAnswer( + sql("SELECT *, _ROW_ID FROM t ORDER BY id"), + Seq(Row(1, 1, "a", 0), Row(2, 20, "bb", 1), Row(3, 30, "c", 2)) + ) + } + } + } + + test("Data Evolution: MERGE INTO keeps the V1 data-evolution command") { + withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") { + withTable("s", "t") { + sql( + "CREATE TABLE t (id INT, data INT) TBLPROPERTIES " + + "('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") + sql("INSERT INTO t VALUES (1, 1), (2, 2)") + sql("CREATE TABLE s (id INT, data INT)") + sql("INSERT INTO s VALUES (2, 200), (5, 500)") + + val mergeSql = + """ + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET data = s.data + |WHEN NOT MATCHED THEN INSERT * + |""".stripMargin + val plan = explain(mergeSql) + assert(!plan.contains("ReplaceData"), plan) + assert(plan.contains("MergeIntoPaimonDataEvolutionTable"), plan) + sql(mergeSql) + + checkAnswer( + sql("SELECT *, _ROW_ID FROM t ORDER BY id"), + Seq(Row(1, 1, 0), Row(2, 200, 1), Row(5, 500, 2)) + ) + } + } + } + + test("Data Evolution: V2 UPDATE works with commit.force-compact") { + withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") { + withTable("t") { + // Guards the run writer's assumption that the underlying append writer never + // restores or compacts existing files, under the config most likely to break it. + sql( + "CREATE TABLE t (id INT, data INT) TBLPROPERTIES " + + "('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true', " + + "'commit.force-compact' = 'true')") + sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ * FROM VALUES (1, 1), (2, 2)") + sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ * FROM VALUES (3, 3), (4, 4)") + + sql("UPDATE t SET data = 30 WHERE id = 3") + checkAnswer( + sql("SELECT *, _ROW_ID FROM t ORDER BY id"), + Seq(Row(1, 1, 0), Row(2, 2, 1), Row(3, 30, 2), Row(4, 4, 3)) + ) + } + } + } + + test("Data Evolution: partition column update is rejected") { + withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") { + withTable("t") { + sql(""" + |CREATE TABLE t (id INT, data INT, dt STRING) + |PARTITIONED BY (dt) + |TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true') + |""".stripMargin) + sql("INSERT INTO t VALUES (1, 1, 'p1'), (2, 2, 'p2')") + + assert( + intercept[Exception] { + sql("UPDATE t SET dt = 'p3' WHERE id = 1") + }.getMessage + .contains("Update to partition columns is not supported for data evolution tables")) + + // Non-partition columns of a partitioned data-evolution table stay updatable. + sql("UPDATE t SET data = 10 WHERE id = 1") + checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(1, 10, "p1"), Row(2, 2, "p2"))) + } + } + } + + test("Data Evolution: concurrent V1 merge patch and V2 update detect row-id conflicts") { + withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") { + withTable("t") { + sql( + "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES " + + "('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") + sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ * FROM VALUES (1, 0, 0), (2, 0, 0)") + Seq((1, 1), (2, 1)).toDF("id", "c").createOrReplaceTempView("s") + + def hasConflictMessage(e: Throwable): Boolean = { + Iterator + .iterate(e)(_.getCause) + .takeWhile(_ != null) + .exists(t => t.getMessage != null && t.getMessage.toLowerCase.contains("conflict")) + } + + def retryOnConflict(action: => Unit): Unit = { + var success = false + while (!success) { + try { + action + success = true + } catch { + case e: Exception if hasConflictMessage(e) => // retry + } + } + } + + // The V1 MERGE only ADDs partial-column patch files while the V2 UPDATE rewrites whole + // row-id groups; without the scan-snapshot row-id conflict check on the V2 commit, an + // interleaved patch would silently survive next to the rewritten files and corrupt the + // group, losing increments below. + val mergeFuture = Future { + for (_ <- 1 to 5) { + retryOnConflict { + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET t.c = t.c + s.c + |""".stripMargin).collect() + } + } + } + val updateFuture = Future { + for (_ <- 1 to 5) { + retryOnConflict { + sql("UPDATE t SET b = b + 1").collect() + } + } + } + Await.result(mergeFuture, 180.seconds) + Await.result(updateFuture, 180.seconds) + + checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(1, 5, 5), Row(2, 5, 5))) + } + } + } + + test("Data Evolution: BLOB tables do not expose V2 row-level capability") { + withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") { + withTable("t") { + sql( + "CREATE TABLE t (id INT, data STRING, picture BINARY) TBLPROPERTIES " + + "('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true', " + + "'blob-field' = 'picture')") + assert(!SparkTable.of(loadTable("t")).isInstanceOf[SupportsRowLevelOperations]) + } + } + } + private def assertPlanContains(sqlText: String, fragment: String): Unit = { val plan = explain(sqlText) assert(plan.contains(fragment), plan) diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/write/PaimonV2DataEvolutionDataWriter.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/write/PaimonV2DataEvolutionDataWriter.java new file mode 100644 index 000000000000..128f64042d56 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/write/PaimonV2DataEvolutionDataWriter.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.write; + +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.sink.BatchWriteBuilder; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.StructType; + +/** + * Spark 4.x calls DataWriter.write(metadata, data) for metadata-aware writes. Keep this method in + * Java so the common sources still compile against Spark 3.5, where that interface method does not + * exist; Spark 4.x compilation generates the erased bridge required by the runtime call. + */ +public class PaimonV2DataEvolutionDataWriter extends PaimonV2DataEvolutionDataWriterBase { + + public PaimonV2DataEvolutionDataWriter( + BatchWriteBuilder writeBuilder, + TableSchema tableSchema, + StructType writeSchema, + StructType dataSchema, + StructType metadataSchema, + CatalogContext catalogContext) { + super(writeBuilder, tableSchema, writeSchema, dataSchema, metadataSchema, catalogContext); + } + + public void write(InternalRow metadata, InternalRow data) { + writeWithMetadata(metadata, data); + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala index 9ea20de1909d..594b1a7d096f 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala @@ -25,7 +25,7 @@ import org.apache.paimon.table.{FileStoreTable, Table} import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsRowLevelOperations, TableCapability} import org.apache.spark.sql.connector.read.ScanBuilder -import org.apache.spark.sql.connector.write.{RowLevelOperationBuilder, RowLevelOperationInfo} +import org.apache.spark.sql.connector.write.{RowLevelOperation, RowLevelOperationBuilder, RowLevelOperationInfo} import org.apache.spark.sql.util.CaseInsensitiveStringMap import java.util.{EnumSet => JEnumSet, Set => JSet} @@ -40,15 +40,17 @@ import java.util.{EnumSet => JEnumSet, Set => JSet} * If this base class implemented `SupportsRowLevelOperations`, Spark 4.1 would immediately call * `newRowLevelOperationBuilder` on tables whose V2 write is disabled (e.g. dynamic bucket or * primary-key tables that fall back to V1 write) and fail before Paimon has a chance to rewrite the - * plan to a V1 command. Likewise, deletion-vector, data-evolution, and fixed-length CHAR tables - * need to stay on Paimon's V1 postHoc path even when `useV2Write=true`, so they must also not - * expose `SupportsRowLevelOperations`. + * plan to a V1 command. Likewise, deletion-vector, unsupported data-evolution variants, and + * fixed-length CHAR tables need to stay on Paimon's V1 postHoc path even when `useV2Write=true`, so + * they must also not expose `SupportsRowLevelOperations`. * * Tables that DO support V2 row-level operations use the [[SparkTableWithRowLevelOps]] subclass * instead; the [[SparkTable.of]] factory picks the right variant via - * [[SparkTable.supportsV2RowLevelOps]]. Append-only tables, including row-tracking-only tables, - * expose `SupportsRowLevelOperations` so DELETE, UPDATE, and MERGE INTO can go through the V2 - * copy-on-write path when the table has no PK, deletion vectors, data evolution, or CHAR columns. + * [[SparkTable.supportsV2RowLevelOps]]. Append-only tables expose `SupportsRowLevelOperations` so + * DELETE, UPDATE, and MERGE INTO can go through the V2 copy-on-write path when the table has no PK, + * deletion vectors, or CHAR columns. Data-evolution tables expose the capability only so UPDATE can + * use V2 copy-on-write; DELETE is rejected and MERGE INTO keeps the V1 + * `MergeIntoPaimonDataEvolutionTable` command which writes partial-column patch files. */ case class SparkTable(override val table: Table) extends PaimonSparkTableBase(table) @@ -64,6 +66,13 @@ class SparkTableWithRowLevelOps(tableArg: Table) info: RowLevelOperationInfo): RowLevelOperationBuilder = { table match { case t: FileStoreTable => + if ( + t.coreOptions().dataEvolutionEnabled() && + info.command() == RowLevelOperation.Command.DELETE + ) { + throw new RuntimeException( + "Delete operation is not supported when data evolution is enabled yet.") + } () => new PaimonSparkCopyOnWriteOperation(t, info) case _ => throw new UnsupportedOperationException( @@ -117,12 +126,28 @@ object SparkTable { !sparkTable.coreOptions.rowTrackingEnabled() || org.apache.spark.SPARK_VERSION >= "4.0" fs.primaryKeys().isEmpty && supportsRowTrackingCopyOnWrite && + supportsDataEvolutionCopyOnWrite(fs) && !sparkTable.coreOptions.deletionVectorsEnabled() && - !sparkTable.coreOptions.dataEvolutionEnabled() && !SparkTypeUtils.containsCharType(fs.rowType()) case _ => false } } + + /** + * Whether a table is eligible for the V2 copy-on-write UPDATE path with respect to data + * evolution. Non-data-evolution tables are unaffected (returns `true`). Data-evolution UPDATE + * rewrites whole row-id groups, which is not implemented yet for BLOB fields (blob files must + * stay aligned with the rewritten row ranges) or vector-store files. + * + * Single source of truth for the data-evolution capability gate, shared by + * [[supportsV2RowLevelOps]] and the Spark 4.1 `PureAppendOnlyScope` rewrite rules so the two + * cannot drift apart. + */ + def supportsDataEvolutionCopyOnWrite(fs: FileStoreTable): Boolean = { + val options = fs.coreOptions() + !options.dataEvolutionEnabled() || + (!SparkTypeUtils.containsBlobType(fs.rowType()) && !options.withVectorFormat()) + } } case class SparkIcebergTable(table: Table) extends BaseTable diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java index 80a27c35ac90..148b8cc0c15a 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java @@ -116,6 +116,11 @@ public static boolean containsCharType(org.apache.paimon.types.DataType type) { return false; } + public static boolean containsBlobType(RowType rowType) { + return rowType.getFieldTypes().stream() + .anyMatch(type -> type.is(org.apache.paimon.types.DataTypeRoot.BLOB)); + } + /** * Prune Paimon `RowType` by required Spark `StructType`, use this method instead of {@link * #toPaimonType(DataType)} when need to retain the field id. diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala index e03e658141a6..0d7a37545770 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala @@ -49,11 +49,6 @@ object PaimonUpdateTable extends Rule[LogicalPlan] with RowLevelHelper with Expr throw new RuntimeException("Can't update the primary key column.") } - if (paimonTable.coreOptions().dataEvolutionEnabled()) { - throw new RuntimeException( - "Update operation is not supported when data evolution is enabled yet.") - } - // Align against `u.table.output`: for CHAR/VARCHAR columns the analyzer adds a // `readSidePadding` Project whose output has different exprIds than `relation`, and // the parsed assignment keys reference the Project's attributes. Order matches @@ -68,8 +63,25 @@ object PaimonUpdateTable extends Rule[LogicalPlan] with RowLevelHelper with Expr val alignedUpdateTable = u.copy(assignments = alignedAssignments) if (!shouldFallbackToV1Update(table, alignedUpdateTable)) { + if (paimonTable.coreOptions().dataEvolutionEnabled()) { + // The rewritten files keep the original row ids, which are derived from the + // file's firstRowId per partition; moving a row to another partition would + // need re-assigned row ids (delete + insert semantics). + val partitionKeys = paimonTable.partitionKeys().asScala.toSeq + if (!validUpdateAssignment(u.table.outputSet, partitionKeys, assignments)) { + throw new RuntimeException( + "Update to partition columns is not supported for data evolution tables.") + } + } alignedUpdateTable } else { + if (paimonTable.coreOptions().dataEvolutionEnabled()) { + // Data-evolution tables only support UPDATE through the V2 copy-on-write + // path (Spark 4.0+ with V2 write enabled); the V1 command cannot rewrite + // row-id groups. + throw new RuntimeException( + "Update operation is not supported when data evolution is enabled yet.") + } UpdatePaimonTableCommand( relation, paimonTable, diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala index da4394867611..c66794b4860c 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala @@ -104,6 +104,10 @@ trait RowLevelHelper extends SQLConfHelper { val relation = PaimonRelation.getPaimonRelation(m.targetTable) val table = relation.table.asInstanceOf[SparkTable] shouldFallbackToV1(table) || + // Data-evolution tables expose V2 row-level ops for UPDATE only; MERGE keeps the V1 + // `MergeIntoPaimonDataEvolutionTable` command which writes partial-column patch files instead + // of rewriting whole rows. + table.coreOptions.dataEvolutionEnabled() || !m.rewritable || !m.aligned } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala index e433a5f7d49f..287aa37f6c3e 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala @@ -56,12 +56,12 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) { // Spark 4.1 moved the row-level command rewrite rules (`RewriteUpdateTable` / // `RewriteDeleteFromTable` / `RewriteMergeIntoTable`) into the main Resolution batch and // implemented them with `plan resolveOperators { ... }`, which short-circuits on - // `analyzed=true` nodes. For pure append-only tables (no PK / RT / DE / DV) on Spark 4.1+, - // UPDATE and MERGE subtrees get flipped to `analyzed=true` by Paimon's own Resolution-batch - // rules before Spark's rewrite can fire, so the nodes fall through to the physical planner - // and are rejected with `UNSUPPORTED_FEATURE.TABLE_OPERATION`. DELETE is not affected — - // Paimon has no Resolution-batch rule touching `DeleteFromTable` — but Spark's unconditional - // `RewriteDeleteFromTable` defeats Paimon's metadata-only-delete optimization. + // `analyzed=true` nodes. For append-only tables eligible for Paimon's V2 row-level path on + // Spark 4.1+, UPDATE and MERGE subtrees get flipped to `analyzed=true` by Paimon's own + // Resolution-batch rules before Spark's rewrite can fire, so the nodes fall through to the + // physical planner and are rejected with `UNSUPPORTED_FEATURE.TABLE_OPERATION`. DELETE is not + // affected — Paimon has no Resolution-batch rule touching `DeleteFromTable` — but Spark's + // unconditional `RewriteDeleteFromTable` defeats Paimon's metadata-only-delete optimization. // // Three companion rules in `paimon-spark4-common` fix each case: // - `Spark41UpdateTableRewrite` — transcribes `RewriteUpdateTable` output (ReplaceData) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala index 041da16efc8a..01fa2374aa4c 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala @@ -18,6 +18,7 @@ package org.apache.paimon.spark.rowops +import org.apache.paimon.io.DataFileMeta import org.apache.paimon.partition.PartitionPredicate import org.apache.paimon.predicate.Predicate import org.apache.paimon.spark.commands.SparkDataFileMeta @@ -26,6 +27,7 @@ import org.apache.paimon.spark.read.BaseScan import org.apache.paimon.spark.schema.PaimonMetadataColumn.FILE_PATH_COLUMN import org.apache.paimon.table.FileStoreTable import org.apache.paimon.table.source.{DataSplit, Split} +import org.apache.paimon.utils.RangeHelper import org.apache.spark.sql.PaimonUtils import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference} @@ -76,10 +78,51 @@ case class PaimonCopyOnWriteScan( if (table.coreOptions().manifestDeleteFileDropStats()) { snapshotReader.dropStats() } - if (filterApplied) { + val isDataEvolution = table.coreOptions().dataEvolutionEnabled() + if (filterApplied && !isDataEvolution) { snapshotReader.withDataFileNameFilter(fileName => filteredFileNames.contains(fileName)) } - dataSplits = snapshotReader.read().splits().asScala.collect { case s: DataSplit => s }.toArray + val splits = + snapshotReader.read().splits().asScala.collect { case s: DataSplit => s }.toArray + dataSplits = if (filterApplied && isDataEvolution) { + splits.flatMap(filterDataEvolutionGroups) + } else { + splits + } + } + + /** + * A data-evolution logical row is assembled from all files of its row-id group but reports only + * one member file as `__paimon_file_path`, and one split may bin-pack several groups. Filtering + * at file granularity would tear groups apart and rewrite rows with stale column values, so + * regroup the split's files by row-id range and keep or drop whole groups. + */ + private def filterDataEvolutionGroups(split: DataSplit): Option[DataSplit] = { + val rangeHelper = new RangeHelper[DataFileMeta](f => f.nonNullRowIdRange()) + val groups = rangeHelper.mergeOverlappingRanges(split.dataFiles()).asScala + val kept = groups.filter(_.asScala.exists(f => filteredFileNames.contains(f.fileName()))) + if (kept.isEmpty) { + None + } else if (kept.size == groups.size) { + Some(split) + } else { + // The rebuild below copies only data files. Data-evolution tables never carry deletion + // vectors (the V2 capability gate excludes DV), so dropping them here would be a silent + // correctness bug rather than a supported case: fail loudly instead. + assert( + !split.deletionFiles().isPresent, + s"Data-evolution copy-on-write split unexpectedly carries deletion files, this is a bug: $split") + val builder = DataSplit + .builder() + .withSnapshot(split.snapshotId()) + .withPartition(split.partition()) + .withBucket(split.bucket()) + .withBucketPath(split.bucketPath()) + .withDataFiles(kept.flatMap(_.asScala).asJava) + .rawConvertible(split.rawConvertible()) + Option(split.totalBuckets()).foreach(builder.withTotalBuckets) + Some(builder.build()) + } } def scannedFiles: Seq[SparkDataFileMeta] = { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala index 42d2ebcd8590..9f6c9a042692 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala @@ -26,7 +26,7 @@ import org.apache.paimon.spark.metric.SparkMetricRegistry import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan import org.apache.paimon.spark.schema.PaimonMetadataColumn.{FILE_PATH, ROW_ID, SEQUENCE_NUMBER} import org.apache.paimon.table.{FileStoreTable, SpecialFields} -import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, CommitMessageImpl} +import org.apache.paimon.table.sink.{BatchWriteBuilder, BatchWriteBuilderImpl, CommitMessage, CommitMessageImpl} import org.apache.spark.sql.PaimonSparkSession import org.apache.spark.sql.connector.write.{DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage} @@ -69,6 +69,12 @@ abstract class PaimonBatchWriteBase( builder } + // Data-evolution tables are row-tracking tables too, but their copy-on-write rewrite must not + // write physical row-tracking columns (see PaimonV2DataEvolutionDataWriterBase), so check the + // data-evolution branch first. + private val writeDataEvolution: Boolean = + coreOptions.dataEvolutionEnabled() && copyOnWriteScan.isDefined + private val writeRowTracking: Boolean = coreOptions.rowTrackingEnabled() && copyOnWriteScan.isDefined @@ -84,7 +90,9 @@ abstract class PaimonBatchWriteBase( protected def createPaimonDataWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = { (_: Int, _: Long) => { - if (writeRowTracking) { + if (writeDataEvolution) { + createPaimonDataEvolutionDataWriter() + } else if (writeRowTracking) { createPaimonMetadataAwareDataWriter() } else { PaimonV2DataWriter( @@ -109,9 +117,30 @@ abstract class PaimonBatchWriteBase( rtPaimonWriteType) } + private def createPaimonDataEvolutionDataWriter(): PaimonV2DataEvolutionDataWriter = { + new PaimonV2DataEvolutionDataWriter( + batchWriteBuilder, + table.schema(), + writeSchema, + dataSchema, + rtMetadataSchema, + catalogContextForBlobDescriptor) + } + protected def commitMessages(messages: Array[WriterCommitMessage]): Unit = { commitStarted = true logInfo(s"Committing to table ${table.name()}") + // For data-evolution copy-on-write, validate row-id ranges against commits that landed + // after the scan snapshot (same as the V1 MergeIntoPaimonDataEvolutionTable command): a + // concurrent partial-column patch commit only ADDs files, so the deleted-file conflict + // checks alone cannot see it and the rewritten files would silently overlap the patch. + if (writeDataEvolution) { + copyOnWriteScan.get.dataSplits.headOption.foreach( + split => + batchWriteBuilder + .asInstanceOf[BatchWriteBuilderImpl] + .rowIdCheckConflict(split.snapshotId())) + } val batchTableCommit = batchWriteBuilder.newCommit() batchTableCommit.withMetricRegistry(metricRegistry) val addCommitMessage = WriteTaskResult.merge(messages) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataEvolutionDataWriterBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataEvolutionDataWriterBase.scala new file mode 100644 index 000000000000..72616a5b13c1 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataEvolutionDataWriterBase.scala @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.write + +import org.apache.paimon.catalog.CatalogContext +import org.apache.paimon.data.{BinaryRow, InternalRow => PaimonInternalRow} +import org.apache.paimon.io.{CompactIncrement, DataIncrement} +import org.apache.paimon.operation.AbstractFileStoreWrite +import org.apache.paimon.schema.TableSchema +import org.apache.paimon.spark.SparkInternalRowWrapper +import org.apache.paimon.spark.schema.PaimonMetadataColumn +import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, CommitMessageImpl, RowPartitionKeyExtractor, TableWriteImpl} +import org.apache.paimon.utils.RecordWriter + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow + +import java.util.Collections + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +/** + * V2 [[DataWrite]] for copy-on-write UPDATE on data-evolution tables. + * + * Data-evolution rows derive `_ROW_ID` from the file's `firstRowId` plus the row position, so + * rewritten files must hold contiguous row-id runs and carry a `firstRowId` themselves instead of + * writing physical row-tracking columns (files with a physical `_ROW_ID` column are not assigned a + * `firstRowId` at commit and cannot be grouped by the data-evolution read path). The surviving rows + * arrive range-distributed and sorted by `_ROW_ID` (see + * [[PaimonWriteRequirement.dataEvolutionCopyOnWrite]]); this writer starts a new file run whenever + * the row id is not consecutive or the partition changes, and assigns each produced file the run's + * row id offset, mirroring `DataEvolutionTableDataWrite.PerFileWriter` and the commit skip rule in + * `RowTrackingCommitUtils` (files with a non-null `firstRowId` keep it). Sequence numbers are not + * preserved: the new files keep `minSequenceNumber == 0` and the commit stamps them with the new + * snapshot id, the same as any data-evolution rewrite. + */ +abstract class PaimonV2DataEvolutionDataWriterBase( + writeBuilder: BatchWriteBuilder, + tableSchema: TableSchema, + writeSchema: org.apache.spark.sql.types.StructType, + dataSchema: org.apache.spark.sql.types.StructType, + metadataSchema: org.apache.spark.sql.types.StructType, + catalogContext: CatalogContext) + extends InnerTableV2DataWrite + with Logging { + + private val rowIdIndex = metadataSchema.fieldIndex(PaimonMetadataColumn.ROW_ID_COLUMN) + + private val partitionExtractor = new RowPartitionKeyExtractor(tableSchema) + + private val reusableWrapper = + new SparkInternalRowWrapper(writeSchema, writeSchema.fields.length, dataSchema, catalogContext) + + private var currentRun: RunWriter = _ + private val commitMessages = ListBuffer[CommitMessage]() + + protected def writeWithMetadata(metadata: InternalRow, data: InternalRow): Unit = { + assert( + metadata != null && !metadata.isNullAt(rowIdIndex), + "Data-evolution copy-on-write requires a non-null _ROW_ID for every written row.") + val rowId = metadata.getLong(rowIdIndex) + val paimonRow = reusableWrapper.replace(data) + val partition = partitionExtractor.partition(paimonRow) + if (currentRun == null || !currentRun.accepts(rowId, partition)) { + finishCurrentRun() + currentRun = new RunWriter(rowId, partition.copy()) + } + currentRun.write(paimonRow) + } + + override def write(record: InternalRow): Unit = { + throw new UnsupportedOperationException( + "Data-evolution V2 copy-on-write only rewrites existing rows; " + + "rows without row-tracking metadata are not supported.") + } + + override def commitImpl(): Seq[CommitMessage] = { + finishCurrentRun() + commitMessages.toSeq + } + + override def abort(): Unit = close() + + override def close(): Unit = { + if (currentRun != null) { + currentRun.closeQuietly() + currentRun = null + } + } + + private def finishCurrentRun(): Unit = { + if (currentRun != null) { + commitMessages ++= currentRun.finish() + currentRun = null + } + } + + /** Writes one contiguous row-id run; may roll into several files, all stay range-adjacent. */ + private class RunWriter(startRowId: Long, partition: BinaryRow) { + + private var nextRowId = startRowId + + // The run writer must only append the rewritten rows and never restore the bucket's + // existing files. The bucket-unaware append write already enforces this + // (AppendFileStoreWrite pins ignorePreviousFiles=true and uses NoopCompactManager), but + // declare it explicitly rather than depend on which FileStoreWrite subclass we got. + private val recordWriter: RecordWriter[PaimonInternalRow] = writeBuilder + .newWrite() + .asInstanceOf[TableWriteImpl[PaimonInternalRow]] + .withIgnorePreviousFiles(true) + .getWrite + .asInstanceOf[AbstractFileStoreWrite[PaimonInternalRow]] + .createWriter(partition, 0) + + def accepts(rowId: Long, rowPartition: BinaryRow): Boolean = { + rowId == nextRowId && partition == rowPartition + } + + def write(row: PaimonInternalRow): Unit = { + recordWriter.write(row) + nextRowId += 1 + } + + def finish(): Seq[CommitMessage] = { + try { + val result = recordWriter.prepareCommit(false) + // The run writer is a fresh append writer over no existing files, so it must only + // produce new files; compaction output or deletions would bypass the firstRowId + // re-assignment below and must not be dropped silently. + assert( + result.compactIncrement().isEmpty && + result.newFilesIncrement().deletedFiles().isEmpty && + result.newFilesIncrement().changelogFiles().isEmpty, + s"Data-evolution copy-on-write run writer produced increments other than new files: " + + s"${result.compactIncrement()}, ${result.newFilesIncrement()}, this is a bug." + ) + val newFiles = result.newFilesIncrement().newFiles().asScala + var fileFirstRowId = startRowId + // `firstRowId` is assigned by walking `newFiles` in their returned order and accumulating + // `rowCount`, which relies on the append writer returning files in write order (rows arrive + // sorted by `_ROW_ID`). A zero-row file would hand the same `firstRowId` to the next file + // and corrupt the run, so reject it explicitly rather than let the total-count assert pass. + val assigned = newFiles.map { + file => + assert( + file.rowCount() > 0, + s"Data-evolution copy-on-write run [$startRowId, $nextRowId) produced an empty file " + + s"${file.fileName()}, this is a bug.") + val assignedFile = file.assignFirstRowId(fileFirstRowId) + fileFirstRowId += file.rowCount() + assignedFile + } + assert( + fileFirstRowId == nextRowId, + s"Data-evolution copy-on-write run [$startRowId, $nextRowId) wrote " + + s"${fileFirstRowId - startRowId} rows into files, this is a bug." + ) + if (assigned.isEmpty) { + Nil + } else { + Seq( + new CommitMessageImpl( + partition, + 0, + null, + new DataIncrement(assigned.asJava, Collections.emptyList(), Collections.emptyList()), + CompactIncrement.emptyIncrement())) + } + } finally { + recordWriter.close() + } + } + + def closeQuietly(): Unit = { + try { + recordWriter.close() + } catch { + case e: Exception => logWarning("Failed to close data-evolution run writer", e) + } + } + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala index 2ae1dd53a367..293f6716c4a0 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala @@ -47,7 +47,13 @@ class PaimonV2Write( with SchemaEvolutionHelper with Logging { - private val writeRequirement = PaimonWriteRequirement(table) + private val writeRequirement = { + if (copyOnWriteScan.isDefined && table.coreOptions().dataEvolutionEnabled()) { + PaimonWriteRequirement.dataEvolutionCopyOnWrite() + } else { + PaimonWriteRequirement(table) + } + } override def requiredDistribution(): Distribution = { val distribution = writeRequirement.distribution diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteRequirement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteRequirement.scala index 1f95c1914612..3bb7708762d5 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteRequirement.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteRequirement.scala @@ -20,11 +20,12 @@ package org.apache.paimon.spark.write import org.apache.paimon.CoreOptions.PartitionSinkStrategy import org.apache.paimon.spark.commands.BucketExpression.quote +import org.apache.paimon.spark.schema.PaimonMetadataColumn import org.apache.paimon.table.BucketMode._ import org.apache.paimon.table.FileStoreTable import org.apache.spark.sql.connector.distributions.{ClusteredDistribution, Distribution, Distributions} -import org.apache.spark.sql.connector.expressions.{Expression, Expressions, SortOrder} +import org.apache.spark.sql.connector.expressions.{Expression, Expressions, SortDirection, SortOrder} import scala.collection.JavaConverters._ @@ -70,4 +71,19 @@ object PaimonWriteRequirement { PaimonWriteRequirement(distribution, EMPTY_ORDERING) } } + + /** + * Requirement for copy-on-write UPDATE on data-evolution tables: range-distribute and sort by + * `_ROW_ID` so each task receives contiguous row-id segments and the writer can emit files whose + * row ids derive from `firstRowId` plus position. `_ROW_ID` is preserved for both carryover and + * updated rows, which also keeps rows of one partition together (row id ranges never span + * partitions). + */ + def dataEvolutionCopyOnWrite(): PaimonWriteRequirement = { + val ordering: Array[SortOrder] = + Array( + Expressions + .sort(Expressions.column(PaimonMetadataColumn.ROW_ID_COLUMN), SortDirection.ASCENDING)) + PaimonWriteRequirement(Distributions.ordered(ordering), ordering) + } } diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PureAppendOnlyScope.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PureAppendOnlyScope.scala index 4fdf2bafc02f..d6f2bb650bd6 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PureAppendOnlyScope.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PureAppendOnlyScope.scala @@ -26,13 +26,19 @@ import org.apache.spark.sql.execution.datasources.v2.ExtractV2Table /** * Shared scope predicates for the Spark 4.1 Resolution-batch row-level rewrite rules - * ([[Spark41UpdateTableRewrite]] for UPDATE + metadata-only DELETE reverse-optimization, - * [[Spark41MergeIntoRewrite]] for MERGE). + * ([[Spark41UpdateTableRewrite]] for UPDATE, [[Spark41DeleteMetadataRestore]] for metadata-only + * DELETE reverse-optimization, [[Spark41MergeIntoRewrite]] for MERGE). * * These rules only intercept operations against Paimon tables that are valid for Spark's V2 - * copy-on-write rewrite: no primary key, data evolution, deletion vectors, or fixed-length - * `CHAR(n)` columns. Row-tracking-only tables are included; tables that violate any of these - * constraints go through Paimon's postHoc V1 commands or Spark's built-in analysis path. + * copy-on-write rewrite: no primary key, deletion vectors, or fixed-length `CHAR(n)` columns. + * Row-tracking-only tables are included. Data-evolution tables (without BLOB or vector-store files) + * are included for UPDATE only: DELETE remains unsupported and MERGE keeps the V1 + * `MergeIntoPaimonDataEvolutionTable` command, which writes partial-column patch files instead of + * rewriting whole rows. Tables that violate any of these constraints go through Paimon's postHoc V1 + * commands or Spark's built-in analysis path. + * + * The conditions mirror `SparkTable.supportsV2RowLevelOps`; the data-evolution gate is shared + * through `SparkTable.supportsDataEvolutionCopyOnWrite`, keep the remaining conditions in sync. * * Kept as a mix-in trait so the two rewrite objects stay single-responsibility (one rule per Spark * row-level command, mirroring Spark's own `RewriteUpdateTable` / `RewriteMergeIntoTable` layout) @@ -41,6 +47,26 @@ import org.apache.spark.sql.execution.datasources.v2.ExtractV2Table trait PureAppendOnlyScope { protected def targetsV2CopyOnWriteTable(aliasedTable: LogicalPlan): Boolean = { + targetsPaimonFileStoreTable(aliasedTable) { + case (sparkTable, fs) => + fs.primaryKeys().isEmpty && + SparkTable.supportsDataEvolutionCopyOnWrite(fs) && + !sparkTable.coreOptions.deletionVectorsEnabled() && + !SparkTypeUtils.containsCharType(fs.rowType()) + } + } + + protected def targetsV2CopyOnWriteMergeTable(aliasedTable: LogicalPlan): Boolean = { + targetsPaimonFileStoreTable(aliasedTable) { + case (sparkTable, fs) => + fs.primaryKeys().isEmpty && + !sparkTable.coreOptions.dataEvolutionEnabled() && + !sparkTable.coreOptions.deletionVectorsEnabled() && + !SparkTypeUtils.containsCharType(fs.rowType()) + } + } + + protected def targetsV2CopyOnWriteDeleteTable(aliasedTable: LogicalPlan): Boolean = { targetsPaimonFileStoreTable(aliasedTable) { case (sparkTable, fs) => fs.primaryKeys().isEmpty && diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41DeleteMetadataRestore.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41DeleteMetadataRestore.scala index d21bc8098e49..a14592617bbe 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41DeleteMetadataRestore.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41DeleteMetadataRestore.scala @@ -78,7 +78,7 @@ object Spark41DeleteMetadataRestore extends RewriteRowLevelCommand with PureAppe case _ => false } writeIsDelete && (rd.originalTable match { - case r: DataSourceV2Relation if targetsV2CopyOnWriteTable(r) => + case r: DataSourceV2Relation if targetsV2CopyOnWriteDeleteTable(r) => r.table match { case spk: SparkTable => spk.getTable match { diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41MergeIntoRewrite.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41MergeIntoRewrite.scala index f3dd21f15f45..856dc7e68665 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41MergeIntoRewrite.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41MergeIntoRewrite.scala @@ -72,7 +72,7 @@ object Spark41MergeIntoRewrite plan.transformDown { case m: MergeIntoTable if m.resolved && m.rewritable && !m.needSchemaEvolution && - targetsV2CopyOnWriteTable(m.targetTable) => + targetsV2CopyOnWriteMergeTable(m.targetTable) => // Pure append-only tables skip postHoc `PaimonMergeInto`, so evolve schema here. val evolved = evolveSchemaIfPaimon(m) rewrite(alignAllMergeActions(evolved, evolved.targetTable.output)) diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41UpdateTableRewrite.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41UpdateTableRewrite.scala index 97edbdc780c3..ede84b5dd8d2 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41UpdateTableRewrite.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41UpdateTableRewrite.scala @@ -18,9 +18,10 @@ package org.apache.spark.sql.catalyst.analysis +import org.apache.paimon.spark.SparkTable import org.apache.paimon.spark.catalyst.analysis.PaimonAssignmentUtils -import org.apache.spark.sql.catalyst.expressions.{Alias, EqualNullSafe, Expression, If, Literal, MetadataAttribute, Not, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, EqualNullSafe, Expression, If, Literal, MetadataAttribute, Not, SubqueryExpression} import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral import org.apache.spark.sql.catalyst.plans.logical.{AnalysisHelper, Assignment, Filter, LogicalPlan, Project, ReplaceData, Union, UpdateTable} import org.apache.spark.sql.catalyst.util.RowDeltaUtils.WRITE_WITH_METADATA_OPERATION @@ -30,6 +31,8 @@ import org.apache.spark.sql.connector.write.RowLevelOperationTable import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2Table} import org.apache.spark.sql.util.CaseInsensitiveStringMap +import scala.collection.JavaConverters._ + /** * Spark 4.1-only Resolution-batch rule that rewrites UPDATE on pure append-only Paimon tables (see * [[PureAppendOnlyScope]]) into a V2 `ReplaceData` plan, mirroring Spark's built-in @@ -47,8 +50,9 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap * We fire before `ResolveAssignments`, so `u.aligned` is `false`; the rule pre-aligns via * `PaimonAssignmentUtils.alignUpdateAssignments` before building the plan. * - * Row-tracking-only tables use the same V2 copy-on-write rewrite. PK / DE / DV tables go through - * the postHoc V1 rule because they do not expose `SupportsRowLevelOperations`. DELETE is handled by + * Row-tracking-only and data-evolution tables use the same V2 copy-on-write rewrite for UPDATE (the + * latter additionally rejects partition-column updates). PK / DV tables go through the postHoc V1 + * rule because they do not expose `SupportsRowLevelOperations`. DELETE is handled by * [[Spark41DeleteMetadataRestore]]; MERGE by [[Spark41MergeIntoRewrite]]. */ object Spark41UpdateTableRewrite extends RewriteRowLevelCommand with PureAppendOnlyScope { @@ -61,6 +65,7 @@ object Spark41UpdateTableRewrite extends RewriteRowLevelCommand with PureAppendO if u.resolved && u.rewritable && targetsV2CopyOnWriteTable(aliasedTable) => EliminateSubqueryAliases(aliasedTable) match { case r @ ExtractV2Table(tbl: SupportsRowLevelOperations) => + checkNoDataEvolutionPartitionUpdate(tbl, assignments) val table = buildOperationTable(tbl, UPDATE, CaseInsensitiveStringMap.empty()) val updateCond = cond.getOrElse(TrueLiteral) // `ResolveAssignments` fires later in the batch, so `u.aligned` is still false. @@ -82,6 +87,30 @@ object Spark41UpdateTableRewrite extends RewriteRowLevelCommand with PureAppendO } } + /** + * Mirrors the postHoc `PaimonUpdateTable` guard: data-evolution rewritten files keep the original + * row ids, which are derived from the file's firstRowId per partition, so moving a row to another + * partition is not supported. + */ + private def checkNoDataEvolutionPartitionUpdate( + tbl: SupportsRowLevelOperations, + assignments: Seq[Assignment]): Unit = { + tbl match { + case sparkTable: SparkTable if sparkTable.coreOptions.dataEvolutionEnabled() => + val partitionKeys = sparkTable.getTable.partitionKeys().asScala + val updatesPartitionColumn = assignments.exists { + case Assignment(key: AttributeReference, value) if !key.fastEquals(value) => + partitionKeys.exists(partitionKey => conf.resolver(partitionKey, key.name)) + case _ => false + } + if (updatesPartitionColumn) { + throw new RuntimeException( + "Update to partition columns is not supported for data evolution tables.") + } + case _ => + } + } + // Mirrors Spark 4.1.1 `RewriteUpdateTable.{buildReplaceDataPlan, buildReplaceDataWithUnionPlan, // buildReplaceDataUpdateProjection}`. private def buildReplaceDataPlan(