From 4854f1a04df8501ab7144ebe5711a06bd0f8fe99 Mon Sep 17 00:00:00 2001
From: XQ Hu
Date: Sat, 3 Jan 2026 15:28:21 -0500
Subject: [PATCH 1/2] fix: Use `DataFile.location()` for paths, prevent `NoSuchElementException` in `AppendFilesToTables.shouldSkip`, and add a test for append after delete.

---
 .../sdk/io/iceberg/AppendFilesToTables.java   |  33 ++++--
 .../sdk/io/iceberg/SerializableDataFile.java  |   2 +-
 .../io/iceberg/AppendFilesToTablesTest.java   | 103 ++++++++++++++++++
 3 files changed, 126 insertions(+), 12 deletions(-)
 create mode 100644 sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTablesTest.java

diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
index 1789932d69a7..db95c6703857 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
@@ -135,7 +135,8 @@ public void processElement(
       }

       // vast majority of the time, we will simply append data files.
-      // in the rare case we get a batch that contains multiple partition specs, we will group
+      // in the rare case we get a batch that contains multiple partition specs, we
+      // will group
       // data into manifest files and append.
       // note: either way, we must use a single commit operation for atomicity.
       if (containsMultiplePartitionSpecs(fileWriteResults)) {
@@ -163,11 +164,14 @@ private void appendDataFiles(Table table, Iterable<FileWriteResult> fileWriteRes
       update.commit();
     }

-    // When a user updates their table partition spec during runtime, we can end up with
-    // a batch of files where some are written with the old spec and some are written with the new
+    // When a user updates their table partition spec during runtime, we can end up
+    // with
+    // a batch of files where some are written with the old spec and some are
+    // written with the new
     // spec.
     // A table commit is limited to a single partition spec.
-    // To handle this, we create a manifest file for each partition spec, and group data files
+    // To handle this, we create a manifest file for each partition spec, and group
+    // data files
     // accordingly.
     // Afterward, we append all manifests using a single commit operation.
     private void appendManifestFiles(Table table, Iterable<FileWriteResult> fileWriteResults)
@@ -211,14 +215,18 @@ private ManifestWriter<DataFile> createManifestWriter(
       return ManifestFiles.write(spec, io.newOutputFile(location));
     }

-    // If the process call fails immediately after a successful commit, it gets retried with
+    // If the process call fails immediately after a successful commit, it gets
+    // retried with
     // the same data, possibly leading to data duplication.
-    // To mitigate, we skip the current batch of files if it matches the most recently committed
+    // To mitigate, we skip the current batch of files if it matches the most
+    // recently committed
     // batch.
     //
-    // TODO(ahmedabu98): This does not cover concurrent writes from other pipelines, where the
-    // "last successful snapshot" might reflect commits from other sources. Ideally, we would make
-    // this stateful, but that is update incompatible.
+    // TODO(ahmedabu98): This does not cover concurrent writes from other pipelines,
+    // where the
+    // "last successful snapshot" might reflect commits from other sources. Ideally,
+    // we would make
+    // this stateful, but that is update incompatible.
     // TODO(ahmedabu98): add load test pipelines with intentional periodic crashing
     private boolean shouldSkip(Table table, Iterable<FileWriteResult> fileWriteResults) {
       if (table.currentSnapshot() == null) {
@@ -231,8 +239,11 @@ private boolean shouldSkip(Table table, Iterable<FileWriteResult> fileWriteResul
       // Check if the current batch is identical to the most recently committed batch.
       // Upstream GBK means we always get the same batch of files on retry,
       // so a single overlapping file means the whole batch is identical.
-      String sampleCommittedDataFilePath =
-          table.currentSnapshot().addedDataFiles(table.io()).iterator().next().path().toString();
+      Iterable<DataFile> addedDataFiles = table.currentSnapshot().addedDataFiles(table.io());
+      if (!addedDataFiles.iterator().hasNext()) {
+        return false;
+      }
+      String sampleCommittedDataFilePath = addedDataFiles.iterator().next().location().toString();
       for (FileWriteResult result : fileWriteResults) {
         if (result.getSerializableDataFile().getPath().equals(sampleCommittedDataFilePath)) {
           return true;
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
index 5c994c3e5651..f54cef16c159 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
@@ -141,7 +141,7 @@ abstract static class Builder {

   static SerializableDataFile from(DataFile f, String partitionPath) {
     return SerializableDataFile.builder()
-        .setPath(f.path().toString())
+        .setPath(f.location().toString())
         .setFileFormat(f.format().toString())
         .setRecordCount(f.recordCount())
         .setFileSizeInBytes(f.fileSizeInBytes())
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTablesTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTablesTest.java
new file mode 100644
index 000000000000..ce1cdb05d8f7
--- /dev/null
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTablesTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.DeleteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.hamcrest.Matchers;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class AppendFilesToTablesTest implements Serializable {
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public transient TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default");
+
+  @Rule public transient TestPipeline testPipeline = TestPipeline.create();
+
+  @Test
+  public void testAppendAfterDelete() throws Exception {
+    TableIdentifier tableId =
+        TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));
+
+    Map<String, String> catalogProps =
+        ImmutableMap.<String, String>builder()
+            .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+            .put("warehouse", warehouse.location)
+            .build();
+
+    IcebergCatalogConfig catalog =
+        IcebergCatalogConfig.builder()
+            .setCatalogName("name")
+            .setCatalogProperties(catalogProps)
+            .build();
+
+    // 1. Create table and write some data
+    testPipeline
+        .apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1)))
+        .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
+        .apply("Append To Table", IcebergIO.writeRows(catalog).to(tableId));
+
+    testPipeline.run().waitUntilFinish();
+
+    // 2. Delete the data
+    Table table = warehouse.loadTable(tableId);
+    DeleteFiles delete = table.newDelete();
+    // Delete all data files in the current snapshot
+    table.currentSnapshot().addedDataFiles(table.io()).forEach(delete::deleteFile);
+    delete.commit();
+
+    // 3. Write more data
+    testPipeline
+        .apply("More Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT2)))
+        .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
+        .apply("Append More To Table", IcebergIO.writeRows(catalog).to(tableId));
+
+    testPipeline.run().waitUntilFinish();
+
+    // Verify data
+    // We mainly want to verify that no exception is thrown during the write.
+    // The exact content might depend on how delete operations interact with
+    // subsequent appends
+    // which is not the focus of this test.
+    table.refresh();
+    List<Record> writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());
+    assertThat(writtenRecords, Matchers.hasItem(Matchers.anything()));
+  }
+}

From 1074c823845827d45cd3201b4e66f70a40f291da Mon Sep 17 00:00:00 2001
From: XQ Hu
Date: Sat, 10 Jan 2026 17:26:54 -0500
Subject: [PATCH 2/2] fixed the test

---
 .../io/iceberg/AppendFilesToTablesTest.java   | 29 ++++++++-----------
 1 file changed, 12 insertions(+), 17 deletions(-)

diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTablesTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTablesTest.java
index ce1cdb05d8f7..c4709256b4da 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTablesTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTablesTest.java
@@ -23,7 +23,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -49,8 +50,6 @@ public class AppendFilesToTablesTest implements Serializable {
   @Rule
   public transient TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default");

-  @Rule public transient TestPipeline testPipeline = TestPipeline.create();
-
   @Test
   public void testAppendAfterDelete() throws Exception {
     TableIdentifier tableId =
@@ -68,13 +67,13 @@ public void testAppendAfterDelete() throws Exception {
             .setCatalogProperties(catalogProps)
             .build();

-    // 1. Create table and write some data
-    testPipeline
-        .apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1)))
+    // 1. Create table and write some data using first pipeline
+    Pipeline p1 = Pipeline.create(PipelineOptionsFactory.create());
+    p1.apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1)))
         .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
         .apply("Append To Table", IcebergIO.writeRows(catalog).to(tableId));

-    testPipeline.run().waitUntilFinish();
+    p1.run().waitUntilFinish();

     // 2. Delete the data
     Table table = warehouse.loadTable(tableId);
@@ -83,21 +82,17 @@ public void testAppendAfterDelete() throws Exception {
     table.currentSnapshot().addedDataFiles(table.io()).forEach(delete::deleteFile);
     delete.commit();

-    // 3. Write more data
-    testPipeline
-        .apply("More Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT2)))
+    // 3. Write more data using a fresh second pipeline
+    Pipeline p2 = Pipeline.create(PipelineOptionsFactory.create());
+    p2.apply("More Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT2)))
         .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
         .apply("Append More To Table", IcebergIO.writeRows(catalog).to(tableId));

-    testPipeline.run().waitUntilFinish();
+    p2.run().waitUntilFinish();

-    // Verify data
-    // We mainly want to verify that no exception is thrown during the write.
-    // The exact content might depend on how delete operations interact with
-    // subsequent appends
-    // which is not the focus of this test.
+    // Verify data - after delete and append, only FILE1SNAPSHOT2 should be present
     table.refresh();
     List<Record> writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());
-    assertThat(writtenRecords, Matchers.hasItem(Matchers.anything()));
+    assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT2.toArray()));
   }
 }
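
Note: the standalone sketch below (not part of either commit) restates the guard that `shouldSkip` now applies: check `hasNext()` before calling `next()` on the snapshot's added data files, and read the file path via `DataFile.location()` rather than the `path()` accessor the old code used. The class and method names here are made up for illustration only.

    import java.util.Iterator;
    import org.apache.iceberg.DataFile;

    final class ShouldSkipGuardSketch {
      // Returns the location of one data file added by the last snapshot, or null when
      // that snapshot added no data files (e.g. a delete-only snapshot). The pre-patch
      // code called next() unconditionally here and threw NoSuchElementException.
      static String sampleCommittedLocation(Iterable<DataFile> addedDataFiles) {
        Iterator<DataFile> files = addedDataFiles.iterator();
        if (!files.hasNext()) {
          return null;
        }
        // location() is the accessor the patch switches to instead of path().
        return files.next().location();
      }
    }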