Skip to content

Commit

Permalink
Core: Fix RewriteTablePath Incremental Replication (#12172)
Browse files Browse the repository at this point in the history
  • Loading branch information
barronfuentes authored Feb 8, 2025
1 parent a89f1f9 commit 7a8db16
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 9 deletions.
15 changes: 6 additions & 9 deletions core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
Expand Down Expand Up @@ -225,11 +224,7 @@ public static RewriteResult<ManifestFile> rewriteManifestList(
OutputFile outputFile = io.newOutputFile(outputPath);

List<ManifestFile> manifestFiles = manifestFilesInSnapshot(io, snapshot);
List<ManifestFile> manifestFilesToRewrite =
manifestFiles.stream()
.filter(mf -> manifestsToRewrite.contains(mf.path()))
.collect(Collectors.toList());
manifestFilesToRewrite.forEach(
manifestFiles.forEach(
mf ->
Preconditions.checkArgument(
mf.path().startsWith(sourcePrefix),
Expand All @@ -245,13 +240,15 @@ public static RewriteResult<ManifestFile> rewriteManifestList(
snapshot.parentId(),
snapshot.sequenceNumber())) {

for (ManifestFile file : manifestFilesToRewrite) {
for (ManifestFile file : manifestFiles) {
ManifestFile newFile = file.copy();
((StructLike) newFile).set(0, newPath(newFile.path(), sourcePrefix, targetPrefix));
writer.add(newFile);

result.toRewrite().add(file);
result.copyPlan().add(Pair.of(stagingPath(file.path(), stagingDir), newFile.path()));
if (manifestsToRewrite.contains(file.path())) {
result.toRewrite().add(file);
result.copyPlan().add(Pair.of(stagingPath(file.path(), stagingDir), newFile.path()));
}
}
return result;
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,52 @@ public void testStartVersion() throws Exception {
.isEqualTo(0);
}

@Test
public void testIncrementalRewrite() throws Exception {
  String sourceLocation = newTableLocation();
  Table sourceTable =
      TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), sourceLocation);

  // First increment: append a single row to the source table.
  List<ThreeColumnRecord> firstBatch =
      Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
  Dataset<Row> firstDf = spark.createDataFrame(firstBatch, ThreeColumnRecord.class).coalesce(1);
  firstDf.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(sourceLocation);
  assertThat(spark.read().format("iceberg").load(sourceLocation).count()).isEqualTo(1);

  // Replicate the first increment to the target table with a full rewrite.
  RewriteTablePath.Result initialResult =
      actions()
          .rewriteTablePath(sourceTable)
          .rewriteLocationPrefix(sourceTable.location(), targetTableLocation())
          .execute();
  copyTableFiles(initialResult);
  assertThat(spark.read().format("iceberg").load(targetTableLocation()).count()).isEqualTo(1);

  // Second increment: append another row to the source table.
  List<ThreeColumnRecord> secondBatch =
      Lists.newArrayList(new ThreeColumnRecord(2, "BBBBBBBBB", "BBB"));
  Dataset<Row> secondDf = spark.createDataFrame(secondBatch, ThreeColumnRecord.class).coalesce(1);
  secondDf.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(sourceLocation);
  assertThat(spark.read().format("iceberg").load(sourceLocation).count()).isEqualTo(2);

  // Replicate only the delta, starting from the target table's current metadata version.
  sourceTable.refresh();
  Table targetTable = TABLES.load(targetTableLocation());
  String startVersion = fileName(currentMetadata(targetTable).metadataFileLocation());
  RewriteTablePath.Result incrementalResult =
      actions()
          .rewriteTablePath(sourceTable)
          .rewriteLocationPrefix(sourceTable.location(), targetTableLocation())
          .startVersion(startVersion)
          .execute();
  copyTableFiles(incrementalResult);

  // After copying the incremental files, both tables should hold identical rows.
  assertEquals(
      "Rows should match after copy",
      rowsSorted(sourceLocation, "c1"),
      rowsSorted(targetTableLocation(), "c1"));
}

@Test
public void testTableWith3Snapshots(@TempDir Path location1, @TempDir Path location2)
throws Exception {
Expand Down Expand Up @@ -1144,6 +1190,10 @@ private List<Object[]> rows(String location) {
return rowsToJava(spark.read().format("iceberg").load(location).collectAsList());
}

// Reads the Iceberg table at {@code location}, orders rows by {@code sortCol},
// and converts them to plain Java arrays for comparison in assertions.
private List<Object[]> rowsSorted(String location, String sortCol) {
  Dataset<Row> ordered = spark.read().format("iceberg").load(location).sort(sortCol);
  return rowsToJava(ordered.collectAsList());
}

private PositionDelete<GenericRecord> positionDelete(
Schema tableSchema, CharSequence path, Long position, Object... values) {
PositionDelete<GenericRecord> posDelete = PositionDelete.create();
Expand Down

0 comments on commit 7a8db16

Please sign in to comment.