diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
index 1789932d69a7..db95c6703857 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
@@ -135,7 +135,8 @@ public void processElement(
       }
 
       // vast majority of the time, we will simply append data files.
-      // in the rare case we get a batch that contains multiple partition specs, we will group
+      // in the rare case we get a batch that contains multiple partition specs, we
+      // will group
       // data into manifest files and append.
       // note: either way, we must use a single commit operation for atomicity.
       if (containsMultiplePartitionSpecs(fileWriteResults)) {
@@ -163,11 +164,14 @@ private void appendDataFiles(Table table, Iterable<FileWriteResult> fileWriteRes
       update.commit();
     }
 
-    // When a user updates their table partition spec during runtime, we can end up with
-    // a batch of files where some are written with the old spec and some are written with the new
+    // When a user updates their table partition spec during runtime, we can end up
+    // with
+    // a batch of files where some are written with the old spec and some are
+    // written with the new
     // spec.
     // A table commit is limited to a single partition spec.
-    // To handle this, we create a manifest file for each partition spec, and group data files
+    // To handle this, we create a manifest file for each partition spec, and group
+    // data files
     // accordingly.
     // Afterward, we append all manifests using a single commit operation.
     private void appendManifestFiles(Table table, Iterable<FileWriteResult> fileWriteResults)
@@ -211,14 +215,18 @@ private ManifestWriter<DataFile> createManifestWriter(
       return ManifestFiles.write(spec, io.newOutputFile(location));
     }
 
-    // If the process call fails immediately after a successful commit, it gets retried with
+    // If the process call fails immediately after a successful commit, it gets
+    // retried with
     // the same data, possibly leading to data duplication.
-    // To mitigate, we skip the current batch of files if it matches the most recently committed
+    // To mitigate, we skip the current batch of files if it matches the most
+    // recently committed
     // batch.
     //
-    // TODO(ahmedabu98): This does not cover concurrent writes from other pipelines, where the
-    // "last successful snapshot" might reflect commits from other sources. Ideally, we would make
-    // this stateful, but that is update incompatible.
+    // TODO(ahmedabu98): This does not cover concurrent writes from other pipelines,
+    // where the
+    // "last successful snapshot" might reflect commits from other sources. Ideally,
+    // we would make
+    // this stateful, but that is update incompatible.
     // TODO(ahmedabu98): add load test pipelines with intentional periodic crashing
     private boolean shouldSkip(Table table, Iterable<FileWriteResult> fileWriteResults) {
       if (table.currentSnapshot() == null) {
@@ -231,8 +239,11 @@ private boolean shouldSkip(Table table, Iterable<FileWriteResul
       // Check if the current batch is identical to the most recently committed batch.
       // Upstream GBK means we always get the same batch of files on retry,
       // so a single overlapping file means the whole batch is identical.
-      String sampleCommittedDataFilePath =
-          table.currentSnapshot().addedDataFiles(table.io()).iterator().next().path().toString();
+      Iterable<DataFile> addedDataFiles = table.currentSnapshot().addedDataFiles(table.io());
+      if (!addedDataFiles.iterator().hasNext()) {
+        return false;
+      }
+      String sampleCommittedDataFilePath = addedDataFiles.iterator().next().location().toString();
       for (FileWriteResult result : fileWriteResults) {
         if (result.getSerializableDataFile().getPath().equals(sampleCommittedDataFilePath)) {
           return true;
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
index 5c994c3e5651..f54cef16c159 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
@@ -141,7 +141,7 @@ abstract static class Builder {
 
   static SerializableDataFile from(DataFile f, String partitionPath) {
     return SerializableDataFile.builder()
-        .setPath(f.path().toString())
+        .setPath(f.location().toString())
         .setFileFormat(f.format().toString())
         .setRecordCount(f.recordCount())
         .setFileSizeInBytes(f.fileSizeInBytes())
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTablesTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTablesTest.java
new file mode 100644
index 000000000000..c4709256b4da
--- /dev/null
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTablesTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.DeleteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.hamcrest.Matchers;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class AppendFilesToTablesTest implements Serializable {
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public transient TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default");
+
+  @Test
+  public void testAppendAfterDelete() throws Exception {
+    TableIdentifier tableId =
+        TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));
+
+    Map<String, String> catalogProps =
+        ImmutableMap.<String, String>builder()
+            .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+            .put("warehouse", warehouse.location)
+            .build();
+
+    IcebergCatalogConfig catalog =
+        IcebergCatalogConfig.builder()
+            .setCatalogName("name")
+            .setCatalogProperties(catalogProps)
+            .build();
+
+    // 1. Create table and write some data using first pipeline
+    Pipeline p1 = Pipeline.create(PipelineOptionsFactory.create());
+    p1.apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1)))
+        .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
+        .apply("Append To Table", IcebergIO.writeRows(catalog).to(tableId));
+
+    p1.run().waitUntilFinish();
+
+    // 2. Delete the data
+    Table table = warehouse.loadTable(tableId);
+    DeleteFiles delete = table.newDelete();
+    // Delete all data files in the current snapshot
+    table.currentSnapshot().addedDataFiles(table.io()).forEach(delete::deleteFile);
+    delete.commit();
+
+    // 3. Write more data using a fresh second pipeline
+    Pipeline p2 = Pipeline.create(PipelineOptionsFactory.create());
+    p2.apply("More Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT2)))
+        .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
+        .apply("Append More To Table", IcebergIO.writeRows(catalog).to(tableId));
+
+    p2.run().waitUntilFinish();
+
+    // Verify data - after delete and append, only FILE1SNAPSHOT2 should be present
+    table.refresh();
+    List<Record> writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());
+    assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT2.toArray()));
+  }
+}