diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index 4315e9bcc4d2..3bb4c299ce56 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -94,6 +94,15 @@ default void setRuntimeContext(Map options) {} */ PositionOutputStream newOutputStream(Path path, boolean overwrite) throws IOException; + /** + * Whether {@link #newOutputStream(Path, boolean)} with {@code overwrite=false} can create this + * path without first listing or probing the parent directory, and can fail atomically if the + * object already exists. + */ + default boolean supportsAtomicCreateWithoutOverwrite(Path path) throws IOException { + return false; + } + /** * Opens a TwoPhaseOutputStream at the indicated Path for transactional writing. * diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/PluginFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/PluginFileIO.java index d5b62a2944cf..ae05ec1a29c5 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/PluginFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/PluginFileIO.java @@ -61,6 +61,11 @@ public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws return wrap(() -> fileIO(path).newOutputStream(path, overwrite)); } + @Override + public boolean supportsAtomicCreateWithoutOverwrite(Path path) throws IOException { + return wrap(() -> fileIO(path).supportsAtomicCreateWithoutOverwrite(path)); + } + @Override public FileStatus getFileStatus(Path path) throws IOException { return wrap(() -> fileIO(path).getFileStatus(path)); diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/ResolvingFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/ResolvingFileIO.java index 6a46d1a49008..4f310f612282 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/ResolvingFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/ResolvingFileIO.java @@ -77,6 +77,11 @@ public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws return wrap(() -> fileIO(path).newOutputStream(path, overwrite)); } + @Override + public boolean supportsAtomicCreateWithoutOverwrite(Path path) throws IOException { + return wrap(() -> fileIO(path).supportsAtomicCreateWithoutOverwrite(path)); + } + @Override public FileStatus getFileStatus(Path path) throws IOException { return wrap(() -> fileIO(path).getFileStatus(path)); diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/cache/CachingFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/cache/CachingFileIO.java index e41d0f04cde4..2101b23e4c09 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/cache/CachingFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/cache/CachingFileIO.java @@ -121,6 +121,11 @@ public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws return delegate.newOutputStream(path, overwrite); } + @Override + public boolean supportsAtomicCreateWithoutOverwrite(Path path) throws IOException { + return delegate.supportsAtomicCreateWithoutOverwrite(path); + } + @Override public FileStatus getFileStatus(Path path) throws IOException { return delegate.getFileStatus(path); diff --git a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java index f9f0d283bbba..2581aa2b540c 100644 --- a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java @@ -114,6 +114,11 @@ public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws return fileIO().newOutputStream(path, overwrite); } + @Override + public boolean supportsAtomicCreateWithoutOverwrite(Path path) throws IOException { + return fileIO().supportsAtomicCreateWithoutOverwrite(path); + } + @Override public TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite) throws IOException { diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java b/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java index a79aafb298d6..5ccec57d0d41 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java @@ -26,6 +26,7 @@ import org.apache.paimon.utils.SnapshotManager; import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; import java.util.List; import java.util.concurrent.Callable; @@ -37,6 +38,9 @@ */ public class RenamingSnapshotCommit implements SnapshotCommit { + private static final String HADOOP_FILE_ALREADY_EXISTS_EXCEPTION = + "org.apache.hadoop.fs.FileAlreadyExistsException"; + private final SnapshotManager snapshotManager; private final FileIO fileIO; private final Lock lock; @@ -50,30 +54,24 @@ public RenamingSnapshotCommit(SnapshotManager snapshotManager, Lock lock) { @Override public boolean commit(Snapshot snapshot, String branch, List statistics) throws Exception { - Path newSnapshotPath = + SnapshotManager targetSnapshotManager = snapshotManager.branch().equals(branch) - ? snapshotManager.snapshotPath(snapshot.id()) - : snapshotManager.copyWithBranch(branch).snapshotPath(snapshot.id()); + ? snapshotManager + : snapshotManager.copyWithBranch(branch); + Path newSnapshotPath = targetSnapshotManager.snapshotPath(snapshot.id()); + boolean supportsNoListCommit = + fileIO.isObjectStore() + && fileIO.supportsAtomicCreateWithoutOverwrite(newSnapshotPath); Callable callable = () -> { - boolean committed = fileIO.tryToWriteAtomic(newSnapshotPath, snapshot.toJson()); - if (!committed) { - if (!fileIO.exists(newSnapshotPath)) { - throw new IOException( - "Commit snapshot " - + snapshot.id() - + " failed and " - + newSnapshotPath - + " not found"); - } - committed = - snapshot.equals( - Snapshot.fromJson(fileIO.readFileUtf8(newSnapshotPath))); - } + boolean committed = + supportsNoListCommit + ? writeNoOverwriteOrRecover(snapshot, newSnapshotPath) + : writeAtomicOrRecover(snapshot, newSnapshotPath); if (committed) { - snapshotManager.commitLatestHint(snapshot.id()); + targetSnapshotManager.commitLatestHint(snapshot.id()); } return committed; }; @@ -84,11 +82,102 @@ public boolean commit(Snapshot snapshot, String branch, List options.commitTimeout() - || retryCount >= options.commitMaxRetries()) { + boolean canRetry = + System.currentTimeMillis() - startMillis <= options.commitTimeout() + && retryCount < options.commitMaxRetries(); + if (!canRetry) { String message = String.format( "Commit failed after %s millis with %s retries, there maybe exist commit conflicts between multiple jobs.", @@ -752,12 +760,43 @@ private int tryCommit( throw new RuntimeException(message, retryResult.exception); } + if (retryResult instanceof CommitFailRetryResult + && ((CommitFailRetryResult) retryResult).requiresLatestSnapshotRecovery) { + recoveredLatestSnapshot = + recoverLatestSnapshotByListing( + ((CommitFailRetryResult) retryResult).exception); + } + retryWaiter.retryWait(retryCount); retryCount++; } return retryCount + 1; } + private Snapshot recoverLatestSnapshotByListing(Exception cause) { + try { + Snapshot latestSnapshot = + snapshotManager.latestSnapshotFromFileSystem( + LatestLookupMode.RECOVERY_REQUIRING_LIST); + if (latestSnapshot == null) { + throw new RuntimeException( + "Cannot recover latest snapshot after snapshot commit conflict because listing returned no snapshots."); + } + return latestSnapshot; + } catch (RuntimeException e) { + RuntimeException recoveryException = + new RuntimeException( + "Cannot recover latest snapshot after missing or stale LATEST. " + + "Recovery requires listing the snapshot directory; grant S3 ListBucket " + + "for this table prefix or restore a valid LATEST hint.", + e); + if (cause != null) { + recoveryException.addSuppressed(cause); + } + throw recoveryException; + } + } + private void checkSameBucketFromSnapshot( List deltaFiles, @Nullable Snapshot latestSnapshot) { if (latestSnapshot == null) { @@ -1130,6 +1169,9 @@ CommitResult tryCommitOnce( callback.call(finalBaseFiles, finalDeltaFiles, indexFiles, newSnapshot)); try { success = commitSnapshotImpl(newSnapshot, deltaStatistics); + } catch (SnapshotCommitConflictRequiresListRecoveryException e) { + LOG.warn("Recover latest snapshot by listing for snapshot commit conflict.", e); + return RetryCommitResult.forCommitFail(latestSnapshot, baseDataFiles, e, null, true); } catch (Exception e) { // commit exception, not sure about the situation and should not clean up the files LOG.warn("Retry commit for exception.", e); @@ -1263,14 +1305,39 @@ public boolean replaceManifestList( latest.properties(), nextRowId); - return commitSnapshotImpl(newSnapshot, emptyList()); + try { + return commitSnapshotImpl(newSnapshot, emptyList()); + } catch (SnapshotCommitConflictRequiresListRecoveryException e) { + LOG.warn("Manifest list replacement conflicted with a newer snapshot.", e); + return false; + } } public void compactManifest() { int retryCount = 0; + Snapshot recoveredLatestSnapshot = null; long startMillis = System.currentTimeMillis(); while (true) { - boolean success = compactManifestOnce(); + boolean success; + try { + success = compactManifestOnce(recoveredLatestSnapshot); + recoveredLatestSnapshot = null; + } catch (SnapshotCommitConflictRequiresListRecoveryException e) { + LOG.warn("Recover latest snapshot by listing for manifest compaction conflict.", e); + if (System.currentTimeMillis() - startMillis > options.commitTimeout() + || retryCount >= options.commitMaxRetries()) { + throw new RuntimeException( + String.format( + "Commit failed after %s millis with %s retries, there maybe exist commit conflicts between multiple jobs.", + options.commitTimeout(), retryCount), + e); + } + + recoveredLatestSnapshot = recoverLatestSnapshotByListing(e); + retryWaiter.retryWait(retryCount); + retryCount++; + continue; + } if (success) { break; } @@ -1288,8 +1355,11 @@ public void compactManifest() { } } - private boolean compactManifestOnce() { - Snapshot latestSnapshot = snapshotManager.latestSnapshot(); + private boolean compactManifestOnce(@Nullable Snapshot recoveredLatestSnapshot) { + Snapshot latestSnapshot = + recoveredLatestSnapshot == null + ? snapshotManager.latestSnapshot() + : recoveredLatestSnapshot; if (latestSnapshot == null) { return true; @@ -1352,6 +1422,8 @@ private boolean commitSnapshotImpl(Snapshot newSnapshot, List de statistics.add(entry.toPartitionStatistics(partitionComputer)); } return snapshotCommit.commit(newSnapshot, options.branch(), statistics); + } catch (SnapshotCommitConflictRequiresListRecoveryException e) { + throw e; } catch (Throwable e) { // exception when performing the atomic rename, // we cannot clean up because we can't determine the success diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java index 717df209cedc..9474b8ea7a4a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java @@ -42,7 +42,21 @@ public static RetryCommitResult forCommitFail( List baseDataFiles, Exception exception, @Nullable ManifestMergeResult manifestMergeResult) { - return new CommitFailRetryResult(snapshot, baseDataFiles, exception, manifestMergeResult); + return forCommitFail(snapshot, baseDataFiles, exception, manifestMergeResult, false); + } + + public static RetryCommitResult forCommitFail( + Snapshot snapshot, + List baseDataFiles, + Exception exception, + @Nullable ManifestMergeResult manifestMergeResult, + boolean requiresLatestSnapshotRecovery) { + return new CommitFailRetryResult( + snapshot, + baseDataFiles, + exception, + manifestMergeResult, + requiresLatestSnapshotRecovery); } public static RetryCommitResult forRollback(Exception exception) { @@ -60,16 +74,19 @@ public static class CommitFailRetryResult extends RetryCommitResult { public final @Nullable Snapshot latestSnapshot; public final @Nullable List baseDataFiles; public final @Nullable ManifestMergeResult manifestMergeResult; + public final boolean requiresLatestSnapshotRecovery; private CommitFailRetryResult( @Nullable Snapshot latestSnapshot, @Nullable List baseDataFiles, Exception exception, - @Nullable ManifestMergeResult manifestMergeResult) { + @Nullable ManifestMergeResult manifestMergeResult, + boolean requiresLatestSnapshotRecovery) { super(exception); this.latestSnapshot = latestSnapshot; this.baseDataFiles = baseDataFiles; this.manifestMergeResult = manifestMergeResult; + this.requiresLatestSnapshotRecovery = requiresLatestSnapshotRecovery; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/HintFileUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/HintFileUtils.java index 7a8eaf6f6aee..07e2d342f818 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/HintFileUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/HintFileUtils.java @@ -18,12 +18,16 @@ package org.apache.paimon.utils; +import org.apache.paimon.Snapshot; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import javax.annotation.Nullable; +import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.file.AccessDeniedException; +import java.util.Optional; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.function.BinaryOperator; @@ -39,10 +43,51 @@ public class HintFileUtils { private static final int READ_HINT_RETRY_NUM = 3; private static final int READ_HINT_RETRY_INTERVAL = 1; + private static final String SNAPSHOT_PREFIX = "snapshot-"; + + /** Lookup mode for discovering the latest snapshot. */ + public enum LatestLookupMode { + NORMAL, + RECOVERY_REQUIRING_LIST + } @Nullable public static Long findLatest(FileIO fileIO, Path dir, String prefix, Function file) throws IOException { + return findLatest(fileIO, dir, prefix, file, LatestLookupMode.NORMAL); + } + + @Nullable + public static Long findLatest( + FileIO fileIO, + Path dir, + String prefix, + Function file, + LatestLookupMode mode) + throws IOException { + if (mode == LatestLookupMode.RECOVERY_REQUIRING_LIST) { + return findLatestByListForRecovery(fileIO, dir, prefix); + } + + if (SNAPSHOT_PREFIX.equals(prefix) + && fileIO.isObjectStore() + && fileIO.supportsAtomicCreateWithoutOverwrite( + file.apply(Snapshot.FIRST_SNAPSHOT_ID))) { + Optional latestHint = readLatestHintForNoListObjectStore(fileIO, LATEST, dir); + Long snapshotId = latestHint.orElse(null); + if (supportsNoListSnapshotCommit(fileIO, file, snapshotId)) { + return snapshotId == null + ? null + : findLatestForObjectStore(fileIO, file, snapshotId); + } + } + + return findLatestByHintThenList(fileIO, dir, prefix, file); + } + + @Nullable + private static Long findLatestByHintThenList( + FileIO fileIO, Path dir, String prefix, Function file) throws IOException { Long snapshotId = readHint(fileIO, LATEST, dir); if (snapshotId != null && snapshotId > 0) { long nextSnapshot = snapshotId + 1; @@ -54,6 +99,123 @@ public static Long findLatest(FileIO fileIO, Path dir, String prefix, Function file, @Nullable Long latestHint) + throws IOException { + long nextSnapshot = latestHint == null ? Snapshot.FIRST_SNAPSHOT_ID : latestHint + 1; + return fileIO.supportsAtomicCreateWithoutOverwrite(file.apply(nextSnapshot)); + } + + private static Long findLatestForObjectStore( + FileIO fileIO, Function file, Long latestHint) { + Path snapshotPath = file.apply(latestHint); + String snapshotJson; + try { + snapshotJson = fileIO.readFileUtf8(snapshotPath); + } catch (FileNotFoundException e) { + throw new RuntimeException( + "Cannot safely use LATEST hint " + + latestHint + + " because " + + snapshotPath + + " is missing.", + e); + } catch (IOException e) { + throw new RuntimeException( + "Cannot safely use LATEST hint " + + latestHint + + " because " + + snapshotPath + + " is unreadable.", + e); + } catch (RuntimeException e) { + throw new RuntimeException( + "Cannot safely use LATEST hint " + + latestHint + + " because " + + snapshotPath + + " is unreadable.", + e); + } + + Snapshot snapshot; + try { + snapshot = Snapshot.fromJson(snapshotJson); + } catch (RuntimeException e) { + throw new RuntimeException( + "Cannot safely use LATEST hint " + + latestHint + + " because " + + snapshotPath + + " is malformed.", + e); + } + + if (snapshot.id() != latestHint) { + throw new RuntimeException( + "Cannot safely use LATEST hint " + + latestHint + + " because " + + snapshotPath + + " contains snapshot " + + snapshot.id() + + "."); + } + + failIfNextSnapshotExists(fileIO, file, latestHint); + return latestHint; + } + + private static void failIfNextSnapshotExists( + FileIO fileIO, Function file, Long latestHint) { + Path nextSnapshotPath = file.apply(latestHint + 1); + boolean nextExists; + try { + fileIO.readFileUtf8(nextSnapshotPath); + nextExists = true; + } catch (FileNotFoundException e) { + nextExists = false; + } catch (IOException | RuntimeException e) { + if (containsAccessDenied(e)) { + nextExists = false; + } else { + throw new RuntimeException( + "Cannot safely verify LATEST hint " + + latestHint + + " because " + + nextSnapshotPath + + " could not be checked.", + e); + } + } + + if (nextExists) { + throw new RuntimeException( + "Cannot safely use LATEST hint " + + latestHint + + " because " + + nextSnapshotPath + + " already exists. Recovery requires ListBucket permission or " + + "restoring a valid LATEST hint."); + } + } + + @Nullable + private static Long findLatestByListForRecovery(FileIO fileIO, Path dir, String prefix) + throws IOException { + try { + return findByListFiles(fileIO, Math::max, dir, prefix); + } catch (IOException e) { + throw new IOException( + "Recovery requires ListBucket permission to list table prefix " + + dir + + " for files with prefix " + + prefix + + ", or restoring a valid LATEST hint.", + e); + } + } + @Nullable public static Long findEarliest( FileIO fileIO, Path dir, String prefix, Function file) throws IOException { @@ -84,6 +246,99 @@ public static Long readHint(FileIO fileIO, String fileName, Path dir) { return null; } + private static Optional readLatestHintForNoListObjectStore( + FileIO fileIO, String fileName, Path dir) { + Path path = new Path(dir, fileName); + int retryNumber = 0; + Exception exception = null; + while (retryNumber++ < READ_HINT_RETRY_NUM) { + try { + long snapshotId = Long.parseLong(fileIO.readFileUtf8(path)); + if (snapshotId <= 0) { + throw new NumberFormatException("Latest snapshot id must be positive."); + } + return Optional.of(snapshotId); + } catch (NumberFormatException e) { + throw new RuntimeException( + "Cannot safely use LATEST hint because it is malformed.", e); + } catch (FileNotFoundException e) { + return missingLatestHintForNoListObjectStore(fileIO, dir, e); + } catch (IOException | RuntimeException e) { + if (containsAccessDenied(e)) { + return missingLatestHintForNoListObjectStore(fileIO, dir, e); + } + exception = e; + if (retryNumber < READ_HINT_RETRY_NUM) { + sleepBeforeRetry(); + } + } + } + + throw new RuntimeException( + "Cannot safely determine latest snapshot because LATEST hint is unreadable.", + exception); + } + + private static Optional missingLatestHintForNoListObjectStore( + FileIO fileIO, Path dir, Exception latestException) { + Optional earliestHint = readEarliestHintForNoListObjectStore(fileIO, dir); + if (earliestHint.isPresent()) { + throw new RuntimeException( + "Cannot safely treat missing LATEST hint as a new table because EARLIEST " + + "hint " + + earliestHint.get() + + " is present. Recovery requires ListBucket permission or restoring " + + "a valid LATEST hint.", + latestException); + } + return Optional.empty(); + } + + private static Optional readEarliestHintForNoListObjectStore(FileIO fileIO, Path dir) { + Path path = new Path(dir, EARLIEST); + try { + long snapshotId = Long.parseLong(fileIO.readFileUtf8(path)); + if (snapshotId <= 0) { + throw new NumberFormatException("Earliest snapshot id must be positive."); + } + return Optional.of(snapshotId); + } catch (NumberFormatException e) { + throw new RuntimeException( + "Cannot safely determine whether LATEST hint is absent because EARLIEST " + + "hint is malformed.", + e); + } catch (FileNotFoundException e) { + return Optional.empty(); + } catch (IOException | RuntimeException e) { + if (containsAccessDenied(e)) { + return Optional.empty(); + } + throw new RuntimeException( + "Cannot safely determine whether LATEST hint is absent because EARLIEST " + + "hint is unreadable.", + e); + } + } + + private static boolean containsAccessDenied(Throwable throwable) { + while (throwable != null) { + if (throwable instanceof AccessDeniedException) { + return true; + } + throwable = throwable.getCause(); + } + return false; + } + + private static void sleepBeforeRetry() { + try { + TimeUnit.MILLISECONDS.sleep(READ_HINT_RETRY_INTERVAL); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + public static Long findByListFiles( FileIO fileIO, BinaryOperator reducer, Path dir, String prefix) throws IOException { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index af21181214b3..f0aff21dc0a2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -22,6 +22,7 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.table.Instant; +import org.apache.paimon.utils.HintFileUtils.LatestLookupMode; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; @@ -184,7 +185,11 @@ public void deleteSnapshot(long snapshotId) { } public @Nullable Snapshot latestSnapshotFromFileSystem() { - Long snapshotId = latestSnapshotIdFromFileSystem(); + return latestSnapshotFromFileSystem(LatestLookupMode.NORMAL); + } + + public @Nullable Snapshot latestSnapshotFromFileSystem(LatestLookupMode mode) { + Long snapshotId = latestSnapshotIdFromFileSystem(mode); return snapshotId == null ? null : snapshot(snapshotId); } @@ -203,8 +208,12 @@ public void deleteSnapshot(long snapshotId) { } public @Nullable Long latestSnapshotIdFromFileSystem() { + return latestSnapshotIdFromFileSystem(LatestLookupMode.NORMAL); + } + + public @Nullable Long latestSnapshotIdFromFileSystem(LatestLookupMode mode) { try { - return findLatest(snapshotDirectory(), SNAPSHOT_PREFIX, this::snapshotPath); + return findLatest(snapshotDirectory(), SNAPSHOT_PREFIX, this::snapshotPath, mode); } catch (IOException e) { throw new RuntimeException("Failed to find latest snapshot id", e); } @@ -710,7 +719,13 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) { private @Nullable Long findLatest(Path dir, String prefix, Function file) throws IOException { - return HintFileUtils.findLatest(fileIO, dir, prefix, file); + return findLatest(dir, prefix, file, LatestLookupMode.NORMAL); + } + + private @Nullable Long findLatest( + Path dir, String prefix, Function file, LatestLookupMode mode) + throws IOException { + return HintFileUtils.findLatest(fileIO, dir, prefix, file, mode); } private @Nullable Long findEarliest(Path dir, String prefix, Function file) diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/RenamingSnapshotCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/RenamingSnapshotCommitTest.java index 6e8ae6f36a47..a02aec966e4c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/RenamingSnapshotCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/RenamingSnapshotCommitTest.java @@ -20,7 +20,9 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.PositionOutputStream; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.operation.Lock; import org.apache.paimon.utils.HintFileUtils; @@ -30,7 +32,10 @@ import org.junit.jupiter.api.io.TempDir; import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.AccessDeniedException; import java.nio.file.Files; +import java.nio.file.StandardOpenOption; import java.util.Collections; import static org.assertj.core.api.Assertions.assertThat; @@ -101,7 +106,177 @@ public void testCommitTargetSnapshotMissing(@TempDir java.nio.file.Path tmp) thr assertThat(fileIO.exists(snapshotManager.snapshotPath(snapshot.id()))).isFalse(); } + @Test + public void testObjectStoreCommitDoesNotProbeMissingSnapshot(@TempDir java.nio.file.Path tmp) + throws Exception { + NoListObjectStoreFileIO fileIO = new NoListObjectStoreFileIO(); + Path tablePath = new Path(tmp.toUri()); + SnapshotManager snapshotManager = new SnapshotManager(fileIO, tablePath, null, null, null); + + RenamingSnapshotCommit commit = new RenamingSnapshotCommit(snapshotManager, Lock.empty()); + Snapshot snapshot = createSnapshot(1L); + + boolean committed = commit.commit(snapshot, "main", Collections.emptyList()); + + assertThat(committed).isTrue(); + assertThat(fileIO.snapshotExistsCalls).isZero(); + assertThat(fileIO.listStatusCalls).isZero(); + assertThat(fileIO.noOverwriteWrites).isEqualTo(1); + assertThat(Snapshot.fromJson(fileIO.readFileUtf8(snapshotManager.snapshotPath(1L)))) + .isEqualTo(snapshot); + assertThat( + fileIO.readFileUtf8( + new Path( + snapshotManager.snapshotDirectory(), HintFileUtils.LATEST))) + .isEqualTo("1"); + } + + @Test + public void testObjectStoreCommitConflictRequiresListRecovery(@TempDir java.nio.file.Path tmp) + throws Exception { + NoListObjectStoreFileIO fileIO = new NoListObjectStoreFileIO(); + Path tablePath = new Path(tmp.toUri()); + SnapshotManager snapshotManager = new SnapshotManager(fileIO, tablePath, null, null, null); + Snapshot committedSnapshot = createSnapshot(1L, "committed", 1L); + Snapshot staleSnapshot = createSnapshot(1L, "stale", 2L); + Path snapshotPath = snapshotManager.snapshotPath(1L); + fileIO.writeFile(snapshotPath, committedSnapshot.toJson(), false); + fileIO.resetCounters(); + + RenamingSnapshotCommit commit = new RenamingSnapshotCommit(snapshotManager, Lock.empty()); + + RenamingSnapshotCommit.SnapshotCommitConflictRequiresListRecoveryException ex = + assertThrows( + RenamingSnapshotCommit.SnapshotCommitConflictRequiresListRecoveryException + .class, + () -> commit.commit(staleSnapshot, "main", Collections.emptyList())); + + assertThat(ex) + .hasMessageContaining("Cannot safely commit snapshot 1") + .hasMessageContaining("already exists with different content") + .hasMessageContaining("recovery requires listing"); + assertThat(fileIO.snapshotExistsCalls).isZero(); + assertThat(fileIO.listStatusCalls).isZero(); + assertThat(Snapshot.fromJson(fileIO.readFileUtf8(snapshotPath))) + .isEqualTo(committedSnapshot); + } + + @Test + public void testObjectStoreSameSnapshotContentIsIdempotent(@TempDir java.nio.file.Path tmp) + throws Exception { + NoListObjectStoreFileIO fileIO = new NoListObjectStoreFileIO(); + Path tablePath = new Path(tmp.toUri()); + SnapshotManager snapshotManager = new SnapshotManager(fileIO, tablePath, null, null, null); + Snapshot snapshot = createSnapshot(1L); + Path snapshotPath = snapshotManager.snapshotPath(1L); + fileIO.writeFile(snapshotPath, snapshot.toJson(), false); + fileIO.resetCounters(); + + RenamingSnapshotCommit commit = new RenamingSnapshotCommit(snapshotManager, Lock.empty()); + + boolean committed = commit.commit(snapshot, "main", Collections.emptyList()); + + assertThat(committed).isTrue(); + assertThat(fileIO.snapshotExistsCalls).isZero(); + assertThat(fileIO.listStatusCalls).isZero(); + assertThat(fileIO.noOverwriteWrites).isEqualTo(1); + assertThat(Snapshot.fromJson(fileIO.readFileUtf8(snapshotPath))).isEqualTo(snapshot); + } + + @Test + public void testObjectStoreCommitConflictFailsClosedForMalformedExistingSnapshot( + @TempDir java.nio.file.Path tmp) throws Exception { + NoListObjectStoreFileIO fileIO = new NoListObjectStoreFileIO(); + Path tablePath = new Path(tmp.toUri()); + SnapshotManager snapshotManager = new SnapshotManager(fileIO, tablePath, null, null, null); + Path snapshotPath = snapshotManager.snapshotPath(1L); + fileIO.writeFile(snapshotPath, "not-json", false); + fileIO.resetCounters(); + + RenamingSnapshotCommit commit = new RenamingSnapshotCommit(snapshotManager, Lock.empty()); + + RenamingSnapshotCommit.SnapshotCommitConflictRequiresListRecoveryException ex = + assertThrows( + RenamingSnapshotCommit.SnapshotCommitConflictRequiresListRecoveryException + .class, + () -> commit.commit(createSnapshot(1L), "main", Collections.emptyList())); + + assertThat(ex) + .hasMessageContaining("already exists but is malformed") + .hasMessageContaining("recovery requires listing"); + assertThat(fileIO.snapshotExistsCalls).isZero(); + assertThat(fileIO.listStatusCalls).isZero(); + assertThat(fileIO.readFileUtf8(snapshotPath)).isEqualTo("not-json"); + } + + @Test + public void testObjectStoreCommitConflictFailsClosedForUnreadableExistingSnapshot( + @TempDir java.nio.file.Path tmp) throws Exception { + UnreadableSnapshotObjectStoreFileIO fileIO = new UnreadableSnapshotObjectStoreFileIO(); + Path tablePath = new Path(tmp.toUri()); + SnapshotManager snapshotManager = new SnapshotManager(fileIO, tablePath, null, null, null); + Snapshot committedSnapshot = createSnapshot(1L, "committed", 1L); + Snapshot staleSnapshot = createSnapshot(1L, "stale", 2L); + Path snapshotPath = snapshotManager.snapshotPath(1L); + fileIO.writeFile(snapshotPath, committedSnapshot.toJson(), false); + fileIO.resetCounters(); + fileIO.denySnapshotReads = true; + + RenamingSnapshotCommit commit = new RenamingSnapshotCommit(snapshotManager, Lock.empty()); + + RenamingSnapshotCommit.SnapshotCommitConflictRequiresListRecoveryException ex = + assertThrows( + RenamingSnapshotCommit.SnapshotCommitConflictRequiresListRecoveryException + .class, + () -> commit.commit(staleSnapshot, "main", Collections.emptyList())); + + assertThat(ex) + .hasMessageContaining("already exists but cannot be read") + .hasMessageContaining("recovery requires listing"); + assertThat(fileIO.snapshotExistsCalls).isZero(); + assertThat(fileIO.listStatusCalls).isZero(); + fileIO.denySnapshotReads = false; + assertThat(Snapshot.fromJson(fileIO.readFileUtf8(snapshotPath))) + .isEqualTo(committedSnapshot); + } + + @Test + public void testBranchCommitUpdatesBranchLatestHint(@TempDir java.nio.file.Path tmp) + throws Exception { + FileIO fileIO = new LocalFileIO(); + Path tablePath = new Path(tmp.toUri()); + SnapshotManager mainSnapshotManager = + new SnapshotManager(fileIO, tablePath, null, null, null); + SnapshotManager branchSnapshotManager = mainSnapshotManager.copyWithBranch("dev"); + + RenamingSnapshotCommit commit = + new RenamingSnapshotCommit(mainSnapshotManager, Lock.empty()); + Snapshot snapshot = createSnapshot(1L); + + assertThat(commit.commit(snapshot, "dev", Collections.emptyList())).isTrue(); + + assertThat(fileIO.exists(branchSnapshotManager.snapshotPath(1L))).isTrue(); + assertThat(fileIO.exists(mainSnapshotManager.snapshotPath(1L))).isFalse(); + assertThat( + fileIO.readOverwrittenFileUtf8( + new Path( + branchSnapshotManager.snapshotDirectory(), + HintFileUtils.LATEST))) + .contains("1"); + assertThat( + fileIO.readOverwrittenFileUtf8( + new Path( + mainSnapshotManager.snapshotDirectory(), + HintFileUtils.LATEST))) + .isEmpty(); + } + private static Snapshot createSnapshot(long id) throws IOException { + return createSnapshot(id, "user", id); + } + + private static Snapshot createSnapshot(long id, String commitUser, long commitIdentifier) + throws IOException { long schemaId = 1L; String baseManifestList = "manifest-list-base"; String deltaManifestList = "manifest-list-delta"; @@ -110,8 +285,6 @@ private static Snapshot createSnapshot(long id) throws IOException { Long deltaManifestListSize = 20L; Long changelogManifestListSize = null; String indexManifest = null; - String commitUser = "user"; - long commitIdentifier = id; Snapshot.CommitKind commitKind = Snapshot.CommitKind.APPEND; long timeMillis = System.currentTimeMillis(); Long totalRecordCount = 100L; @@ -169,4 +342,143 @@ public boolean rename(Path src, Path dst) throws IOException { } } } + + private static class NoListObjectStoreFileIO extends LocalFileIO { + + int snapshotExistsCalls; + int listStatusCalls; + int noOverwriteWrites; + + @Override + public boolean isObjectStore() { + return true; + } + + @Override + public boolean supportsAtomicCreateWithoutOverwrite(Path path) { + return isSnapshotFile(path); + } + + @Override + public boolean exists(Path path) throws IOException { + if (isSnapshotFile(path)) { + snapshotExistsCalls++; + throw new AccessDeniedException(path.toString()); + } + return super.exists(path); + } + + @Override + public FileStatus[] listStatus(Path path) throws IOException { + listStatusCalls++; + throw new AccessDeniedException(path.toString()); + } + + @Override + public PositionOutputStream newOutputStream(Path path, boolean overwrite) + throws IOException { + if (!isSnapshotFile(path)) { + return super.newOutputStream(path, overwrite); + } + + if (!overwrite) { + noOverwriteWrites++; + } + + java.nio.file.Path localPath = java.nio.file.Paths.get(path.toUri()); + java.nio.file.Path parent = localPath.getParent(); + if (parent != null) { + Files.createDirectories(parent); + } + + OutputStream out = + overwrite + ? Files.newOutputStream( + localPath, + StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING, + StandardOpenOption.WRITE) + : Files.newOutputStream( + localPath, + StandardOpenOption.CREATE_NEW, + StandardOpenOption.WRITE); + return new OutputStreamPositionOutputStream(out); + } + + void resetCounters() { + snapshotExistsCalls = 0; + listStatusCalls = 0; + noOverwriteWrites = 0; + } + + protected boolean isSnapshotFile(Path path) { + Path parent = path.getParent(); + if (parent == null || !"snapshot".equals(parent.getName())) { + return false; + } + + String fileName = path.getName(); + return fileName.startsWith("snapshot-") + || HintFileUtils.LATEST.equals(fileName) + || HintFileUtils.EARLIEST.equals(fileName); + } + } + + private static class UnreadableSnapshotObjectStoreFileIO extends NoListObjectStoreFileIO { + + boolean denySnapshotReads; + + @Override + public String readFileUtf8(Path path) throws IOException { + if (denySnapshotReads + && isSnapshotFile(path) + && path.getName().startsWith("snapshot-")) { + throw new AccessDeniedException(path.toString()); + } + return super.readFileUtf8(path); + } + } + + private static class OutputStreamPositionOutputStream extends PositionOutputStream { + + private final OutputStream out; + private long pos; + + OutputStreamPositionOutputStream(OutputStream out) { + this.out = out; + } + + @Override + public long getPos() { + return pos; + } + + @Override + public void write(int b) throws IOException { + out.write(b); + pos++; + } + + @Override + public void write(byte[] b) throws IOException { + out.write(b); + pos += b.length; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); + pos += len; + } + + @Override + public void flush() throws IOException { + out.flush(); + } + + @Override + public void close() throws IOException { + out.close(); + } + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index 5a386947c172..88ce65c5dcc1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -25,6 +25,7 @@ import org.apache.paimon.TestFileStore; import org.apache.paimon.TestKeyValueGenerator; import org.apache.paimon.catalog.RenamingSnapshotCommit; +import org.apache.paimon.catalog.RenamingSnapshotCommit.SnapshotCommitConflictRequiresListRecoveryException; import org.apache.paimon.catalog.SnapshotCommit; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.deletionvectors.BucketedDvMaintainer; @@ -63,6 +64,7 @@ import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FailingFileIO; +import org.apache.paimon.utils.HintFileUtils.LatestLookupMode; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TraceableFileIO; @@ -1187,6 +1189,138 @@ public void testCommitRetryAfterFalseSuccessDoesNotCleanManifest() throws Except assertThat(store.readKvsFromSnapshot(latestSnapshot.id())).hasSize(1); } + @Test + public void testSnapshotCommitConflictTriggersListRecoveryFromCommitLoop() throws Exception { + TestFileStore store = createStore(false); + KeyValue first = gen.next(); + store.commitData(Collections.singletonList(first), gen::getPartition, value -> 0); + + KeyValue second = gen.next(); + AtomicReference committableRef = new AtomicReference<>(); + store.commitDataImpl( + Collections.singletonList(second), + gen::getPartition, + value -> 0, + false, + 18L, + null, + Collections.emptyList(), + (commit, committable) -> committableRef.set(committable)); + + RecoveryTrackingSnapshotManager snapshotManager = + new RecoveryTrackingSnapshotManager(store.snapshotManager()); + RecoveryOnceSnapshotCommit snapshotCommit = + new RecoveryOnceSnapshotCommit( + new RenamingSnapshotCommit(snapshotManager, Lock.empty())); + try (FileStoreCommitImpl commit = + newCommitWithSnapshotCommit( + store, "recover-after-conflict", snapshotCommit, snapshotManager)) { + commit.commit(checkNotNull(committableRef.get()), false); + } + + assertThat(snapshotCommit.attempts).isEqualTo(2); + assertThat(snapshotManager.recoveryLookups).isEqualTo(1); + assertThat(store.snapshotManager().latestSnapshotId()).isEqualTo(2L); + } + + @Test + public void testSnapshotCommitConflictDoesNotRecoverWhenRetryBudgetExhausted() + throws Exception { + Map options = new HashMap<>(); + options.put(CoreOptions.COMMIT_MAX_RETRIES.key(), "0"); + TestFileStore store = createStore(false, options); + + KeyValue record = gen.next(); + AtomicReference committableRef = new AtomicReference<>(); + store.commitDataImpl( + Collections.singletonList(record), + gen::getPartition, + value -> 0, + false, + 18L, + null, + Collections.emptyList(), + (commit, committable) -> committableRef.set(committable)); + + RecoveryTrackingSnapshotManager snapshotManager = + new RecoveryTrackingSnapshotManager(store.snapshotManager()); + RecoveryOnceSnapshotCommit snapshotCommit = + new RecoveryOnceSnapshotCommit( + new RenamingSnapshotCommit(snapshotManager, Lock.empty())); + try (FileStoreCommitImpl commit = + newCommitWithSnapshotCommit( + store, "recover-after-conflict", snapshotCommit, snapshotManager)) { + assertThatThrownBy(() -> commit.commit(checkNotNull(committableRef.get()), false)) + .hasMessageContaining("Commit failed after") + .hasRootCauseInstanceOf( + SnapshotCommitConflictRequiresListRecoveryException.class); + } + + assertThat(snapshotCommit.attempts).isEqualTo(1); + assertThat(snapshotManager.recoveryLookups).isZero(); + } + + @Test + public void testManifestCompactConflictTriggersListRecovery() throws Exception { + TestFileStore store = createStore(false); + + List keyValues = generateDataList(1); + BinaryRow partition = gen.getPartition(keyValues.get(0)); + store.commitData(keyValues, s -> partition, kv -> 0); + store.overwriteData(keyValues, s -> partition, kv -> 0, Collections.emptyMap()); + Snapshot latest = + store.overwriteData(keyValues, s -> partition, kv -> 0, Collections.emptyMap()) + .get(0); + + RecoveryTrackingSnapshotManager snapshotManager = + new RecoveryTrackingSnapshotManager(store.snapshotManager()); + RecoveryOnceSnapshotCommit snapshotCommit = + new RecoveryOnceSnapshotCommit( + new RenamingSnapshotCommit(snapshotManager, Lock.empty())); + try (FileStoreCommitImpl commit = + newCommitWithSnapshotCommit( + store, "compact-recover-after-conflict", snapshotCommit, snapshotManager)) { + commit.compactManifest(); + } + + assertThat(snapshotCommit.attempts).isEqualTo(2); + assertThat(snapshotManager.recoveryLookups).isEqualTo(1); + assertThat(store.snapshotManager().latestSnapshotId()).isEqualTo(latest.id() + 1); + } + + @Test + public void testReplaceManifestListConflictReturnsFalse() throws Exception { + TestFileStore store = createStore(false); + Snapshot latest = + store.commitData( + Collections.singletonList(gen.next()), + gen::getPartition, + value -> 0) + .get(0); + ManifestList manifestList = store.manifestListFactory().create(); + Pair baseManifestList = + manifestList.write(manifestList.readDataManifests(latest)); + Pair deltaManifestList = manifestList.write(Collections.emptyList()); + + RecoveryTrackingSnapshotManager snapshotManager = + new RecoveryTrackingSnapshotManager(store.snapshotManager()); + RecoveryOnceSnapshotCommit snapshotCommit = + new RecoveryOnceSnapshotCommit( + new RenamingSnapshotCommit(snapshotManager, Lock.empty())); + boolean replaced; + try (FileStoreCommitImpl commit = + newCommitWithSnapshotCommit( + store, "replace-recover-after-conflict", snapshotCommit, snapshotManager)) { + replaced = + commit.replaceManifestList( + latest, latest.totalRecordCount(), baseManifestList, deltaManifestList); + } + + assertThat(replaced).isFalse(); + assertThat(snapshotCommit.attempts).isEqualTo(1); + assertThat(snapshotManager.recoveryLookups).isZero(); + } + @Test public void testCommitRetryReusePreviousManifestMergeResultWhenBeforeStillExists() throws Exception { @@ -1584,6 +1718,20 @@ private FileStoreCommitImpl newCommitWithSnapshotCommit( store.options().dataEvolutionEnabled()); } + private FileStoreCommitImpl newCommitWithSnapshotCommit( + TestFileStore store, + String commitUser, + SnapshotCommit snapshotCommit, + SnapshotManager snapshotManager) { + return newCommitWithSnapshotCommit( + store, + commitUser, + snapshotCommit, + store.options(), + store.options().dataEvolutionEnabled(), + snapshotManager); + } + private ManifestEntry addFile( BinaryRow partition, int bucket, int totalBuckets, long maxSequenceNumber) { return ManifestEntry.create( @@ -1614,6 +1762,22 @@ private FileStoreCommitImpl newCommitWithSnapshotCommit( SnapshotCommit snapshotCommit, CoreOptions options, boolean dataEvolutionEnabled) { + return newCommitWithSnapshotCommit( + store, + commitUser, + snapshotCommit, + options, + dataEvolutionEnabled, + store.snapshotManager()); + } + + private FileStoreCommitImpl newCommitWithSnapshotCommit( + TestFileStore store, + String commitUser, + SnapshotCommit snapshotCommit, + CoreOptions options, + boolean dataEvolutionEnabled, + SnapshotManager snapshotManager) { String tableName = store.options().path().getName(); return new FileStoreCommitImpl( snapshotCommit, @@ -1624,7 +1788,7 @@ private FileStoreCommitImpl newCommitWithSnapshotCommit( store.partitionType(), options, store.pathFactory(), - store.snapshotManager(), + snapshotManager, store.manifestFileFactory(), store.manifestListFactory(), store.indexManifestFileFactory(), @@ -1645,7 +1809,7 @@ private FileStoreCommitImpl newCommitWithSnapshotCommit( dataEvolutionEnabled, options.pkClusteringOverride(), store.newIndexFileHandler(), - store.snapshotManager(), + snapshotManager, scanner), null); } @@ -1684,6 +1848,55 @@ private static List> conflictAttempts(List... return Arrays.asList(attempts); } + private static class RecoveryTrackingSnapshotManager extends SnapshotManager { + + private int recoveryLookups; + + private RecoveryTrackingSnapshotManager(SnapshotManager delegate) { + super(delegate.fileIO(), delegate.tablePath(), delegate.branch(), null, null); + } + + @Override + public Snapshot latestSnapshotFromFileSystem(LatestLookupMode mode) { + if (mode == LatestLookupMode.RECOVERY_REQUIRING_LIST) { + recoveryLookups++; + } + return super.latestSnapshotFromFileSystem(mode); + } + } + + private static class RecoveryOnceSnapshotCommit implements SnapshotCommit { + + private final SnapshotCommit delegate; + private int attempts; + + private RecoveryOnceSnapshotCommit(SnapshotCommit delegate) { + this.delegate = delegate; + } + + @Override + public boolean commit( + Snapshot snapshot, + String branch, + List statistics) + throws Exception { + attempts++; + if (attempts == 1) { + throw new SnapshotCommitConflictRequiresListRecoveryException( + "Cannot safely commit snapshot " + + snapshot.id() + + " because it already exists with different content. recovery requires listing", + null); + } + return delegate.commit(snapshot, branch, statistics); + } + + @Override + public void close() throws Exception { + delegate.close(); + } + } + private static class ConflictingSnapshotCommit implements SnapshotCommit { private final SnapshotCommit delegate; diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/HintFileUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/HintFileUtilsTest.java new file mode 100644 index 000000000000..6a4bd83c7003 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/utils/HintFileUtilsTest.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.fs.FileStatus; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.file.AccessDeniedException; +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link HintFileUtils}. */ +public class HintFileUtilsTest { + + @TempDir java.nio.file.Path tempDir; + + @Test + public void testObjectStoreMissingLatestHintReturnsNullWithoutList() { + NoListObjectStoreFileIO fileIO = new NoListObjectStoreFileIO(); + SnapshotManager snapshotManager = snapshotManager(fileIO); + + assertThat(snapshotManager.latestSnapshotId()).isNull(); + assertThat(fileIO.listStatusCalls).isZero(); + assertThat(fileIO.existsCalls).isZero(); + } + + @Test + public void testObjectStoreMissingLatestHintRecoveryUsesList() throws IOException { + ListingObjectStoreFileIO fileIO = new ListingObjectStoreFileIO(); + SnapshotManager snapshotManager = snapshotManager(fileIO); + writeSnapshot(snapshotManager, 1); + + assertThat( + snapshotManager.latestSnapshotIdFromFileSystem( + HintFileUtils.LatestLookupMode.RECOVERY_REQUIRING_LIST)) + .isEqualTo(1); + assertThat(fileIO.listStatusCalls).isGreaterThan(0); + } + + @Test + public void testObjectStoreMissingLatestHintRecoveryDeniedFailsWithListRequiredMessage() { + NoListObjectStoreFileIO fileIO = new NoListObjectStoreFileIO(); + SnapshotManager snapshotManager = snapshotManager(fileIO); + + assertThatThrownBy( + () -> + snapshotManager.latestSnapshotIdFromFileSystem( + HintFileUtils.LatestLookupMode.RECOVERY_REQUIRING_LIST)) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Failed to find latest snapshot id") + .hasStackTraceContaining("requires ListBucket"); + } + + @Test + public void testObjectStoreMalformedLatestHintFailsClosedWithoutList() throws IOException { + NoListObjectStoreFileIO fileIO = new NoListObjectStoreFileIO(); + SnapshotManager snapshotManager = snapshotManager(fileIO); + writeFile( + new Path(snapshotManager.snapshotDirectory(), HintFileUtils.LATEST), + "not-a-number"); + + assertThatThrownBy(snapshotManager::latestSnapshotId) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Cannot safely use LATEST hint because it is malformed."); + assertThat(fileIO.listStatusCalls).isZero(); + assertThat(fileIO.existsCalls).isZero(); + } + + @Test + public void testObjectStoreAccessDeniedLatestHintReturnsNullWithoutList() { + NoListObjectStoreFileIO fileIO = new NoListObjectStoreFileIO(); + fileIO.denyLatestHintRead = true; + SnapshotManager snapshotManager = snapshotManager(fileIO); + + assertThat(snapshotManager.latestSnapshotId()).isNull(); + assertThat(fileIO.listStatusCalls).isZero(); + assertThat(fileIO.existsCalls).isZero(); + } + + @Test + public void testObjectStoreAccessDeniedLatestHintFailsClosedWhenEarliestHintExists() + throws IOException { + NoListObjectStoreFileIO fileIO = new NoListObjectStoreFileIO(); + SnapshotManager snapshotManager = snapshotManager(fileIO); + writeFile(new Path(snapshotManager.snapshotDirectory(), HintFileUtils.EARLIEST), "2"); + fileIO.denyLatestHintRead = true; + + assertThatThrownBy(snapshotManager::latestSnapshotId) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Cannot safely treat missing LATEST hint as a new table") + .hasStackTraceContaining("Recovery requires ListBucket"); + assertThat(fileIO.listStatusCalls).isZero(); + assertThat(fileIO.existsCalls).isZero(); + } + + @Test + public void testObjectStoreAccessDeniedLatestHintDoesNotUseDefaultOverwrittenReadExistsProbe() { + DefaultOverwrittenReadNoListObjectStoreFileIO fileIO = + new DefaultOverwrittenReadNoListObjectStoreFileIO(); + SnapshotManager snapshotManager = snapshotManager(fileIO); + + assertThat(snapshotManager.latestSnapshotId()).isNull(); + assertThat(fileIO.listStatusCalls).isZero(); + assertThat(fileIO.existsCalls).isZero(); + } + + @Test + public void testObjectStoreLatestHintMissingSnapshotFailsClosedWithoutList() + throws IOException { + NoListObjectStoreFileIO fileIO = new NoListObjectStoreFileIO(); + SnapshotManager snapshotManager = snapshotManager(fileIO); + writeFile(new Path(snapshotManager.snapshotDirectory(), HintFileUtils.LATEST), "2"); + + assertThatThrownBy(snapshotManager::latestSnapshotId) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining( + "Cannot safely use LATEST hint 2 because " + + snapshotManager.snapshotPath(2) + + " is missing."); + assertThat(fileIO.listStatusCalls).isZero(); + assertThat(fileIO.existsCalls).isZero(); + } + + @Test + public void testObjectStoreMalformedLatestSnapshotFailsClosedWithoutList() throws IOException { + NoListObjectStoreFileIO fileIO = new NoListObjectStoreFileIO(); + SnapshotManager snapshotManager = snapshotManager(fileIO); + writeFile(new Path(snapshotManager.snapshotDirectory(), HintFileUtils.LATEST), "2"); + writeFile(snapshotManager.snapshotPath(2), "not-json"); + + assertThatThrownBy(snapshotManager::latestSnapshotId) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("snapshot-2 is malformed."); + assertThat(fileIO.listStatusCalls).isZero(); + assertThat(fileIO.existsCalls).isZero(); + } + + @Test + public void testObjectStoreLatestSnapshotIdMismatchFailsClosedWithoutList() throws IOException { + NoListObjectStoreFileIO fileIO = new NoListObjectStoreFileIO(); + SnapshotManager snapshotManager = snapshotManager(fileIO); + writeFile(new Path(snapshotManager.snapshotDirectory(), HintFileUtils.LATEST), "2"); + writeFile(snapshotManager.snapshotPath(2), createSnapshot(3).toJson()); + + assertThatThrownBy(snapshotManager::latestSnapshotId) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("contains snapshot 3."); + assertThat(fileIO.listStatusCalls).isZero(); + assertThat(fileIO.existsCalls).isZero(); + } + + @Test + public void testObjectStoreStaleLatestHintWithNextSnapshotFailsClosedWithoutList() + throws IOException { + NoListObjectStoreFileIO fileIO = new NoListObjectStoreFileIO(); + SnapshotManager snapshotManager = snapshotManager(fileIO); + writeFile(new Path(snapshotManager.snapshotDirectory(), HintFileUtils.LATEST), "1"); + writeSnapshot(snapshotManager, 1); + writeSnapshot(snapshotManager, 2); + + assertThatThrownBy(snapshotManager::latestSnapshotId) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Cannot safely use LATEST hint 1") + .hasMessageContaining("snapshot-2 already exists") + .hasStackTraceContaining("Recovery requires ListBucket"); + assertThat(fileIO.listStatusCalls).isZero(); + assertThat(fileIO.existsCalls).isZero(); + } + + @Test + public void testObjectStoreUsesLatestHintWithoutProbingNextSnapshot() throws IOException { + NoListObjectStoreFileIO fileIO = new NoListObjectStoreFileIO(); + SnapshotManager snapshotManager = snapshotManager(fileIO); + writeFile(new Path(snapshotManager.snapshotDirectory(), HintFileUtils.LATEST), "1"); + writeSnapshot(snapshotManager, 1); + + assertThat(snapshotManager.latestSnapshotId()).isEqualTo(1); + assertThat(fileIO.listStatusCalls).isZero(); + assertThat(fileIO.existsCalls).isZero(); + } + + @Test + public void testObjectStoreNonSnapshotPrefixUsesListFallback() throws IOException { + ListingObjectStoreFileIO fileIO = new ListingObjectStoreFileIO(); + Path dir = new Path(tempDir.toString()); + writeFile(new Path(dir, "changelog-1"), "not-a-snapshot-json"); + + assertThat( + HintFileUtils.findLatest( + fileIO, dir, "changelog-", id -> new Path(dir, "changelog-" + id))) + .isEqualTo(1); + assertThat(fileIO.listStatusCalls).isGreaterThan(0); + } + + private SnapshotManager snapshotManager(LocalFileIO fileIO) { + return new SnapshotManager(fileIO, new Path(tempDir.toString()), null, null, null); + } + + private static void writeSnapshot(SnapshotManager snapshotManager, long id) throws IOException { + writeFile(snapshotManager.snapshotPath(id), createSnapshot(id).toJson()); + } + + private static void writeFile(Path path, String content) throws IOException { + LocalFileIO.create().overwriteFileUtf8(path, content); + } + + private static Snapshot createSnapshot(long id) { + return new Snapshot( + id, + 1L, + "manifest-list-base", + 10L, + "manifest-list-delta", + 20L, + null, + null, + null, + "user", + id, + Snapshot.CommitKind.APPEND, + System.currentTimeMillis(), + 100L, + 100L, + null, + null, + null, + null, + null); + } + + private static class NoListObjectStoreFileIO extends LocalFileIO { + + private int listStatusCalls; + private int existsCalls; + private boolean denyLatestHintRead; + + @Override + public boolean isObjectStore() { + return true; + } + + @Override + public boolean supportsAtomicCreateWithoutOverwrite(Path path) { + return true; + } + + @Override + public FileStatus[] listStatus(Path path) throws IOException { + listStatusCalls++; + throw new AccessDeniedException(path.toString()); + } + + @Override + public boolean exists(Path path) throws IOException { + existsCalls++; + throw new AccessDeniedException(path.toString()); + } + + @Override + public Optional readOverwrittenFileUtf8(Path path) throws IOException { + if (denyLatestHintRead && HintFileUtils.LATEST.equals(path.getName())) { + throw new AccessDeniedException(path.toString()); + } + try { + return Optional.of(readFileUtf8(path)); + } catch (java.io.FileNotFoundException e) { + return Optional.empty(); + } + } + + @Override + public String readFileUtf8(Path path) throws IOException { + if (denyLatestHintRead && HintFileUtils.LATEST.equals(path.getName())) { + throw new AccessDeniedException(path.toString()); + } + return super.readFileUtf8(path); + } + } + + private static class DefaultOverwrittenReadNoListObjectStoreFileIO extends LocalFileIO { + + private int listStatusCalls; + private int existsCalls; + + @Override + public boolean isObjectStore() { + return true; + } + + @Override + public boolean supportsAtomicCreateWithoutOverwrite(Path path) { + return true; + } + + @Override + public FileStatus[] listStatus(Path path) throws IOException { + listStatusCalls++; + throw new AccessDeniedException(path.toString()); + } + + @Override + public boolean exists(Path path) throws IOException { + existsCalls++; + throw new AccessDeniedException(path.toString()); + } + + @Override + public String readFileUtf8(Path path) throws IOException { + if (HintFileUtils.LATEST.equals(path.getName())) { + throw new AccessDeniedException(path.toString()); + } + return super.readFileUtf8(path); + } + } + + private static class ListingObjectStoreFileIO extends LocalFileIO { + + private int listStatusCalls; + + @Override + public boolean isObjectStore() { + return true; + } + + @Override + public boolean supportsAtomicCreateWithoutOverwrite(Path path) { + return true; + } + + @Override + public FileStatus[] listStatus(Path path) throws IOException { + listStatusCalls++; + return super.listStatus(path); + } + } +} diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java index 827251837342..beaae853f721 100644 --- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java +++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java @@ -21,18 +21,27 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.PositionOutputStream; import org.apache.paimon.fs.TwoPhaseOutputStream; import org.apache.paimon.options.Options; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.s3a.S3ADataBlocks; import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.WriteOperationHelper; +import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; +import org.apache.hadoop.fs.statistics.impl.StubDurationTrackerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.S3Exception; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UncheckedIOException; import java.net.URI; +import java.nio.file.FileAlreadyExistsException; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -48,6 +57,16 @@ public class S3FileIO extends HadoopCompliantFileIO { private static final String HADOOP_CONFIG_PREFIX = "fs.s3a."; + private static final String SNAPSHOT_DIRECTORY = "snapshot"; + + private static final String SNAPSHOT_FILE_PREFIX = "snapshot-"; + + private static final String LATEST = "LATEST"; + + private static final String EARLIEST = "EARLIEST"; + + private static final String IF_NONE_MATCH_ANY = "*"; + private static final String[][] MIRRORED_CONFIG_KEYS = { {"fs.s3a.access-key", "fs.s3a.access.key"}, {"fs.s3a.secret-key", "fs.s3a.secret.key"}, @@ -73,6 +92,24 @@ public void configure(CatalogContext context) { this.hadoopOptions = mirrorCertainHadoopConfig(loadHadoopConfigFromContext(context)); } + @Override + public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws IOException { + if (isSnapshotMetadataFile(path)) { + return newDirectOutputStream(path, overwrite); + } + return super.newOutputStream(path, overwrite); + } + + @Override + public boolean supportsAtomicCreateWithoutOverwrite(Path path) { + return isSnapshotMetadataFile(path); + } + + protected PositionOutputStream newDirectOutputStream(Path path, boolean overwrite) + throws IOException { + return new DirectS3OutputStream(path, overwrite); + } + @Override public TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite) throws IOException { @@ -114,6 +151,42 @@ private Options mirrorCertainHadoopConfig(Options hadoopConfig) { return hadoopConfig; } + static PutObjectRequest applyOverwriteMode(PutObjectRequest request, boolean overwrite) { + if (overwrite) { + return request; + } + return request.toBuilder().ifNoneMatch(IF_NONE_MATCH_ANY).build(); + } + + static boolean isPreconditionFailed(Throwable throwable) { + while (throwable != null) { + if (throwable instanceof S3Exception) { + int statusCode = ((S3Exception) throwable).statusCode(); + return statusCode == 409 || statusCode == 412; + } + throwable = throwable.getCause(); + } + return false; + } + + private static boolean isSnapshotMetadataFile(Path path) { + Path parent = path.getParent(); + if (parent == null || !SNAPSHOT_DIRECTORY.equals(parent.getName())) { + return false; + } + + String fileName = path.getName(); + return fileName.startsWith(SNAPSHOT_FILE_PREFIX) + || LATEST.equals(fileName) + || EARLIEST.equals(fileName); + } + + private static FileAlreadyExistsException fileAlreadyExists(Path path, Throwable cause) { + FileAlreadyExistsException exception = new FileAlreadyExistsException(path.toString()); + exception.initCause(cause); + return exception; + } + @Override protected FileSystem createFileSystem(org.apache.hadoop.fs.Path path) { final String scheme = path.toUri().getScheme(); @@ -144,6 +217,87 @@ protected FileSystem createFileSystem(org.apache.hadoop.fs.Path path) { }); } + private class DirectS3OutputStream extends PositionOutputStream { + + private final Path path; + private final boolean overwrite; + private final ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + private boolean closed; + + private DirectS3OutputStream(Path path, boolean overwrite) { + this.path = path; + this.overwrite = overwrite; + } + + @Override + public long getPos() throws IOException { + checkOpen(); + return buffer.size(); + } + + @Override + public void write(int b) throws IOException { + checkOpen(); + buffer.write(b); + } + + @Override + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + checkOpen(); + buffer.write(b, off, len); + } + + @Override + public void flush() throws IOException { + checkOpen(); + } + + @Override + public void close() throws IOException { + if (closed) { + return; + } + + byte[] data = buffer.toByteArray(); + org.apache.hadoop.fs.Path hadoopPath = path(path); + S3AFileSystem fs = (S3AFileSystem) getFileSystem(hadoopPath); + WriteOperationHelper writeHelper = + fs.createWriteOperationHelper(fs.getActiveAuditSpan()); + PutObjectOptions options = PutObjectOptions.defaultOptions(); + PutObjectRequest request = + applyOverwriteMode( + writeHelper.createPutObjectRequest( + fs.pathToKey(hadoopPath), data.length, options), + overwrite); + + try (S3ADataBlocks.BlockUploadData uploadData = + new S3ADataBlocks.BlockUploadData(data, () -> !closed)) { + writeHelper.putObject( + request, + options, + uploadData, + StubDurationTrackerFactory.STUB_DURATION_TRACKER_FACTORY); + closed = true; + } catch (IOException e) { + if (!overwrite && isPreconditionFailed(e)) { + throw fileAlreadyExists(path, e); + } + throw e; + } + } + + private void checkOpen() throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } + } + } + private static class CacheKey { private final Options options; diff --git a/paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3FileIOTest.java b/paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3FileIOTest.java new file mode 100644 index 000000000000..ba1efbaab4be --- /dev/null +++ b/paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3FileIOTest.java @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.s3; + +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.PositionOutputStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.s3a.S3ADataBlocks; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.WriteOperationHelper; +import org.apache.hadoop.fs.s3a.api.RequestFactory; +import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A; +import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; +import org.apache.hadoop.fs.s3a.impl.StoreContext; +import org.apache.hadoop.fs.statistics.DurationTrackerFactory; +import org.apache.hadoop.fs.store.audit.AuditSpan; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.services.s3.model.S3Exception; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.FileAlreadyExistsException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link S3FileIO}. */ +public class S3FileIOTest { + + private static final AuditSpanS3A TEST_AUDIT_SPAN = new TestingAuditSpan(); + + @Test + public void testOverwriteOutputStreamBypassesHadoopCreate() throws Exception { + TestingS3FileIO fileIO = new TestingS3FileIO(); + Path path = new Path("s3://bucket/table/snapshot/snapshot-1"); + + try (PositionOutputStream out = fileIO.newOutputStream(path, true)) { + out.write("snapshot".getBytes(StandardCharsets.UTF_8)); + } + + assertThat(fileIO.directOutputStreamCalls).isEqualTo(1); + assertThat(fileIO.createFileSystemCalls).isEqualTo(0); + assertThat(fileIO.outputStream.content()).isEqualTo("snapshot"); + assertThat(fileIO.outputStream.overwrite).isTrue(); + assertThat(fileIO.outputStream.closed).isTrue(); + } + + @Test + public void testHintOverwriteOutputStreamBypassesHadoopCreate() throws Exception { + TestingS3FileIO fileIO = new TestingS3FileIO(); + Path path = new Path("s3://bucket/table/snapshot/LATEST"); + + try (PositionOutputStream out = fileIO.newOutputStream(path, true)) { + out.write("1".getBytes(StandardCharsets.UTF_8)); + } + + assertThat(fileIO.directOutputStreamCalls).isEqualTo(1); + assertThat(fileIO.createFileSystemCalls).isEqualTo(0); + assertThat(fileIO.outputStream.content()).isEqualTo("1"); + assertThat(fileIO.outputStream.overwrite).isTrue(); + assertThat(fileIO.outputStream.closed).isTrue(); + } + + @Test + public void testNoOverwriteSnapshotOutputStreamBypassesHadoopCreate() throws Exception { + TestingS3FileIO fileIO = new TestingS3FileIO(); + Path path = new Path("s3://bucket/table/snapshot/snapshot-1"); + + assertThat(fileIO.supportsAtomicCreateWithoutOverwrite(path)).isTrue(); + + try (PositionOutputStream out = fileIO.newOutputStream(path, false)) { + out.write("snapshot".getBytes(StandardCharsets.UTF_8)); + } + + assertThat(fileIO.directOutputStreamCalls).isEqualTo(1); + assertThat(fileIO.createFileSystemCalls).isEqualTo(0); + assertThat(fileIO.outputStream.content()).isEqualTo("snapshot"); + assertThat(fileIO.outputStream.overwrite).isFalse(); + assertThat(fileIO.outputStream.closed).isTrue(); + } + + @Test + public void testNoOverwriteSnapshotOutputStreamPreservesConflictSignal() throws Exception { + TestingS3FileIO fileIO = new TestingS3FileIO(); + fileIO.failDirectOutputStreamClose = + new FileAlreadyExistsException("s3://bucket/table/snapshot/snapshot-1"); + Path path = new Path("s3://bucket/table/snapshot/snapshot-1"); + + PositionOutputStream out = fileIO.newOutputStream(path, false); + out.write("snapshot".getBytes(StandardCharsets.UTF_8)); + + assertThatThrownBy(out::close) + .isInstanceOf(FileAlreadyExistsException.class) + .hasMessageContaining("snapshot-1"); + assertThat(fileIO.directOutputStreamCalls).isEqualTo(1); + assertThat(fileIO.createFileSystemCalls).isEqualTo(0); + assertThat(fileIO.outputStream.overwrite).isFalse(); + } + + @Test + public void testNoOverwriteDirectPutPreconditionFailureMapsToFileAlreadyExists() + throws Exception { + for (int statusCode : new int[] {409, 412}) { + IOException failure = s3Failure(statusCode); + DirectPutTestingS3FileIO fileIO = new DirectPutTestingS3FileIO(failure); + Path path = new Path("s3://bucket/table/snapshot/snapshot-1"); + + PositionOutputStream out = fileIO.newOutputStream(path, false); + out.write("snapshot".getBytes(StandardCharsets.UTF_8)); + + assertThatThrownBy(out::close) + .isInstanceOf(FileAlreadyExistsException.class) + .hasMessageContaining("snapshot-1") + .hasCause(failure); + assertThat(fileIO.createFileSystemCalls).isEqualTo(1); + assertThat(fileIO.fileSystem.writeHelper.request.ifNoneMatch()).isEqualTo("*"); + } + } + + @Test + public void testOverwriteDirectPutPreconditionFailureKeepsOriginalIOException() + throws Exception { + for (int statusCode : new int[] {409, 412}) { + IOException failure = s3Failure(statusCode); + DirectPutTestingS3FileIO fileIO = new DirectPutTestingS3FileIO(failure); + Path path = new Path("s3://bucket/table/snapshot/snapshot-1"); + + PositionOutputStream out = fileIO.newOutputStream(path, true); + out.write("snapshot".getBytes(StandardCharsets.UTF_8)); + + assertThatThrownBy(out::close) + .isSameAs(failure) + .isNotInstanceOf(FileAlreadyExistsException.class); + assertThat(fileIO.createFileSystemCalls).isEqualTo(1); + assertThat(fileIO.fileSystem.writeHelper.request.ifNoneMatch()).isNull(); + } + } + + @Test + public void testNoOverwritePutObjectRequestUsesIfNoneMatchHeader() { + PutObjectRequest request = + PutObjectRequest.builder() + .bucket("bucket") + .key("table/snapshot/snapshot-1") + .build(); + + assertThat(S3FileIO.applyOverwriteMode(request, false).ifNoneMatch()).isEqualTo("*"); + assertThat(S3FileIO.applyOverwriteMode(request, true).ifNoneMatch()).isNull(); + } + + @Test + public void testPreconditionFailedIsFileExistsSignal() { + IOException exception = + new IOException( + S3Exception.builder() + .statusCode(412) + .message("Precondition Failed") + .build()); + + assertThat(S3FileIO.isPreconditionFailed(exception)).isTrue(); + } + + @Test + public void testConditionalRequestConflictIsFileExistsSignal() { + IOException exception = + new IOException( + S3Exception.builder() + .statusCode(409) + .message("ConditionalRequestConflict") + .build()); + + assertThat(S3FileIO.isPreconditionFailed(exception)).isTrue(); + } + + @Test + public void testNonSnapshotOverwriteOutputStreamKeepsHadoopCreatePath() { + TestingS3FileIO fileIO = new TestingS3FileIO(); + Path path = new Path("s3://bucket/table/data/file-1"); + + assertThat(fileIO.supportsAtomicCreateWithoutOverwrite(path)).isFalse(); + + assertThatThrownBy(() -> fileIO.newOutputStream(path, true)) + .isInstanceOf(UncheckedIOException.class) + .hasCauseInstanceOf(IOException.class) + .hasRootCauseMessage("Hadoop create called"); + + assertThat(fileIO.directOutputStreamCalls).isEqualTo(0); + assertThat(fileIO.createFileSystemCalls).isEqualTo(1); + } + + @Test + public void testSnapshotMetadataPathClassification() throws Exception { + S3FileIO fileIO = new S3FileIO(); + + assertThat( + fileIO.supportsAtomicCreateWithoutOverwrite( + new Path("s3://bucket/table/snapshot/EARLIEST"))) + .isTrue(); + assertThat( + fileIO.supportsAtomicCreateWithoutOverwrite( + new Path("s3://bucket/table/snapshot/not-snapshot"))) + .isFalse(); + assertThat( + fileIO.supportsAtomicCreateWithoutOverwrite( + new Path("s3://bucket/table/metadata/snapshot-1"))) + .isFalse(); + } + + private static IOException s3Failure(int statusCode) { + return new IOException( + S3Exception.builder().statusCode(statusCode).message("S3 failure").build()); + } + + private static class TestingS3FileIO extends S3FileIO { + + private int directOutputStreamCalls; + private int createFileSystemCalls; + private CapturingPositionOutputStream outputStream; + private IOException failDirectOutputStreamClose; + + @Override + protected PositionOutputStream newDirectOutputStream(Path path, boolean overwrite) { + directOutputStreamCalls++; + outputStream = new CapturingPositionOutputStream(overwrite); + outputStream.failOnClose = failDirectOutputStreamClose; + return outputStream; + } + + @Override + protected FileSystem createFileSystem(org.apache.hadoop.fs.Path path) { + createFileSystemCalls++; + throw new UncheckedIOException(new IOException("Hadoop create called")); + } + } + + private static class DirectPutTestingS3FileIO extends S3FileIO { + + private final FailingS3AFileSystem fileSystem; + private int createFileSystemCalls; + + private DirectPutTestingS3FileIO(IOException failure) { + this.fileSystem = new FailingS3AFileSystem(failure); + } + + @Override + protected FileSystem createFileSystem(org.apache.hadoop.fs.Path path) { + createFileSystemCalls++; + return fileSystem; + } + } + + private static class FailingS3AFileSystem extends S3AFileSystem { + + private final FailingWriteOperationHelper writeHelper; + + private FailingS3AFileSystem(IOException failure) { + this.writeHelper = new FailingWriteOperationHelper(this, failure); + } + + @Override + public AuditSpanS3A getActiveAuditSpan() { + return TEST_AUDIT_SPAN; + } + + @Override + public WriteOperationHelper createWriteOperationHelper(AuditSpan auditSpan) { + return writeHelper; + } + + @Override + public String pathToKey(org.apache.hadoop.fs.Path path) { + String uriPath = path.toUri().getPath(); + return uriPath.startsWith("/") ? uriPath.substring(1) : uriPath; + } + + @Override + public String getBucket() { + return "bucket"; + } + + @Override + public StoreContext createStoreContext() { + return null; + } + + @Override + public RequestFactory getRequestFactory() { + return null; + } + } + + private static class FailingWriteOperationHelper extends WriteOperationHelper { + + private final IOException failure; + private PutObjectRequest request; + + private FailingWriteOperationHelper(S3AFileSystem fileSystem, IOException failure) { + super(fileSystem, new Configuration(), null, null, TEST_AUDIT_SPAN, null); + this.failure = failure; + } + + @Override + public PutObjectRequest createPutObjectRequest( + String key, long length, PutObjectOptions options) { + return PutObjectRequest.builder() + .bucket("bucket") + .key(key) + .contentLength(length) + .build(); + } + + @Override + public PutObjectResponse putObject( + PutObjectRequest request, + PutObjectOptions options, + S3ADataBlocks.BlockUploadData uploadData, + DurationTrackerFactory durationTrackerFactory) + throws IOException { + this.request = request; + throw failure; + } + } + + private static class TestingAuditSpan implements AuditSpanS3A { + + @Override + public String getSpanId() { + return "test-span"; + } + + @Override + public String getOperationName() { + return "test"; + } + + @Override + public long getTimestamp() { + return 0L; + } + + @Override + public AuditSpan activate() { + return this; + } + + @Override + public void deactivate() {} + } + + private static class CapturingPositionOutputStream extends PositionOutputStream { + + private final ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + private final boolean overwrite; + private IOException failOnClose; + private boolean closed; + + private CapturingPositionOutputStream(boolean overwrite) { + this.overwrite = overwrite; + } + + @Override + public long getPos() { + return buffer.size(); + } + + @Override + public void write(int b) throws IOException { + checkOpen(); + buffer.write(b); + } + + @Override + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + checkOpen(); + buffer.write(b, off, len); + } + + @Override + public void flush() throws IOException { + checkOpen(); + } + + @Override + public void close() throws IOException { + if (failOnClose != null) { + throw failOnClose; + } + closed = true; + } + + private String content() { + return new String(buffer.toByteArray(), StandardCharsets.UTF_8); + } + + private void checkOpen() throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } + } + } +}