Spark 3.5: Support Statistics Files in RewriteTablePath (#11929)
dramaticlly authored Feb 8, 2025
1 parent 7a8db16 commit d935460
Showing 3 changed files with 111 additions and 20 deletions.
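For orientation, here is a minimal sketch of how the action touched by this commit is driven, pieced together from the test code below; the table handle and the two location prefixes are placeholder values:

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteTablePath;
import org.apache.iceberg.spark.actions.SparkActions;

// Rewrites the absolute paths embedded in table metadata from one root
// location to another. With this commit, statistics (.stats) files are
// included in the copy plan; partition statistics files still fail fast.
Table table = loadSourceTable(); // placeholder: obtain the table from your catalog
RewriteTablePath.Result result =
    SparkActions.get()
        .rewriteTablePath(table)
        .rewriteLocationPrefix("s3://old-bucket/db/tbl", "s3://new-bucket/db/tbl")
        .execute();

The tests at the bottom of this diff count the files such a run plans to copy, broken down by type.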
19 changes: 17 additions & 2 deletions core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
@@ -127,8 +128,8 @@ public static TableMetadata replacePaths(
metadata.snapshotLog(),
metadataLogEntries,
metadata.refs(),
- // TODO: update statistic file paths
- metadata.statisticsFiles(),
+ updatePathInStatisticsFiles(metadata.statisticsFiles(), sourcePrefix, targetPrefix),
+ // TODO: update partition statistics file paths
metadata.partitionStatisticsFiles(),
metadata.changes(),
metadata.rowLineageEnabled(),
@@ -160,6 +161,20 @@ private static void updatePathInProperty(
}
}

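// Added helper: rewrites each statistics file's location by swapping the
// source prefix for the target prefix, preserving the snapshot id, file
// sizes, and blob metadata.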
private static List<StatisticsFile> updatePathInStatisticsFiles(
List<StatisticsFile> statisticsFiles, String sourcePrefix, String targetPrefix) {
return statisticsFiles.stream()
.map(
existing ->
new GenericStatisticsFile(
existing.snapshotId(),
newPath(existing.path(), sourcePrefix, targetPrefix),
existing.fileSizeInBytes(),
existing.fileFooterSizeInBytes(),
existing.blobMetadata()))
.collect(Collectors.toList());
}
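// Hypothetical illustration of the prefix swap performed by newPath(...) above:
//   newPath("s3://old/db/tbl/metadata/stats1.stats", "s3://old/db/tbl", "s3://new/db/tbl")
//   returns "s3://new/db/tbl/metadata/stats1.stats"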

private static List<TableMetadata.MetadataLogEntry> updatePathInMetadataLogs(
TableMetadata metadata, String sourcePrefix, String targetPrefix) {
List<TableMetadata.MetadataLogEntry> metadataLogEntries =
47 changes: 40 additions & 7 deletions spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java

@@ -37,6 +37,7 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StaticTableOperations;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
@@ -272,8 +273,9 @@ private String rebuildMetadata() {
((HasTableOperations) newStaticTable(endVersionName, table.io())).operations().current();

Preconditions.checkArgument(
- endMetadata.statisticsFiles() == null || endMetadata.statisticsFiles().isEmpty(),
- "Statistic files are not supported yet.");
+ endMetadata.partitionStatisticsFiles() == null
+ || endMetadata.partitionStatisticsFiles().isEmpty(),
+ "Partition statistics files are not supported yet.");

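// Plain statistics files are rewritten as of this commit, so only partition
// statistics files remain unsupported and fail fast above.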
// rebuild version files
RewriteResult<Snapshot> rewriteVersionResult = rewriteVersionFiles(endMetadata);
@@ -341,7 +343,7 @@ private Set<Snapshot> deltaSnapshots(TableMetadata startMetadata, Set<Snapshot>
private RewriteResult<Snapshot> rewriteVersionFiles(TableMetadata endMetadata) {
RewriteResult<Snapshot> result = new RewriteResult<>();
result.toRewrite().addAll(endMetadata.snapshots());
- result.copyPlan().add(rewriteVersionFile(endMetadata, endVersionName));
+ result.copyPlan().addAll(rewriteVersionFile(endMetadata, endVersionName));

List<MetadataLogEntry> versions = endMetadata.previousFiles();
for (int i = versions.size() - 1; i >= 0; i--) {
@@ -357,19 +359,50 @@ private RewriteResult<Snapshot> rewriteVersionFiles(TableMetadata endMetadata) {
new StaticTableOperations(versionFilePath, table.io()).current();

result.toRewrite().addAll(tableMetadata.snapshots());
- result.copyPlan().add(rewriteVersionFile(tableMetadata, versionFilePath));
+ result.copyPlan().addAll(rewriteVersionFile(tableMetadata, versionFilePath));
}

return result;
}

- private Pair<String, String> rewriteVersionFile(TableMetadata metadata, String versionFilePath) {
+ private Set<Pair<String, String>> rewriteVersionFile(
+ TableMetadata metadata, String versionFilePath) {
+ Set<Pair<String, String>> result = Sets.newHashSet();
String stagingPath = RewriteTablePathUtil.stagingPath(versionFilePath, stagingDir);
TableMetadata newTableMetadata =
RewriteTablePathUtil.replacePaths(metadata, sourcePrefix, targetPrefix);
TableMetadataParser.overwrite(newTableMetadata, table.io().newOutputFile(stagingPath));
- return Pair.of(
- stagingPath, RewriteTablePathUtil.newPath(versionFilePath, sourcePrefix, targetPrefix));
+ result.add(
+ Pair.of(
+ stagingPath,
+ RewriteTablePathUtil.newPath(versionFilePath, sourcePrefix, targetPrefix)));

// include statistics files in copy plan
result.addAll(
statsFileCopyPlan(metadata.statisticsFiles(), newTableMetadata.statisticsFiles()));
return result;
}

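// Builds copy-plan entries pairing each statistics file's staging location
// with its final target path. Relies on replacePaths() returning the rewritten
// statistics files in the same order and count as the originals, which the
// preconditions below assert.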
private Set<Pair<String, String>> statsFileCopyPlan(
List<StatisticsFile> beforeStats, List<StatisticsFile> afterStats) {
Set<Pair<String, String>> result = Sets.newHashSet();
if (beforeStats.isEmpty()) {
return result;
}

Preconditions.checkArgument(
beforeStats.size() == afterStats.size(),
"Statistics file count should be the same before and after path rewrite");
for (int i = 0; i < beforeStats.size(); i++) {
StatisticsFile before = beforeStats.get(i);
StatisticsFile after = afterStats.get(i);
Preconditions.checkArgument(
before.fileSizeInBytes() == after.fileSizeInBytes(),
"Statistics file size should be the same before and after path rewrite");
result.add(
Pair.of(RewriteTablePathUtil.stagingPath(before.path(), stagingDir), after.path()));
}
return result;
}

/**
65 changes: 54 additions & 11 deletions spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java

@@ -37,8 +37,8 @@
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
- import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.HasTableOperations;
+ import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
@@ -59,7 +59,6 @@
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
- import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -868,25 +867,27 @@ public void testInvalidArgs() {
}

@Test
- public void testStatisticFile() throws IOException {
+ public void testPartitionStatisticFile() throws IOException {
String sourceTableLocation = newTableLocation();
Map<String, String> properties = Maps.newHashMap();
properties.put("format-version", "2");
- String tableName = "v2tblwithstats";
+ String tableName = "v2tblwithPartStats";
Table sourceTable =
createMetastoreTable(sourceTableLocation, properties, "default", tableName, 0);

TableMetadata metadata = currentMetadata(sourceTable);
- TableMetadata withStatistics =
+ TableMetadata withPartStatistics =
TableMetadata.buildFrom(metadata)
- .setStatistics(
- 43,
- new GenericStatisticsFile(
- 43, "/some/path/to/stats/file", 128, 27, ImmutableList.of()))
+ .setPartitionStatistics(
+ ImmutableGenericPartitionStatisticsFile.builder()
+ .snapshotId(11L)
+ .path("/some/partition/stats/file.parquet")
+ .fileSizeInBytes(42L)
+ .build())
.build();

OutputFile file = sourceTable.io().newOutputFile(metadata.metadataFileLocation());
- TableMetadataParser.overwrite(withStatistics, file);
+ TableMetadataParser.overwrite(withPartStatistics, file);

assertThatThrownBy(
() ->
@@ -895,7 +896,36 @@ public void testStatisticFile() throws IOException {
.rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
.execute())
.isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("Statistic files are not supported yet");
+ .hasMessageContaining("Partition statistics files are not supported yet");
}

@Test
public void testTableWithManyStatisticFiles() throws IOException {
String sourceTableLocation = newTableLocation();
Map<String, String> properties = Maps.newHashMap();
properties.put("format-version", "2");
String tableName = "v2tblwithmanystats";
Table sourceTable =
createMetastoreTable(sourceTableLocation, properties, "default", tableName, 0);

int iterations = 10;
for (int i = 0; i < iterations; i++) {
sql("insert into hive.default.%s values (%s, 'AAAAAAAAAA', 'AAAA')", tableName, i);
sourceTable.refresh();
actions().computeTableStats(sourceTable).execute();
}

sourceTable.refresh();
assertThat(sourceTable.statisticsFiles().size()).isEqualTo(iterations);

RewriteTablePath.Result result =
actions()
.rewriteTablePath(sourceTable)
.rewriteLocationPrefix(sourceTableLocation, targetTableLocation())
.execute();

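// Expected counts, inferred from the loop above: each iteration commits one
// insert and one stats computation (2 metadata versions per round, plus the
// initial one) and adds 1 manifest list, 1 manifest, and 1 .stats file; the
// total of 6 * iterations + 1 presumably also covers 1 data file per insert.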
checkFileNum(
iterations * 2 + 1, iterations, iterations, iterations, iterations * 6 + 1, result);
}

@Test
@@ -1063,6 +1093,16 @@ protected void checkFileNum(
int manifestFileCount,
int totalCount,
RewriteTablePath.Result result) {
checkFileNum(versionFileCount, manifestListCount, manifestFileCount, 0, totalCount, result);
}

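// New overload that additionally asserts the number of rewritten statistics
// files; the original signature above delegates with an expected count of 0.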
protected void checkFileNum(
int versionFileCount,
int manifestListCount,
int manifestFileCount,
int statisticsFileCount,
int totalCount,
RewriteTablePath.Result result) {
List<String> filesToMove =
spark
.read()
@@ -1083,6 +1123,9 @@ protected void checkFileNum(
assertThat(filesToMove.stream().filter(isManifest).count())
.as("Wrong rebuilt Manifest file count")
.isEqualTo(manifestFileCount);
assertThat(filesToMove.stream().filter(f -> f.endsWith(".stats")).count())
.withFailMessage("Wrong rebuilt Statistic file count")
.isEqualTo(statisticsFileCount);
assertThat(filesToMove.size()).as("Wrong total file count").isEqualTo(totalCount);
}
