Spark: Fix Puffin suffix for DVs
amogh-jahagirdar committed Jan 17, 2025
1 parent 3247964 commit e13f67d
Showing 5 changed files with 23 additions and 4 deletions.

File 1 of 5
@@ -31,6 +31,7 @@
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.RowLevelOperationMode;
@@ -232,6 +233,7 @@ public void testDeleteWithDVAndHistoricalPositionDeletes() {
        deleteFiles.stream().filter(ContentFileUtil::isDV).collect(Collectors.toList());
    assertThat(dvs).hasSize(1);
    assertThat(dvs).allMatch(dv -> dv.recordCount() == 3);
+    assertThat(dvs).allMatch(dv -> FileFormat.fromFileName(dv.location()) == FileFormat.PUFFIN);
  }

  private void checkDeleteFileGranularity(DeleteGranularity deleteGranularity)
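
The assertion added here (and repeated in the next two test files) leans on FileFormat.fromFileName, which resolves the format from the file name's suffix, so a DV only reports as PUFFIN when its location ends in ".puffin". A minimal standalone sketch of that check (the paths below are hypothetical; FileFormat.fromFileName and FileFormat.PUFFIN are the same APIs used in the test above):

import org.apache.iceberg.FileFormat;

public class PuffinSuffixCheck {
  public static void main(String[] args) {
    // A ".puffin" suffix resolves to PUFFIN, which is what the updated tests
    // assert for deletion vector (DV) files.
    System.out.println(
        FileFormat.fromFileName("s3://bucket/db/tbl/data/delete-0001.puffin") == FileFormat.PUFFIN);

    // Any other suffix (for example ".parquet") resolves to a different format,
    // which is how a wrongly suffixed DV would surface before this fix.
    System.out.println(
        FileFormat.fromFileName("s3://bucket/db/tbl/data/delete-0001.parquet") == FileFormat.PUFFIN);
  }
}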

File 2 of 5
@@ -27,6 +27,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
@@ -129,6 +130,7 @@ public void testMergeWithDVAndHistoricalPositionDeletes() {
        deleteFiles.stream().filter(ContentFileUtil::isDV).collect(Collectors.toList());
    assertThat(dvs).hasSize(1);
    assertThat(dvs).allMatch(dv -> dv.recordCount() == 3);
+    assertThat(dvs).allMatch(dv -> FileFormat.fromFileName(dv.location()) == FileFormat.PUFFIN);
  }

  private void checkMergeDeleteGranularity(DeleteGranularity deleteGranularity) {

File 3 of 5
@@ -26,6 +26,7 @@
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
@@ -209,6 +210,7 @@ public void testUpdateWithDVAndHistoricalPositionDeletes() {
        deleteFiles.stream().filter(ContentFileUtil::isDV).collect(Collectors.toList());
    assertThat(dvs).hasSize(1);
    assertThat(dvs.get(0).recordCount()).isEqualTo(3);
+    assertThat(dvs).allMatch(dv -> FileFormat.fromFileName(dv.location()) == FileFormat.PUFFIN);
  }

  private void initTable(String partitionedBy, DeleteGranularity deleteGranularity) {

File 4 of 5
@@ -37,14 +37,14 @@

import java.util.Locale;
import java.util.Map;
+import org.apache.iceberg.BaseMetadataTable;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.IsolationLevel;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
@@ -214,6 +214,10 @@ private boolean fanoutWriterEnabled(boolean defaultValue) {
  }

  public FileFormat deleteFileFormat() {
+    if (useDVs()) {
+      return FileFormat.PUFFIN;
+    }
+
    String valueAsString =
        confParser
            .stringConf()
@@ -723,7 +727,6 @@ public DeleteGranularity deleteGranularity() {
  }

  public boolean useDVs() {
-    TableOperations ops = ((HasTableOperations) table).operations();
-    return ops.current().formatVersion() >= 3;
+    return !(table instanceof BaseMetadataTable) && TableUtil.formatVersion(table) >= 3;
  }
}
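
Taken together, the two SparkWriteConf changes make delete-format resolution short-circuit for DV-capable tables before consulting the configured delete file format. A condensed sketch of that flow, with the class name DeleteFormatResolver and the configuredDeleteFormat parameter introduced here purely for illustration (neither is part of the commit):

import org.apache.iceberg.BaseMetadataTable;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableUtil;

final class DeleteFormatResolver {
  private DeleteFormatResolver() {}

  // Mirrors the rewritten useDVs(): metadata tables are filtered out before
  // asking TableUtil for a format version, and DVs require format version 3+.
  static boolean useDVs(Table table) {
    return !(table instanceof BaseMetadataTable) && TableUtil.formatVersion(table) >= 3;
  }

  // Mirrors the rewritten deleteFileFormat(): DVs are always written as Puffin,
  // so the configured delete file format only applies to pre-v3 delete files.
  static FileFormat deleteFileFormat(Table table, FileFormat configuredDeleteFormat) {
    return useDVs(table) ? FileFormat.PUFFIN : configuredDeleteFormat;
  }
}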

File 5 of 5
@@ -51,6 +51,7 @@
import java.util.List;
import java.util.Map;
import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.UpdateProperties;
@@ -540,6 +541,15 @@ public void testDeleteFileWriteConf() {
    }
  }

+  @TestTemplate
+  public void testDVWriteConf() {
+    Table table = validationCatalog.loadTable(tableIdent);
+    table.updateProperties().set(TableProperties.FORMAT_VERSION, "3").commit();
+    SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
+    assertThat(writeConf.useDVs()).isTrue();
+    assertThat(writeConf.deleteFileFormat()).isEqualTo(FileFormat.PUFFIN);
+  }
+
  private void testWriteProperties(List<Map<String, String>> propertiesSuite) {
    withSQLConf(
        propertiesSuite.get(0),
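
The new test bumps the table to format version 3 through the Java API; the same property can be set from Spark SQL with ALTER TABLE. A hedged sketch, assuming a SparkSession named spark as in the test base classes and a hypothetical table identifier ('format-version' is the key behind TableProperties.FORMAT_VERSION):

// Hypothetical usage: upgrade a table to format version 3 so that subsequent
// row-level operations write deletion vectors with a ".puffin" suffix.
spark.sql("ALTER TABLE catalog.db.tbl SET TBLPROPERTIES ('format-version' = '3')");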
