From bc999a7cf9615bf9984bbd26cfd26f54aa773a91 Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Thu, 16 Jan 2025 16:10:40 +0800 Subject: [PATCH 1/6] [core] Fix race condition for earliest snapshot --- .../java/org/apache/paimon/Changelog.java | 11 ++ .../apache/paimon/utils/SnapshotManager.java | 92 +++++++++-- .../paimon/utils/SnapshotManagerTest.java | 145 +++++++++++++++--- 3 files changed, 213 insertions(+), 35 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/Changelog.java b/paimon-core/src/main/java/org/apache/paimon/Changelog.java index 8c6295b44c68..79c65ba570c4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/Changelog.java +++ b/paimon-core/src/main/java/org/apache/paimon/Changelog.java @@ -28,6 +28,7 @@ import javax.annotation.Nullable; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.Map; @@ -105,9 +106,19 @@ public static Changelog fromJson(String json) { } public static Changelog fromPath(FileIO fileIO, Path path) { + try { + return tryFromPath(fileIO, path); + } catch (FileNotFoundException e) { + throw new RuntimeException("Fails to read changelog from path " + path, e); + } + } + + public static Changelog tryFromPath(FileIO fileIO, Path path) throws FileNotFoundException { try { String json = fileIO.readFileUtf8(path); return Changelog.fromJson(json); + } catch (FileNotFoundException e) { + throw e; } catch (IOException e) { throw new RuntimeException("Fails to read changelog from path " + path, e); } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index ae70d7aec5d1..6c8a1951cdb5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -163,11 +163,28 @@ public Snapshot tryGetSnapshot(long snapshotId) throws FileNotFoundException { return snapshot; } + private @Nullable Snapshot tryGetEarliestSnapshotLaterThanOrEqualTo( + long snapshotId, long stopId) { + while (snapshotId <= stopId) { + try { + return tryGetSnapshot(snapshotId); + } catch (FileNotFoundException e) { + snapshotId++; + } + } + return null; + } + public Changelog changelog(long snapshotId) { Path changelogPath = longLivedChangelogPath(snapshotId); return Changelog.fromPath(fileIO, changelogPath); } + private Changelog tryGetChangelog(long snapshotId) throws FileNotFoundException { + Path changelogPath = longLivedChangelogPath(snapshotId); + return Changelog.tryFromPath(fileIO, changelogPath); + } + public Changelog longLivedChangelog(long snapshotId) { return Changelog.fromPath(fileIO, longLivedChangelogPath(snapshotId)); } @@ -209,7 +226,23 @@ public boolean longLivedChangelogExists(long snapshotId) { public @Nullable Snapshot earliestSnapshot() { Long snapshotId = earliestSnapshotId(); - return snapshotId == null ? null : snapshot(snapshotId); + if (snapshotId == null) { + return null; + } + + Long latest = null; + do { + try { + return tryGetSnapshot(snapshotId); + } catch (FileNotFoundException e) { + snapshotId++; + if (latest == null) { + latest = latestSnapshotId(); + } + } + } while (latest != null && snapshotId <= latest); + + return null; } public @Nullable Long earliestSnapshotId() { @@ -268,25 +301,43 @@ private Snapshot changelogOrSnapshot(long snapshotId) { } } + private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundException { + if (longLivedChangelogExists(snapshotId)) { + return tryGetChangelog(snapshotId); + } else { + return tryGetSnapshot(snapshotId); + } + } + /** * Returns the latest snapshot earlier than the timestamp mills. A non-existent snapshot may be * returned if all snapshots are equal to or later than the timestamp mills. */ public @Nullable Long earlierThanTimeMills(long timestampMills, boolean startFromChangelog) { - Long earliestSnapshot = earliestSnapshotId(); + Long earliestSnapshotId = earliestSnapshotId(); Long earliest; if (startFromChangelog) { Long earliestChangelog = earliestLongLivedChangelogId(); - earliest = earliestChangelog == null ? earliestSnapshot : earliestChangelog; + earliest = earliestChangelog == null ? earliestSnapshotId : earliestChangelog; } else { - earliest = earliestSnapshot; + earliest = earliestSnapshotId; } Long latest = latestSnapshotId(); if (earliest == null || latest == null) { return null; } - if (changelogOrSnapshot(earliest).timeMillis() >= timestampMills) { + Snapshot earliestSnapshot = null; + while (earliest <= latest) { + try { + earliestSnapshot = tryGetChangelogOrSnapshot(earliest); + break; + } catch (FileNotFoundException e) { + earliest++; + } + } + + if (earliestSnapshot == null || earliestSnapshot.timeMillis() >= timestampMills) { return earliest - 1; } @@ -312,8 +363,9 @@ private Snapshot changelogOrSnapshot(long snapshotId) { return null; } - Snapshot earliestSnapShot = snapshot(earliest); - if (earliestSnapShot.timeMillis() > timestampMills) { + Snapshot earliestSnapShot = tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest); + + if (earliestSnapShot == null || earliestSnapShot.timeMillis() > timestampMills) { return earliestSnapShot; } Snapshot finalSnapshot = null; @@ -375,12 +427,23 @@ private Snapshot changelogOrSnapshot(long snapshotId) { if (earliest == null || latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) { return null; } + + Snapshot earliestSnapShot = tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest); + + if (earliestSnapShot == null) { + return null; + } + Long earliestWatermark = null; // find the first snapshot with watermark - if ((earliestWatermark = snapshot(earliest).watermark()) == null) { + if ((earliestWatermark = earliestSnapShot.watermark()) == null) { while (earliest < latest) { earliest++; - earliestWatermark = snapshot(earliest).watermark(); + Snapshot snapshot = tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest); + if (snapshot == null) { + continue; + } + earliestWatermark = snapshot.watermark(); if (earliestWatermark != null) { break; } @@ -391,7 +454,7 @@ private Snapshot changelogOrSnapshot(long snapshotId) { } if (earliestWatermark >= watermark) { - return snapshot(earliest); + return tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest); } Snapshot finalSnapshot = null; @@ -434,9 +497,16 @@ private Snapshot changelogOrSnapshot(long snapshotId) { if (earliest == null || latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) { return null; } + + Snapshot earliestSnapShot = tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest); + + if (earliestSnapShot == null) { + return null; + } + Long earliestWatermark = null; // find the first snapshot with watermark - if ((earliestWatermark = snapshot(earliest).watermark()) == null) { + if ((earliestWatermark = earliestSnapShot.watermark()) == null) { while (earliest < latest) { earliest++; earliestWatermark = snapshot(earliest).watermark(); diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java index e828a0c90a9d..4c8d8cd075c1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java @@ -27,6 +27,10 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; @@ -56,8 +60,42 @@ public void testSnapshotPath() { } } - @Test - public void testEarlierThanTimeMillis() throws IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testEarliestSnapshot(boolean isRaceCondition) throws IOException { + long millis = 1684726826L; + FileIO localFileIO = LocalFileIO.create(); + SnapshotManager snapshotManager = + new TestSnapshotManager(localFileIO, new Path(tempDir.toString()), isRaceCondition); + // create 10 snapshots + for (long i = 0; i < 10; i++) { + Snapshot snapshot = createSnapshotWithMillis(i, millis + i * 1000); + localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson()); + } + + assertThat(snapshotManager.earliestSnapshot().id()).isEqualTo(isRaceCondition ? 1 : 0); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testEarlierOrEqualWatermark(boolean isRaceCondition) throws IOException { + long millis = 1684726826L; + FileIO localFileIO = LocalFileIO.create(); + SnapshotManager snapshotManager = + new TestSnapshotManager(localFileIO, new Path(tempDir.toString()), isRaceCondition); + // create 10 snapshots + for (long i = 0; i < 10; i++) { + Snapshot snapshot = createSnapshotWithMillis(i, millis + i * 1000, millis + i * 1000); + localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson()); + } + + assertThat(snapshotManager.earlierOrEqualWatermark(millis + 999).id()) + .isEqualTo(isRaceCondition ? 1 : 0); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testEarlierThanTimeMillis(boolean isRaceCondition) throws IOException { long base = System.currentTimeMillis(); ThreadLocalRandom random = ThreadLocalRandom.current(); @@ -70,7 +108,7 @@ public void testEarlierThanTimeMillis() throws IOException { FileIO localFileIO = LocalFileIO.create(); SnapshotManager snapshotManager = - new SnapshotManager(localFileIO, new Path(tempDir.toString())); + new TestSnapshotManager(localFileIO, new Path(tempDir.toString()), isRaceCondition); int firstSnapshotId = random.nextInt(1, 100); for (int i = 0; i < numSnapshots; i++) { Snapshot snapshot = createSnapshotWithMillis(firstSnapshotId + i, millis.get(i)); @@ -90,11 +128,24 @@ public void testEarlierThanTimeMillis() throws IOException { Long actual = snapshotManager.earlierThanTimeMills(time, false); if (millis.get(numSnapshots - 1) < time) { - assertThat(actual).isEqualTo(firstSnapshotId + numSnapshots - 1); + if (isRaceCondition && millis.size() == 1) { + assertThat(actual).isNull(); + } else { + assertThat(actual).isEqualTo(firstSnapshotId + numSnapshots - 1); + } } else { for (int i = 0; i < numSnapshots; i++) { if (millis.get(i) >= time) { - assertThat(actual).isEqualTo(firstSnapshotId + i - 1); + if (isRaceCondition && i == 0) { + // The first snapshot expired during invocation + if (millis.size() == 1) { + assertThat(actual).isNull(); + } else { + assertThat(actual).isEqualTo(firstSnapshotId + i); + } + } else { + assertThat(actual).isEqualTo(firstSnapshotId + i - 1); + } break; } } @@ -102,31 +153,45 @@ public void testEarlierThanTimeMillis() throws IOException { } } - @Test - public void testEarlierOrEqualTimeMills() throws IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testEarlierOrEqualTimeMills(boolean isRaceCondition) throws IOException { long millis = 1684726826L; FileIO localFileIO = LocalFileIO.create(); SnapshotManager snapshotManager = - new SnapshotManager(localFileIO, new Path(tempDir.toString())); + new TestSnapshotManager(localFileIO, new Path(tempDir.toString()), isRaceCondition); // create 10 snapshots for (long i = 0; i < 10; i++) { Snapshot snapshot = createSnapshotWithMillis(i, millis + i * 1000); localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson()); } - // there is no snapshot smaller than "millis - 1L" return the earliest snapshot - assertThat(snapshotManager.earlierOrEqualTimeMills(millis - 1L).timeMillis()) - .isEqualTo(millis); - - // smaller than the second snapshot return the first snapshot - assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 999).timeMillis()) - .isEqualTo(millis); - // equal to the second snapshot return the second snapshot - assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 1000).timeMillis()) - .isEqualTo(millis + 1000); - // larger than the second snapshot return the second snapshot - assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 1001).timeMillis()) - .isEqualTo(millis + 1000); + if (isRaceCondition) { + // The earliest snapshot has expired, so always return the second snapshot + assertThat(snapshotManager.earlierOrEqualTimeMills(millis - 1L).timeMillis()) + .isEqualTo(millis + 1000L); + assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 999).timeMillis()) + .isEqualTo(millis + 1000L); + assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 1000).timeMillis()) + .isEqualTo(millis + 1000L); + assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 1001).timeMillis()) + .isEqualTo(millis + 1000L); + } else { + // there is no snapshot smaller than "millis - 1L" return the earliest snapshot + assertThat(snapshotManager.earlierOrEqualTimeMills(millis - 1L).timeMillis()) + .isEqualTo(millis); + + // smaller than the second snapshot return the first snapshot + assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 999).timeMillis()) + .isEqualTo(millis); + + // equal to the second snapshot return the second snapshot + assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 1000).timeMillis()) + .isEqualTo(millis + 1000); + // larger than the second snapshot return the second snapshot + assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 1001).timeMillis()) + .isEqualTo(millis + 1000); + } } @Test @@ -154,12 +219,13 @@ public void testLaterOrEqualTimeMills() throws IOException { assertThat(snapshotManager.laterOrEqualTimeMills(millis + 10001)).isNull(); } - @Test - public void testlaterOrEqualWatermark() throws IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testlaterOrEqualWatermark(boolean isRaceCondition) throws IOException { long millis = Long.MIN_VALUE; FileIO localFileIO = LocalFileIO.create(); SnapshotManager snapshotManager = - new SnapshotManager(localFileIO, new Path(tempDir.toString())); + new TestSnapshotManager(localFileIO, new Path(tempDir.toString()), isRaceCondition); // create 10 snapshots for (long i = 0; i < 10; i++) { Snapshot snapshot = createSnapshotWithMillis(i, millis, Long.MIN_VALUE); @@ -410,4 +476,35 @@ public void testCommitChangelogWhenSameChangelogCommitTwice() throws IOException snapshotManager.commitChangelog(changelog, id); assertDoesNotThrow(() -> snapshotManager.commitChangelog(changelog, id)); } + + /** + * Test {@link SnapshotManager} to mock situations when there is a race condition, that the + * earliest snapshot is deleted by another thread in the middle of the current thread's + * invocation. + */ + private static class TestSnapshotManager extends SnapshotManager { + private final boolean isRaceCondition; + + private boolean deleteEarliestSnapshot = false; + + public TestSnapshotManager(FileIO fileIO, Path tablePath, boolean isRaceCondition) { + super(fileIO, tablePath); + this.isRaceCondition = isRaceCondition; + } + + @Override + public @Nullable Long earliestSnapshotId() { + Long snapshotId = super.earliestSnapshotId(); + if (isRaceCondition && snapshotId != null && !deleteEarliestSnapshot) { + Path snapshotPath = snapshotPath(snapshotId); + try { + fileIO().delete(snapshotPath, true); + } catch (IOException e) { + throw new RuntimeException(e); + } + deleteEarliestSnapshot = true; + } + return snapshotId; + } + } } From 28f6e07aeb07b86b9bad51cf035855990b74e9a4 Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Fri, 17 Jan 2025 22:17:36 +0800 Subject: [PATCH 2/6] [core] Improve naming and log --- .../apache/paimon/utils/SnapshotManager.java | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 6c8a1951cdb5..8dc8a8c0591d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -164,8 +164,8 @@ public Snapshot tryGetSnapshot(long snapshotId) throws FileNotFoundException { } private @Nullable Snapshot tryGetEarliestSnapshotLaterThanOrEqualTo( - long snapshotId, long stopId) { - while (snapshotId <= stopId) { + long snapshotId, long stopSnapshotId) { + while (snapshotId <= stopSnapshotId) { try { return tryGetSnapshot(snapshotId); } catch (FileNotFoundException e) { @@ -230,17 +230,21 @@ public boolean longLivedChangelogExists(long snapshotId) { return null; } - Long latest = null; + Long latestSnapshotId = null; do { try { return tryGetSnapshot(snapshotId); } catch (FileNotFoundException e) { + LOG.warn( + "The earliest snapshot was once identified but disappeared. " + + "It might have been expired by other jobs operating on this table. " + + "Searching for the second earliest snapshot instead. "); snapshotId++; - if (latest == null) { - latest = latestSnapshotId(); + if (latestSnapshotId == null) { + latestSnapshotId = latestSnapshotId(); } } - } while (latest != null && snapshotId <= latest); + } while (latestSnapshotId != null && snapshotId <= latestSnapshotId); return null; } @@ -317,8 +321,8 @@ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundE Long earliestSnapshotId = earliestSnapshotId(); Long earliest; if (startFromChangelog) { - Long earliestChangelog = earliestLongLivedChangelogId(); - earliest = earliestChangelog == null ? earliestSnapshotId : earliestChangelog; + Long earliestChangelogId = earliestLongLivedChangelogId(); + earliest = earliestChangelogId == null ? earliestSnapshotId : earliestChangelogId; } else { earliest = earliestSnapshotId; } @@ -333,6 +337,10 @@ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundE earliestSnapshot = tryGetChangelogOrSnapshot(earliest); break; } catch (FileNotFoundException e) { + LOG.warn( + "The earliest snapshot or changelog was once identified but disappeared. " + + "It might have been expired by other jobs operating on this table. " + + "Searching for the second earliest snapshot or changelog instead. "); earliest++; } } From 9dad0ef389765948a7843c55075095841b9db528 Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Fri, 7 Feb 2025 16:45:19 +0800 Subject: [PATCH 3/6] Reuse tryGetEarliestSnapshotLaterThanOrEqualTo --- .../apache/paimon/utils/SnapshotManager.java | 57 +++++++------------ .../paimon/utils/SnapshotManagerTest.java | 4 +- 2 files changed, 22 insertions(+), 39 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 8dc8a8c0591d..91c85e4ac7e3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -164,11 +164,17 @@ public Snapshot tryGetSnapshot(long snapshotId) throws FileNotFoundException { } private @Nullable Snapshot tryGetEarliestSnapshotLaterThanOrEqualTo( - long snapshotId, long stopSnapshotId) { + long snapshotId, long stopSnapshotId, boolean includeChangelog) { + FunctionWithException snapshotFunction = + includeChangelog ? this::tryGetChangelogOrSnapshot : this::tryGetSnapshot; while (snapshotId <= stopSnapshotId) { try { - return tryGetSnapshot(snapshotId); + return snapshotFunction.apply(snapshotId); } catch (FileNotFoundException e) { + LOG.warn( + "The earliest snapshot or changelog was once identified but disappeared. " + + "It might have been expired by other jobs operating on this table. " + + "Searching for the second earliest snapshot or changelog instead. "); snapshotId++; } } @@ -230,23 +236,7 @@ public boolean longLivedChangelogExists(long snapshotId) { return null; } - Long latestSnapshotId = null; - do { - try { - return tryGetSnapshot(snapshotId); - } catch (FileNotFoundException e) { - LOG.warn( - "The earliest snapshot was once identified but disappeared. " - + "It might have been expired by other jobs operating on this table. " - + "Searching for the second earliest snapshot instead. "); - snapshotId++; - if (latestSnapshotId == null) { - latestSnapshotId = latestSnapshotId(); - } - } - } while (latestSnapshotId != null && snapshotId <= latestSnapshotId); - - return null; + return tryGetEarliestSnapshotLaterThanOrEqualTo(snapshotId, snapshotId + 1, false); } public @Nullable Long earliestSnapshotId() { @@ -331,19 +321,8 @@ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundE return null; } - Snapshot earliestSnapshot = null; - while (earliest <= latest) { - try { - earliestSnapshot = tryGetChangelogOrSnapshot(earliest); - break; - } catch (FileNotFoundException e) { - LOG.warn( - "The earliest snapshot or changelog was once identified but disappeared. " - + "It might have been expired by other jobs operating on this table. " - + "Searching for the second earliest snapshot or changelog instead. "); - earliest++; - } - } + Snapshot earliestSnapshot = + tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest, true); if (earliestSnapshot == null || earliestSnapshot.timeMillis() >= timestampMills) { return earliest - 1; @@ -371,7 +350,8 @@ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundE return null; } - Snapshot earliestSnapShot = tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest); + Snapshot earliestSnapShot = + tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest, false); if (earliestSnapShot == null || earliestSnapShot.timeMillis() > timestampMills) { return earliestSnapShot; @@ -436,7 +416,8 @@ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundE return null; } - Snapshot earliestSnapShot = tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest); + Snapshot earliestSnapShot = + tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest, false); if (earliestSnapShot == null) { return null; @@ -447,7 +428,8 @@ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundE if ((earliestWatermark = earliestSnapShot.watermark()) == null) { while (earliest < latest) { earliest++; - Snapshot snapshot = tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest); + Snapshot snapshot = + tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest, false); if (snapshot == null) { continue; } @@ -462,7 +444,7 @@ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundE } if (earliestWatermark >= watermark) { - return tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest); + return tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest, false); } Snapshot finalSnapshot = null; @@ -506,7 +488,8 @@ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundE return null; } - Snapshot earliestSnapShot = tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest); + Snapshot earliestSnapShot = + tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest, false); if (earliestSnapShot == null) { return null; diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java index 4c8d8cd075c1..6f7b74a7c39c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java @@ -141,10 +141,10 @@ public void testEarlierThanTimeMillis(boolean isRaceCondition) throws IOExceptio if (millis.size() == 1) { assertThat(actual).isNull(); } else { - assertThat(actual).isEqualTo(firstSnapshotId + i); + assertThat(actual).isLessThanOrEqualTo(firstSnapshotId); } } else { - assertThat(actual).isEqualTo(firstSnapshotId + i - 1); + assertThat(actual).isLessThanOrEqualTo(firstSnapshotId + i - 1); } break; } From 741aba2ca3ad74e6a63ad5071827d2b9fe20e6ef Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Fri, 7 Feb 2025 16:47:43 +0800 Subject: [PATCH 4/6] Fix uppercase --- .../test/java/org/apache/paimon/utils/SnapshotManagerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java index 6f7b74a7c39c..20b455704305 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java @@ -221,7 +221,7 @@ public void testLaterOrEqualTimeMills() throws IOException { @ParameterizedTest @ValueSource(booleans = {true, false}) - public void testlaterOrEqualWatermark(boolean isRaceCondition) throws IOException { + public void testLaterOrEqualWatermark(boolean isRaceCondition) throws IOException { long millis = Long.MIN_VALUE; FileIO localFileIO = LocalFileIO.create(); SnapshotManager snapshotManager = From d8e82feb008236660641bb8b2ecfe67610eb319d Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Wed, 12 Feb 2025 17:03:08 +0800 Subject: [PATCH 5/6] Fix earliestSnapshotId and unify earliestSnapshot method --- .../apache/paimon/utils/SnapshotManager.java | 97 +++++++++---------- 1 file changed, 44 insertions(+), 53 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 91c85e4ac7e3..e0158bf7aea3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -163,24 +163,6 @@ public Snapshot tryGetSnapshot(long snapshotId) throws FileNotFoundException { return snapshot; } - private @Nullable Snapshot tryGetEarliestSnapshotLaterThanOrEqualTo( - long snapshotId, long stopSnapshotId, boolean includeChangelog) { - FunctionWithException snapshotFunction = - includeChangelog ? this::tryGetChangelogOrSnapshot : this::tryGetSnapshot; - while (snapshotId <= stopSnapshotId) { - try { - return snapshotFunction.apply(snapshotId); - } catch (FileNotFoundException e) { - LOG.warn( - "The earliest snapshot or changelog was once identified but disappeared. " - + "It might have been expired by other jobs operating on this table. " - + "Searching for the second earliest snapshot or changelog instead. "); - snapshotId++; - } - } - return null; - } - public Changelog changelog(long snapshotId) { Path changelogPath = longLivedChangelogPath(snapshotId); return Changelog.fromPath(fileIO, changelogPath); @@ -231,12 +213,35 @@ public boolean longLivedChangelogExists(long snapshotId) { } public @Nullable Snapshot earliestSnapshot() { + return earliestSnapshot(false); + } + + private @Nullable Snapshot earliestSnapshot(boolean includeChangelog) { Long snapshotId = earliestSnapshotId(); if (snapshotId == null) { return null; } - return tryGetEarliestSnapshotLaterThanOrEqualTo(snapshotId, snapshotId + 1, false); + FunctionWithException snapshotFunction = + includeChangelog ? this::tryGetChangelogOrSnapshot : this::tryGetSnapshot; + + // The loss of the earliest snapshot is an event of small probability, so the retry number + // here need not be too large. + int retry = 0; + do { + try { + return snapshotFunction.apply(snapshotId); + } catch (FileNotFoundException e) { + if (retry++ >= 3) { + throw new RuntimeException(e); + } + LOG.warn( + "The earliest snapshot or changelog was once identified but disappeared. " + + "It might have been expired by other jobs operating on this table. " + + "Searching for the second earliest snapshot or changelog instead. "); + snapshotId++; + } + } while (true); } public @Nullable Long earliestSnapshotId() { @@ -308,26 +313,21 @@ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundE * returned if all snapshots are equal to or later than the timestamp mills. */ public @Nullable Long earlierThanTimeMills(long timestampMills, boolean startFromChangelog) { - Long earliestSnapshotId = earliestSnapshotId(); - Long earliest; - if (startFromChangelog) { - Long earliestChangelogId = earliestLongLivedChangelogId(); - earliest = earliestChangelogId == null ? earliestSnapshotId : earliestChangelogId; - } else { - earliest = earliestSnapshotId; - } Long latest = latestSnapshotId(); - if (earliest == null || latest == null) { + if (latest == null) { return null; } - Snapshot earliestSnapshot = - tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest, true); + Snapshot earliestSnapshot = earliestSnapshot(startFromChangelog); + if (earliestSnapshot == null) { + return null; + } - if (earliestSnapshot == null || earliestSnapshot.timeMillis() >= timestampMills) { - return earliest - 1; + if (earliestSnapshot.timeMillis() >= timestampMills) { + return earliestSnapshot.id() - 1; } + long earliest = earliestSnapshot.id(); while (earliest < latest) { long mid = (earliest + latest + 1) / 2; if (changelogOrSnapshot(mid).timeMillis() < timestampMills) { @@ -344,18 +344,17 @@ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundE * mills. If there is no such a snapshot, returns null. */ public @Nullable Snapshot earlierOrEqualTimeMills(long timestampMills) { - Long earliest = earliestSnapshotId(); Long latest = latestSnapshotId(); - if (earliest == null || latest == null) { + if (latest == null) { return null; } - Snapshot earliestSnapShot = - tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest, false); - + Snapshot earliestSnapShot = earliestSnapshot(); if (earliestSnapShot == null || earliestSnapShot.timeMillis() > timestampMills) { return earliestSnapShot; } + long earliest = earliestSnapShot.id(); + Snapshot finalSnapshot = null; while (earliest <= latest) { long mid = earliest + (latest - earliest) / 2; // Avoid overflow @@ -408,31 +407,25 @@ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundE } public @Nullable Snapshot earlierOrEqualWatermark(long watermark) { - Long earliest = earliestSnapshotId(); Long latest = latestSnapshotId(); // If latest == Long.MIN_VALUE don't need next binary search for watermark // which can reduce IO cost with snapshot - if (earliest == null || latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) { + if (latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) { return null; } - Snapshot earliestSnapShot = - tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest, false); - + Snapshot earliestSnapShot = earliestSnapshot(); if (earliestSnapShot == null) { return null; } + long earliest = earliestSnapShot.id(); Long earliestWatermark = null; // find the first snapshot with watermark if ((earliestWatermark = earliestSnapShot.watermark()) == null) { while (earliest < latest) { earliest++; - Snapshot snapshot = - tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest, false); - if (snapshot == null) { - continue; - } + Snapshot snapshot = snapshot(earliest); earliestWatermark = snapshot.watermark(); if (earliestWatermark != null) { break; @@ -444,7 +437,7 @@ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundE } if (earliestWatermark >= watermark) { - return tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest, false); + return snapshot(earliest); } Snapshot finalSnapshot = null; @@ -480,20 +473,18 @@ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundE } public @Nullable Snapshot laterOrEqualWatermark(long watermark) { - Long earliest = earliestSnapshotId(); Long latest = latestSnapshotId(); // If latest == Long.MIN_VALUE don't need next binary search for watermark // which can reduce IO cost with snapshot - if (earliest == null || latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) { + if (latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) { return null; } - Snapshot earliestSnapShot = - tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest, false); - + Snapshot earliestSnapShot = earliestSnapshot(); if (earliestSnapShot == null) { return null; } + long earliest = earliestSnapShot.id(); Long earliestWatermark = null; // find the first snapshot with watermark From 8efb970f676b76f24eb7233ba8e92f0ff039cc31 Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Wed, 12 Feb 2025 17:10:49 +0800 Subject: [PATCH 6/6] remove redundant variable --- .../src/main/java/org/apache/paimon/utils/SnapshotManager.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index e0158bf7aea3..d1300cfa2e23 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -425,8 +425,7 @@ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundE if ((earliestWatermark = earliestSnapShot.watermark()) == null) { while (earliest < latest) { earliest++; - Snapshot snapshot = snapshot(earliest); - earliestWatermark = snapshot.watermark(); + earliestWatermark = snapshot(earliest).watermark(); if (earliestWatermark != null) { break; }