Spark: Make delete file ratio configurable (#12148)
MichaelDeSteven authored Feb 3, 2025
1 parent f6faa58 commit 507e2a9
Showing 5 changed files with 180 additions and 5 deletions.
@@ -47,10 +47,26 @@ public abstract class SizeBasedDataRewriter extends SizeBasedFileRewriter<FileSc
public static final String DELETE_FILE_THRESHOLD = "delete-file-threshold";

public static final int DELETE_FILE_THRESHOLD_DEFAULT = Integer.MAX_VALUE;
private static final double DELETE_RATIO_THRESHOLD = 0.3;

/**
* The minimum deletion ratio that must be associated with a data file for it to be considered
* for rewriting. If the deletion ratio of a data file is greater than or equal to this value, it
* will be rewritten regardless of its file size as determined by {@link #MIN_FILE_SIZE_BYTES} and
* {@link #MAX_FILE_SIZE_BYTES}. If a file group contains a file that satisfies this condition,
* the file group will be rewritten regardless of the number of files in the group as determined
* by {@link #MIN_INPUT_FILES}.
*
* <p>Defaults to 0.3, meaning that a file whose deletion ratio reaches or exceeds 30% may
* trigger a rewrite.
*/
public static final String DELETE_RATIO_THRESHOLD = "delete-ratio-threshold";

public static final double DELETE_RATIO_THRESHOLD_DEFAULT = 0.3;

private int deleteFileThreshold;

private double deleteRatioThreshold;

protected SizeBasedDataRewriter(Table table) {
super(table);
}
@@ -60,13 +76,26 @@ public Set<String> validOptions() {
return ImmutableSet.<String>builder()
.addAll(super.validOptions())
.add(DELETE_FILE_THRESHOLD)
.add(DELETE_RATIO_THRESHOLD)
.build();
}

@Override
public void init(Map<String, String> options) {
super.init(options);
this.deleteFileThreshold = deleteFileThreshold(options);
this.deleteRatioThreshold = deleteRatioThreshold(options);
}

private double deleteRatioThreshold(Map<String, String> options) {
double value =
PropertyUtil.propertyAsDouble(
options, DELETE_RATIO_THRESHOLD, DELETE_RATIO_THRESHOLD_DEFAULT);
Preconditions.checkArgument(
value > 0, "'%s' is set to %s but must be > 0", DELETE_RATIO_THRESHOLD, value);
Preconditions.checkArgument(
value <= 1, "'%s' is set to %s but must be <= 1", DELETE_RATIO_THRESHOLD, value);
return value;
}

@Override
@@ -116,7 +145,7 @@ private boolean tooHighDeleteRatio(FileScanTask task) {

double deletedRecords = (double) Math.min(knownDeletedRecordCount, task.file().recordCount());
double deleteRatio = deletedRecords / task.file().recordCount();
return deleteRatio >= DELETE_RATIO_THRESHOLD;
return deleteRatio >= deleteRatioThreshold;
}

@Override
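To make the check above concrete, here is a minimal, self-contained sketch of the same computation. The class name and record counts are hypothetical stand-ins; the real method reads these values from a `FileScanTask`:

```java
public class DeleteRatioSketch {
  public static void main(String[] args) {
    long recordCount = 1000L;            // records in the data file
    long knownDeletedRecordCount = 350L; // deletes known to apply to the file
    double deleteRatioThreshold = 0.3;   // the configurable default

    // Cap the delete count at the record count, as tooHighDeleteRatio does,
    // since the known delete count can exceed the file's record count.
    double deletedRecords = Math.min(knownDeletedRecordCount, recordCount);
    double deleteRatio = deletedRecords / recordCount;

    // 350 / 1000 = 0.35 >= 0.3, so this file becomes a rewrite candidate
    // regardless of its size.
    System.out.println(deleteRatio >= deleteRatioThreshold); // prints "true"
  }
}
```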
1 change: 1 addition & 0 deletions docs/docs/spark-procedures.md
@@ -403,6 +403,7 @@ Iceberg can compact data files in parallel using Spark with the `rewriteDataFile
| `rewrite-all` | false | Force rewriting of all provided files overriding other options |
| `max-file-group-size-bytes` | 107374182400 (100GB) | Largest amount of data that should be rewritten in a single file group. The entire rewrite operation is broken down into pieces based on partitioning and within partitions based on size into file-groups. This helps with breaking down the rewriting of very large partitions which may not be rewritable otherwise due to the resource constraints of the cluster. |
| `delete-file-threshold` | 2147483647 | Minimum number of deletes that needs to be associated with a data file for it to be considered for rewriting |
| `delete-ratio-threshold` | 0.3 | Minimum deletion ratio that needs to be associated with a data file for it to be considered for rewriting |
| `output-spec-id` | current partition spec id | Identifier of the output partition spec. Data will be reorganized during the rewrite to align with the output partitioning. |
| `remove-dangling-deletes` | false | Remove dangling position and equality deletes after rewriting. A delete file is considered dangling if it does not apply to any live data files. Enabling this will generate an additional commit for the removal. |

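As a usage note, the new option is passed to the rewrite action like any other option. Below is a minimal sketch using the Spark actions API; it assumes an existing `SparkSession` and an Iceberg `Table` already loaded from a catalog, and the 0.4 threshold is an illustrative value, not a recommendation:

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;

class RewriteWithDeleteRatio {
  static void run(SparkSession spark, Table table) {
    RewriteDataFiles.Result result =
        SparkActions.get(spark)
            .rewriteDataFiles(table)
            // files whose delete ratio reaches 40% become rewrite candidates
            // regardless of their size
            .option("delete-ratio-threshold", "0.4")
            .execute();
    System.out.println("Rewritten data files: " + result.rewrittenDataFilesCount());
  }
}
```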
@@ -246,7 +246,8 @@ public void testBinPackDataValidOptions() {
SparkBinPackDataRewriter.MIN_INPUT_FILES,
SparkBinPackDataRewriter.REWRITE_ALL,
SparkBinPackDataRewriter.MAX_FILE_GROUP_SIZE_BYTES,
SparkBinPackDataRewriter.DELETE_FILE_THRESHOLD),
SparkBinPackDataRewriter.DELETE_FILE_THRESHOLD,
SparkBinPackDataRewriter.DELETE_RATIO_THRESHOLD),
rewriter.validOptions());
}

@@ -265,6 +266,7 @@ public void testSortDataValidOptions() {
SparkSortDataRewriter.REWRITE_ALL,
SparkSortDataRewriter.MAX_FILE_GROUP_SIZE_BYTES,
SparkSortDataRewriter.DELETE_FILE_THRESHOLD,
SparkSortDataRewriter.DELETE_RATIO_THRESHOLD,
SparkSortDataRewriter.COMPRESSION_FACTOR),
rewriter.validOptions());
}
@@ -285,6 +287,7 @@ public void testZOrderDataValidOptions() {
SparkZOrderDataRewriter.REWRITE_ALL,
SparkZOrderDataRewriter.MAX_FILE_GROUP_SIZE_BYTES,
SparkZOrderDataRewriter.DELETE_FILE_THRESHOLD,
SparkZOrderDataRewriter.DELETE_RATIO_THRESHOLD,
SparkZOrderDataRewriter.COMPRESSION_FACTOR,
SparkZOrderDataRewriter.MAX_OUTPUT_SIZE,
SparkZOrderDataRewriter.VAR_LENGTH_CONTRIBUTION),
@@ -301,7 +304,20 @@ public void testInvalidValuesForBinPackDataOptions() {
Map<String, String> invalidDeleteThresholdOptions =
ImmutableMap.of(SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, "-1");
assertThatThrownBy(() -> rewriter.init(invalidDeleteThresholdOptions))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("'delete-file-threshold' is set to -1 but must be >= 0");

Map<String, String> negativeDeleteRatioThresholdOptions =
ImmutableMap.of(SizeBasedDataRewriter.DELETE_RATIO_THRESHOLD, "-1");
assertThatThrownBy(() -> rewriter.init(negativeDeleteRatioThresholdOptions))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("'delete-ratio-threshold' is set to -1.0 but must be > 0");

Map<String, String> invalidDeleteRatioThresholdOptions =
ImmutableMap.of(SizeBasedDataRewriter.DELETE_RATIO_THRESHOLD, "127");
assertThatThrownBy(() -> rewriter.init(invalidDeleteRatioThresholdOptions))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("'delete-ratio-threshold' is set to 127.0 but must be <= 1");
}

@Test
@@ -314,12 +330,27 @@ public void testInvalidValuesForSortDataOptions() {
Map<String, String> invalidDeleteThresholdOptions =
ImmutableMap.of(SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, "-1");
assertThatThrownBy(() -> rewriter.init(invalidDeleteThresholdOptions))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("'delete-file-threshold' is set to -1 but must be >= 0");

Map<String, String> invalidCompressionFactorOptions =
ImmutableMap.of(SparkShufflingDataRewriter.COMPRESSION_FACTOR, "0");
assertThatThrownBy(() -> rewriter.init(invalidCompressionFactorOptions))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("'compression-factor' is set to 0.0 but must be > 0");

Map<String, String> negativeDeleteRatioThresholdOptions =
ImmutableMap.of(SparkShufflingDataRewriter.DELETE_RATIO_THRESHOLD, "-1");
assertThatThrownBy(() -> rewriter.init(negativeDeleteRatioThresholdOptions))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("'delete-ratio-threshold' is set to -1.0 but must be > 0");

Map<String, String> invalidDeleteRatioThresholdOptions =
ImmutableMap.of(SparkShufflingDataRewriter.DELETE_RATIO_THRESHOLD, "127");

assertThatThrownBy(() -> rewriter.init(invalidDeleteRatioThresholdOptions))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("'delete-ratio-threshold' is set to 127.0 but must be <= 1");
}

@Test
@@ -333,24 +364,40 @@ public void testInvalidValuesForZOrderDataOptions() {
Map<String, String> invalidDeleteThresholdOptions =
ImmutableMap.of(SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, "-1");
assertThatThrownBy(() -> rewriter.init(invalidDeleteThresholdOptions))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("'delete-file-threshold' is set to -1 but must be >= 0");

Map<String, String> invalidCompressionFactorOptions =
ImmutableMap.of(SparkShufflingDataRewriter.COMPRESSION_FACTOR, "0");
assertThatThrownBy(() -> rewriter.init(invalidCompressionFactorOptions))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("'compression-factor' is set to 0.0 but must be > 0");

Map<String, String> invalidMaxOutputOptions =
ImmutableMap.of(SparkZOrderDataRewriter.MAX_OUTPUT_SIZE, "0");
assertThatThrownBy(() -> rewriter.init(invalidMaxOutputOptions))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Cannot have the interleaved ZOrder value use less than 1 byte")
.hasMessageContaining("'max-output-size' was set to 0");

Map<String, String> invalidVarLengthContributionOptions =
ImmutableMap.of(SparkZOrderDataRewriter.VAR_LENGTH_CONTRIBUTION, "0");
assertThatThrownBy(() -> rewriter.init(invalidVarLengthContributionOptions))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Cannot use less than 1 byte for variable length types with ZOrder")
.hasMessageContaining("'var-length-contribution' was set to 0");

Map<String, String> negativeDeleteRatioThresholdOptions =
ImmutableMap.of(SparkZOrderDataRewriter.DELETE_RATIO_THRESHOLD, "-1");
assertThatThrownBy(() -> rewriter.init(negativeDeleteRatioThresholdOptions))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("'delete-ratio-threshold' is set to -1.0 but must be > 0");

Map<String, String> invalidDeleteRatioThresholdOptions =
ImmutableMap.of(SparkZOrderDataRewriter.DELETE_RATIO_THRESHOLD, "127");
assertThatThrownBy(() -> rewriter.init(invalidDeleteRatioThresholdOptions))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("'delete-ratio-threshold' is set to 127.0 but must be <= 1");
}

private void validateSizeBasedRewriterOptions(SizeBasedFileRewriter<?, ?> rewriter) {
@@ -246,7 +246,8 @@ public void testBinPackDataValidOptions() {
SparkBinPackDataRewriter.MIN_INPUT_FILES,
SparkBinPackDataRewriter.REWRITE_ALL,
SparkBinPackDataRewriter.MAX_FILE_GROUP_SIZE_BYTES,
SparkBinPackDataRewriter.DELETE_FILE_THRESHOLD),
SparkBinPackDataRewriter.DELETE_FILE_THRESHOLD,
SparkBinPackDataRewriter.DELETE_RATIO_THRESHOLD),
rewriter.validOptions());
}

@@ -266,6 +267,7 @@ public void testSortDataValidOptions() {
SparkSortDataRewriter.REWRITE_ALL,
SparkSortDataRewriter.MAX_FILE_GROUP_SIZE_BYTES,
SparkSortDataRewriter.DELETE_FILE_THRESHOLD,
SparkSortDataRewriter.DELETE_RATIO_THRESHOLD,
SparkSortDataRewriter.COMPRESSION_FACTOR),
rewriter.validOptions());
}
@@ -287,6 +289,7 @@ public void testZOrderDataValidOptions() {
SparkZOrderDataRewriter.REWRITE_ALL,
SparkZOrderDataRewriter.MAX_FILE_GROUP_SIZE_BYTES,
SparkZOrderDataRewriter.DELETE_FILE_THRESHOLD,
SparkZOrderDataRewriter.DELETE_RATIO_THRESHOLD,
SparkZOrderDataRewriter.COMPRESSION_FACTOR,
SparkZOrderDataRewriter.MAX_OUTPUT_SIZE,
SparkZOrderDataRewriter.VAR_LENGTH_CONTRIBUTION),
@@ -303,7 +306,21 @@ public void testInvalidValuesForBinPackDataOptions() {
Map<String, String> invalidDeleteThresholdOptions =
ImmutableMap.of(SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, "-1");
assertThatThrownBy(() -> rewriter.init(invalidDeleteThresholdOptions))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("'delete-file-threshold' is set to -1 but must be >= 0");

Map<String, String> negativeDeleteRatioThresholdOptions =
ImmutableMap.of(SizeBasedDataRewriter.DELETE_RATIO_THRESHOLD, "-1");
assertThatThrownBy(() -> rewriter.init(negativeDeleteRatioThresholdOptions))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("'delete-ratio-threshold' is set to -1.0 but must be > 0");

Map<String, String> invalidDeleteRatioThresholdOptions =
ImmutableMap.of(SizeBasedDataRewriter.DELETE_RATIO_THRESHOLD, "127");

assertThatThrownBy(() -> rewriter.init(invalidDeleteRatioThresholdOptions))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("'delete-ratio-threshold' is set to 127.0 but must be <= 1");
}

@Test
@@ -316,12 +333,27 @@ public void testInvalidValuesForSortDataOptions() {
Map<String, String> invalidDeleteThresholdOptions =
ImmutableMap.of(SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, "-1");
assertThatThrownBy(() -> rewriter.init(invalidDeleteThresholdOptions))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("'delete-file-threshold' is set to -1 but must be >= 0");

Map<String, String> invalidCompressionFactorOptions =
ImmutableMap.of(SparkShufflingDataRewriter.COMPRESSION_FACTOR, "0");
assertThatThrownBy(() -> rewriter.init(invalidCompressionFactorOptions))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("'compression-factor' is set to 0.0 but must be > 0");

Map<String, String> negativeDeleteRatioThresholdOptions =
ImmutableMap.of(SparkShufflingDataRewriter.DELETE_RATIO_THRESHOLD, "-1");
assertThatThrownBy(() -> rewriter.init(negativeDeleteRatioThresholdOptions))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("'delete-ratio-threshold' is set to -1.0 but must be > 0");

Map<String, String> invalidDeleteRatioThresholdOptions =
ImmutableMap.of(SparkShufflingDataRewriter.DELETE_RATIO_THRESHOLD, "127");

assertThatThrownBy(() -> rewriter.init(invalidDeleteRatioThresholdOptions))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("'delete-ratio-threshold' is set to 127.0 but must be <= 1");
}

@Test
@@ -335,24 +367,41 @@ public void testInvalidValuesForZOrderDataOptions() {
Map<String, String> invalidDeleteThresholdOptions =
ImmutableMap.of(SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, "-1");
assertThatThrownBy(() -> rewriter.init(invalidDeleteThresholdOptions))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("'delete-file-threshold' is set to -1 but must be >= 0");

Map<String, String> invalidCompressionFactorOptions =
ImmutableMap.of(SparkShufflingDataRewriter.COMPRESSION_FACTOR, "0");
assertThatThrownBy(() -> rewriter.init(invalidCompressionFactorOptions))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("'compression-factor' is set to 0.0 but must be > 0");

Map<String, String> invalidMaxOutputOptions =
ImmutableMap.of(SparkZOrderDataRewriter.MAX_OUTPUT_SIZE, "0");
assertThatThrownBy(() -> rewriter.init(invalidMaxOutputOptions))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Cannot have the interleaved ZOrder value use less than 1 byte")
.hasMessageContaining("'max-output-size' was set to 0");

Map<String, String> invalidVarLengthContributionOptions =
ImmutableMap.of(SparkZOrderDataRewriter.VAR_LENGTH_CONTRIBUTION, "0");
assertThatThrownBy(() -> rewriter.init(invalidVarLengthContributionOptions))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Cannot use less than 1 byte for variable length types with ZOrder")
.hasMessageContaining("'var-length-contribution' was set to 0");

Map<String, String> negativeDeleteRatioThresholdOptions =
ImmutableMap.of(SparkZOrderDataRewriter.DELETE_RATIO_THRESHOLD, "-1");
assertThatThrownBy(() -> rewriter.init(negativeDeleteRatioThresholdOptions))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("'delete-ratio-threshold' is set to -1.0 but must be > 0");

Map<String, String> invalidDeleteRatioThresholdOptions =
ImmutableMap.of(SparkZOrderDataRewriter.DELETE_RATIO_THRESHOLD, "127");

assertThatThrownBy(() -> rewriter.init(invalidDeleteRatioThresholdOptions))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("'delete-ratio-threshold' is set to 127.0 but must be <= 1");
}

private void validateSizeBasedRewriterOptions(SizeBasedFileRewriter<?, ?> rewriter) {
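The init-time validation these tests exercise can also be seen in isolation. A minimal sketch, assuming `rewriter` is any concrete `SizeBasedDataRewriter` (for example, one of the Spark bin-pack rewriters used above):

```java
import java.util.Map;
import org.apache.iceberg.actions.SizeBasedDataRewriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

class ValidationSketch {
  static void demo(SizeBasedDataRewriter rewriter) {
    // Values outside (0, 1] are rejected when the rewriter is initialized.
    Map<String, String> options =
        ImmutableMap.of(SizeBasedDataRewriter.DELETE_RATIO_THRESHOLD, "1.5");
    try {
      rewriter.init(options);
    } catch (IllegalArgumentException e) {
      // prints: 'delete-ratio-threshold' is set to 1.5 but must be <= 1
      System.out.println(e.getMessage());
    }
  }
}
```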
