From 21a6ebb00d9c1878ab0070ab286edabe510033ef Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Thu, 16 Jan 2025 19:46:41 -0700 Subject: [PATCH] Spark: Fix Puffin suffix for DVs --- .../spark/extensions/TestMergeOnReadDelete.java | 2 ++ .../iceberg/spark/extensions/TestMergeOnReadMerge.java | 2 ++ .../spark/extensions/TestMergeOnReadUpdate.java | 2 ++ .../java/org/apache/iceberg/spark/SparkWriteConf.java | 4 ++++ .../org/apache/iceberg/spark/TestSparkWriteConf.java | 10 ++++++++++ 5 files changed, 20 insertions(+) diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java index 505b88711371..bf9c53f82dbd 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java @@ -31,6 +31,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.RowDelta; import org.apache.iceberg.RowLevelOperationMode; @@ -232,6 +233,7 @@ public void testDeleteWithDVAndHistoricalPositionDeletes() { deleteFiles.stream().filter(ContentFileUtil::isDV).collect(Collectors.toList()); assertThat(dvs).hasSize(1); assertThat(dvs).allMatch(dv -> dv.recordCount() == 3); + assertThat(dvs).allMatch(dv -> FileFormat.fromFileName(dv.location()) == FileFormat.PUFFIN); } private void checkDeleteFileGranularity(DeleteGranularity deleteGranularity) diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java index 7af128bcc171..cb2cf801e0c3 100644 --- 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java @@ -27,6 +27,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.RowLevelOperationMode; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; @@ -129,6 +130,7 @@ public void testMergeWithDVAndHistoricalPositionDeletes() { deleteFiles.stream().filter(ContentFileUtil::isDV).collect(Collectors.toList()); assertThat(dvs).hasSize(1); assertThat(dvs).allMatch(dv -> dv.recordCount() == 3); + assertThat(dvs).allMatch(dv -> FileFormat.fromFileName(dv.location()) == FileFormat.PUFFIN); } private void checkMergeDeleteGranularity(DeleteGranularity deleteGranularity) { diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java index 477a2e73256b..1bec21b9b68d 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.RowLevelOperationMode; import org.apache.iceberg.Snapshot; @@ -209,6 +210,7 @@ public void testUpdateWithDVAndHistoricalPositionDeletes() { deleteFiles.stream().filter(ContentFileUtil::isDV).collect(Collectors.toList()); assertThat(dvs).hasSize(1); assertThat(dvs.get(0).recordCount()).isEqualTo(3); + assertThat(dvs).allMatch(dv -> 
FileFormat.fromFileName(dv.location()) == FileFormat.PUFFIN); } private void initTable(String partitionedBy, DeleteGranularity deleteGranularity) { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index f9fb904db394..711d787aebdf 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -214,6 +214,10 @@ private boolean fanoutWriterEnabled(boolean defaultValue) { } public FileFormat deleteFileFormat() { + if (useDVs()) { + return FileFormat.PUFFIN; + } + String valueAsString = confParser .stringConf() diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java index 42d697410377..356e5550c3e7 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java @@ -51,6 +51,7 @@ import java.util.List; import java.util.Map; import org.apache.iceberg.DistributionMode; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.UpdateProperties; @@ -540,6 +541,15 @@ public void testDeleteFileWriteConf() { } } + @TestTemplate + public void testDVWriteConf() { + Table table = validationCatalog.loadTable(tableIdent); + table.updateProperties().set(TableProperties.FORMAT_VERSION, "3").commit(); + SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); + assertThat(writeConf.useDVs()).isTrue(); + assertThat(writeConf.deleteFileFormat()).isEqualTo(FileFormat.PUFFIN); + } + private void testWriteProperties(List<Map<String, String>> propertiesSuite) { withSQLConf( propertiesSuite.get(0),