Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Fix race condition for earliest snapshot #4930

Merged
merged 7 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/Changelog.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import javax.annotation.Nullable;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Map;

Expand Down Expand Up @@ -105,9 +106,19 @@ public static Changelog fromJson(String json) {
}

/**
 * Reads the changelog stored at {@code path}, treating a missing file as a failure.
 *
 * <p>Delegates to {@link #tryFromPath(FileIO, Path)} and converts its checked {@link
 * FileNotFoundException} into an unchecked exception, so callers that expect the file to exist
 * do not have to handle it.
 *
 * @param fileIO file system abstraction used to read the file
 * @param path location of the changelog file
 * @return the changelog parsed from the file
 * @throws RuntimeException if the file does not exist (the original exception is the cause)
 */
public static Changelog fromPath(FileIO fileIO, Path path) {
    try {
        return tryFromPath(fileIO, path);
    } catch (FileNotFoundException missingFile) {
        String message = "Fails to read changelog from path " + path;
        throw new RuntimeException(message, missingFile);
    }
}

public static Changelog tryFromPath(FileIO fileIO, Path path) throws FileNotFoundException {
try {
String json = fileIO.readFileUtf8(path);
return Changelog.fromJson(json);
} catch (FileNotFoundException e) {
throw e;
} catch (IOException e) {
throw new RuntimeException("Fails to read changelog from path " + path, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,34 @@ public Snapshot tryGetSnapshot(long snapshotId) throws FileNotFoundException {
return snapshot;
}

/**
 * Returns the first snapshot (or changelog) that can still be read in the id range {@code
 * [snapshotId, stopSnapshotId]}, probing ids in ascending order.
 *
 * <p>An id whose file cannot be found is skipped with a warning and the next id is tried. This
 * tolerates the race in which the earliest snapshot was identified but removed (for example by
 * expiration in another job) before it could be read.
 *
 * @param snapshotId first id to probe
 * @param stopSnapshotId last id to probe (inclusive)
 * @param includeChangelog if {@code true}, each id is read via {@code
 *     tryGetChangelogOrSnapshot}; otherwise via {@code tryGetSnapshot}
 * @return the first snapshot that could be read, or {@code null} if every id in the range was
 *     missing (or the range is empty)
 */
private @Nullable Snapshot tryGetEarliestSnapshotLaterThanOrEqualTo(
        long snapshotId, long stopSnapshotId, boolean includeChangelog) {
    FunctionWithException<Long, Snapshot, FileNotFoundException> snapshotFunction =
            includeChangelog ? this::tryGetChangelogOrSnapshot : this::tryGetSnapshot;
    while (snapshotId <= stopSnapshotId) {
        try {
            return snapshotFunction.apply(snapshotId);
        } catch (FileNotFoundException e) {
            // A missing file is the expected outcome of the race; the stack trace is omitted
            // on purpose, but the id is reported so each warning can be traced to a snapshot.
            LOG.warn(
                    "The earliest snapshot or changelog was once identified but disappeared. "
                            + "It might have been expired by other jobs operating on this table. "
                            + "Searching for the second earliest snapshot or changelog instead. "
                            + "Missing snapshot id: "
                            + snapshotId);
            snapshotId++;
        }
    }
    return null;
}

/**
 * Reads the changelog stored at the long-lived changelog path of the given id.
 *
 * @param snapshotId id used to resolve the long-lived changelog path
 * @return the changelog read from that path
 */
public Changelog changelog(long snapshotId) {
    return Changelog.fromPath(fileIO, longLivedChangelogPath(snapshotId));
}

/**
 * Reads the changelog stored at the long-lived changelog path of the given id, letting a
 * missing file surface as a checked exception.
 *
 * @param snapshotId id used to resolve the long-lived changelog path
 * @return the changelog read from that path
 * @throws FileNotFoundException if the changelog file does not exist
 */
private Changelog tryGetChangelog(long snapshotId) throws FileNotFoundException {
    return Changelog.tryFromPath(fileIO, longLivedChangelogPath(snapshotId));
}

/**
 * Reads the long-lived changelog for the given id.
 *
 * @param snapshotId id used to resolve the long-lived changelog path
 * @return the changelog read from that path
 */
public Changelog longLivedChangelog(long snapshotId) {
    Path longLivedPath = longLivedChangelogPath(snapshotId);
    return Changelog.fromPath(fileIO, longLivedPath);
}
Expand Down Expand Up @@ -209,7 +232,11 @@ public boolean longLivedChangelogExists(long snapshotId) {

public @Nullable Snapshot earliestSnapshot() {
Long snapshotId = earliestSnapshotId();
return snapshotId == null ? null : snapshot(snapshotId);
if (snapshotId == null) {
return null;
}

return tryGetEarliestSnapshotLaterThanOrEqualTo(snapshotId, snapshotId + 1, false);
}

public @Nullable Long earliestSnapshotId() {
Expand Down Expand Up @@ -268,25 +295,36 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
}
}

/**
 * Reads the entry for the given id, using the long-lived changelog when one exists and the
 * snapshot otherwise.
 *
 * @param snapshotId id of the changelog or snapshot to read
 * @return the changelog if a long-lived changelog exists for the id, else the snapshot
 * @throws FileNotFoundException if the chosen file is missing when it is read
 */
private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundException {
    if (!longLivedChangelogExists(snapshotId)) {
        return tryGetSnapshot(snapshotId);
    }
    return tryGetChangelog(snapshotId);
}

/**
* Returns the id of the latest snapshot earlier than the given timestamp in milliseconds. The id
* of a non-existent snapshot (the earliest id minus one) may be returned if all snapshots are
* equal to or later than that timestamp.
*/
public @Nullable Long earlierThanTimeMills(long timestampMills, boolean startFromChangelog) {
Long earliestSnapshot = earliestSnapshotId();
Long earliestSnapshotId = earliestSnapshotId();
Long earliest;
if (startFromChangelog) {
Long earliestChangelog = earliestLongLivedChangelogId();
earliest = earliestChangelog == null ? earliestSnapshot : earliestChangelog;
Long earliestChangelogId = earliestLongLivedChangelogId();
earliest = earliestChangelogId == null ? earliestSnapshotId : earliestChangelogId;
} else {
earliest = earliestSnapshot;
earliest = earliestSnapshotId;
}
Long latest = latestSnapshotId();
if (earliest == null || latest == null) {
return null;
}

if (changelogOrSnapshot(earliest).timeMillis() >= timestampMills) {
Snapshot earliestSnapshot =
tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest, true);

if (earliestSnapshot == null || earliestSnapshot.timeMillis() >= timestampMills) {
return earliest - 1;
}

Expand All @@ -312,8 +350,10 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
return null;
}

Snapshot earliestSnapShot = snapshot(earliest);
if (earliestSnapShot.timeMillis() > timestampMills) {
Snapshot earliestSnapShot =
tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest, false);

if (earliestSnapShot == null || earliestSnapShot.timeMillis() > timestampMills) {
return earliestSnapShot;
}
Snapshot finalSnapshot = null;
Expand Down Expand Up @@ -375,12 +415,25 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
if (earliest == null || latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) {
return null;
}

Snapshot earliestSnapShot =
tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest, false);

if (earliestSnapShot == null) {
return null;
}

Long earliestWatermark = null;
// find the first snapshot with watermark
if ((earliestWatermark = snapshot(earliest).watermark()) == null) {
if ((earliestWatermark = earliestSnapShot.watermark()) == null) {
while (earliest < latest) {
earliest++;
earliestWatermark = snapshot(earliest).watermark();
Snapshot snapshot =
tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest, false);
if (snapshot == null) {
continue;
}
earliestWatermark = snapshot.watermark();
if (earliestWatermark != null) {
break;
}
Expand All @@ -391,7 +444,7 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
}

if (earliestWatermark >= watermark) {
return snapshot(earliest);
return tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest, false);
}
Snapshot finalSnapshot = null;

Expand Down Expand Up @@ -434,9 +487,17 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
if (earliest == null || latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) {
return null;
}

Snapshot earliestSnapShot =
tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest, false);

if (earliestSnapShot == null) {
return null;
}

Long earliestWatermark = null;
// find the first snapshot with watermark
if ((earliestWatermark = snapshot(earliest).watermark()) == null) {
if ((earliestWatermark = earliestSnapShot.watermark()) == null) {
while (earliest < latest) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there are some risks. Earliest should be re-assigned.

Maybe you should just optimize earliestSnapshot. And use this method in many places.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that some earliestSnapshot methods use binary search while others use linear, I extracted and optimized two kinds of paths. Please take a look.

earliest++;
earliestWatermark = snapshot(earliest).watermark();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -56,8 +60,42 @@ public void testSnapshotPath() {
}
}

@Test
public void testEarlierThanTimeMillis() throws IOException {
/**
 * Verifies {@code earliestSnapshot()} with and without a simulated race: when the earliest
 * snapshot file is deleted during the lookup, the next snapshot (id 1) is expected instead of
 * id 0.
 */
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testEarliestSnapshot(boolean isRaceCondition) throws IOException {
    FileIO localFileIO = LocalFileIO.create();
    Path tablePath = new Path(tempDir.toString());
    SnapshotManager snapshotManager =
            new TestSnapshotManager(localFileIO, tablePath, isRaceCondition);

    // write snapshots 0..9, one second apart
    long baseMillis = 1684726826L;
    for (long id = 0; id < 10; id++) {
        Snapshot snapshot = createSnapshotWithMillis(id, baseMillis + id * 1000);
        localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(id), snapshot.toJson());
    }

    int expectedId = isRaceCondition ? 1 : 0;
    assertThat(snapshotManager.earliestSnapshot().id()).isEqualTo(expectedId);
}

/**
 * Verifies {@code earlierOrEqualWatermark} with and without a simulated race: for a watermark
 * just below the second snapshot's value, id 0 is expected normally and id 1 when the earliest
 * snapshot file is deleted during the lookup.
 */
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testEarlierOrEqualWatermark(boolean isRaceCondition) throws IOException {
    FileIO localFileIO = LocalFileIO.create();
    Path tablePath = new Path(tempDir.toString());
    SnapshotManager snapshotManager =
            new TestSnapshotManager(localFileIO, tablePath, isRaceCondition);

    // write snapshots 0..9; the same value is passed as second and third argument
    // (presumably commit time and watermark -- confirm against createSnapshotWithMillis)
    long baseMillis = 1684726826L;
    for (long id = 0; id < 10; id++) {
        long value = baseMillis + id * 1000;
        Snapshot snapshot = createSnapshotWithMillis(id, value, value);
        localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(id), snapshot.toJson());
    }

    int expectedId = isRaceCondition ? 1 : 0;
    assertThat(snapshotManager.earlierOrEqualWatermark(baseMillis + 999).id())
            .isEqualTo(expectedId);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testEarlierThanTimeMillis(boolean isRaceCondition) throws IOException {
long base = System.currentTimeMillis();
ThreadLocalRandom random = ThreadLocalRandom.current();

Expand All @@ -70,7 +108,7 @@ public void testEarlierThanTimeMillis() throws IOException {

FileIO localFileIO = LocalFileIO.create();
SnapshotManager snapshotManager =
new SnapshotManager(localFileIO, new Path(tempDir.toString()));
new TestSnapshotManager(localFileIO, new Path(tempDir.toString()), isRaceCondition);
int firstSnapshotId = random.nextInt(1, 100);
for (int i = 0; i < numSnapshots; i++) {
Snapshot snapshot = createSnapshotWithMillis(firstSnapshotId + i, millis.get(i));
Expand All @@ -90,43 +128,70 @@ public void testEarlierThanTimeMillis() throws IOException {
Long actual = snapshotManager.earlierThanTimeMills(time, false);

if (millis.get(numSnapshots - 1) < time) {
assertThat(actual).isEqualTo(firstSnapshotId + numSnapshots - 1);
if (isRaceCondition && millis.size() == 1) {
assertThat(actual).isNull();
} else {
assertThat(actual).isEqualTo(firstSnapshotId + numSnapshots - 1);
}
} else {
for (int i = 0; i < numSnapshots; i++) {
if (millis.get(i) >= time) {
assertThat(actual).isEqualTo(firstSnapshotId + i - 1);
if (isRaceCondition && i == 0) {
// The first snapshot expired during invocation
if (millis.size() == 1) {
assertThat(actual).isNull();
} else {
assertThat(actual).isLessThanOrEqualTo(firstSnapshotId);
}
} else {
assertThat(actual).isLessThanOrEqualTo(firstSnapshotId + i - 1);
}
break;
}
}
}
}
}

@Test
public void testEarlierOrEqualTimeMills() throws IOException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testEarlierOrEqualTimeMills(boolean isRaceCondition) throws IOException {
long millis = 1684726826L;
FileIO localFileIO = LocalFileIO.create();
SnapshotManager snapshotManager =
new SnapshotManager(localFileIO, new Path(tempDir.toString()));
new TestSnapshotManager(localFileIO, new Path(tempDir.toString()), isRaceCondition);
// create 10 snapshots
for (long i = 0; i < 10; i++) {
Snapshot snapshot = createSnapshotWithMillis(i, millis + i * 1000);
localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson());
}

// there is no snapshot smaller than "millis - 1L" return the earliest snapshot
assertThat(snapshotManager.earlierOrEqualTimeMills(millis - 1L).timeMillis())
.isEqualTo(millis);

// smaller than the second snapshot return the first snapshot
assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 999).timeMillis())
.isEqualTo(millis);
// equal to the second snapshot return the second snapshot
assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 1000).timeMillis())
.isEqualTo(millis + 1000);
// larger than the second snapshot return the second snapshot
assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 1001).timeMillis())
.isEqualTo(millis + 1000);
if (isRaceCondition) {
// The earliest snapshot has expired, so always return the second snapshot
assertThat(snapshotManager.earlierOrEqualTimeMills(millis - 1L).timeMillis())
.isEqualTo(millis + 1000L);
assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 999).timeMillis())
.isEqualTo(millis + 1000L);
assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 1000).timeMillis())
.isEqualTo(millis + 1000L);
assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 1001).timeMillis())
.isEqualTo(millis + 1000L);
} else {
// there is no snapshot smaller than "millis - 1L" return the earliest snapshot
assertThat(snapshotManager.earlierOrEqualTimeMills(millis - 1L).timeMillis())
.isEqualTo(millis);

// smaller than the second snapshot return the first snapshot
assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 999).timeMillis())
.isEqualTo(millis);

// equal to the second snapshot return the second snapshot
assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 1000).timeMillis())
.isEqualTo(millis + 1000);
// larger than the second snapshot return the second snapshot
assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 1001).timeMillis())
.isEqualTo(millis + 1000);
}
}

@Test
Expand Down Expand Up @@ -154,12 +219,13 @@ public void testLaterOrEqualTimeMills() throws IOException {
assertThat(snapshotManager.laterOrEqualTimeMills(millis + 10001)).isNull();
}

@Test
public void testlaterOrEqualWatermark() throws IOException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testLaterOrEqualWatermark(boolean isRaceCondition) throws IOException {
long millis = Long.MIN_VALUE;
FileIO localFileIO = LocalFileIO.create();
SnapshotManager snapshotManager =
new SnapshotManager(localFileIO, new Path(tempDir.toString()));
new TestSnapshotManager(localFileIO, new Path(tempDir.toString()), isRaceCondition);
// create 10 snapshots
for (long i = 0; i < 10; i++) {
Snapshot snapshot = createSnapshotWithMillis(i, millis, Long.MIN_VALUE);
Expand Down Expand Up @@ -410,4 +476,35 @@ public void testCommitChangelogWhenSameChangelogCommitTwice() throws IOException
snapshotManager.commitChangelog(changelog, id);
assertDoesNotThrow(() -> snapshotManager.commitChangelog(changelog, id));
}

/**
 * A {@link SnapshotManager} for tests that simulates a race condition in which the earliest
 * snapshot is removed by another party in the middle of the current invocation.
 *
 * <p>When the race is enabled, the first call to {@link #earliestSnapshotId()} that finds an
 * id deletes that snapshot's file and still returns the now-stale id. Later calls behave like
 * the parent class.
 */
private static class TestSnapshotManager extends SnapshotManager {
    private final boolean isRaceCondition;

    // flips to true once the earliest snapshot file has been deleted, so it happens only once
    private boolean deleteEarliestSnapshot = false;

    public TestSnapshotManager(FileIO fileIO, Path tablePath, boolean isRaceCondition) {
        super(fileIO, tablePath);
        this.isRaceCondition = isRaceCondition;
    }

    @Override
    public @Nullable Long earliestSnapshotId() {
        Long earliestId = super.earliestSnapshotId();

        boolean simulateExpiration =
                isRaceCondition && earliestId != null && !deleteEarliestSnapshot;
        if (!simulateExpiration) {
            return earliestId;
        }

        try {
            fileIO().delete(snapshotPath(earliestId), true);
        } catch (IOException deleteFailure) {
            throw new RuntimeException(deleteFailure);
        }
        deleteEarliestSnapshot = true;
        return earliestId;
    }
}
}
Loading