Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Fix race condition for earliest snapshot #4930

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/Changelog.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import javax.annotation.Nullable;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Map;

Expand Down Expand Up @@ -105,9 +106,19 @@ public static Changelog fromJson(String json) {
}

public static Changelog fromPath(FileIO fileIO, Path path) {
try {
return tryFromPath(fileIO, path);
} catch (FileNotFoundException e) {
throw new RuntimeException("Fails to read changelog from path " + path, e);
}
}

public static Changelog tryFromPath(FileIO fileIO, Path path) throws FileNotFoundException {
try {
String json = fileIO.readFileUtf8(path);
return Changelog.fromJson(json);
} catch (FileNotFoundException e) {
throw e;
} catch (IOException e) {
throw new RuntimeException("Fails to read changelog from path " + path, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,28 @@ public Snapshot tryGetSnapshot(long snapshotId) throws FileNotFoundException {
return snapshot;
}

private @Nullable Snapshot tryGetEarliestSnapshotLaterThanOrEqualTo(
long snapshotId, long stopId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stopSnapshotId?

while (snapshotId <= stopId) {
try {
return tryGetSnapshot(snapshotId);
} catch (FileNotFoundException e) {
snapshotId++;
}
}
return null;
}

public Changelog changelog(long snapshotId) {
Path changelogPath = longLivedChangelogPath(snapshotId);
return Changelog.fromPath(fileIO, changelogPath);
}

private Changelog tryGetChangelog(long snapshotId) throws FileNotFoundException {
Path changelogPath = longLivedChangelogPath(snapshotId);
return Changelog.tryFromPath(fileIO, changelogPath);
}

public Changelog longLivedChangelog(long snapshotId) {
return Changelog.fromPath(fileIO, longLivedChangelogPath(snapshotId));
}
Expand Down Expand Up @@ -209,7 +226,23 @@ public boolean longLivedChangelogExists(long snapshotId) {

public @Nullable Snapshot earliestSnapshot() {
Long snapshotId = earliestSnapshotId();
return snapshotId == null ? null : snapshot(snapshotId);
if (snapshotId == null) {
return null;
}

Long latest = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

latestSnapshotId

do {
try {
return tryGetSnapshot(snapshotId);
} catch (FileNotFoundException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a log here?

snapshotId++;
if (latest == null) {
latest = latestSnapshotId();
}
}
} while (latest != null && snapshotId <= latest);

return null;
}

public @Nullable Long earliestSnapshotId() {
Expand Down Expand Up @@ -268,25 +301,43 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
}
}

private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundException {
if (longLivedChangelogExists(snapshotId)) {
return tryGetChangelog(snapshotId);
} else {
return tryGetSnapshot(snapshotId);
}
}

/**
* Returns the latest snapshot earlier than the timestamp mills. A non-existent snapshot may be
* returned if all snapshots are equal to or later than the timestamp mills.
*/
public @Nullable Long earlierThanTimeMills(long timestampMills, boolean startFromChangelog) {
Long earliestSnapshot = earliestSnapshotId();
Long earliestSnapshotId = earliestSnapshotId();
Long earliest;
if (startFromChangelog) {
Long earliestChangelog = earliestLongLivedChangelogId();
earliest = earliestChangelog == null ? earliestSnapshot : earliestChangelog;
earliest = earliestChangelog == null ? earliestSnapshotId : earliestChangelog;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

earliestChangelogId

} else {
earliest = earliestSnapshot;
earliest = earliestSnapshotId;
}
Long latest = latestSnapshotId();
if (earliest == null || latest == null) {
return null;
}

if (changelogOrSnapshot(earliest).timeMillis() >= timestampMills) {
Snapshot earliestSnapshot = null;
while (earliest <= latest) {
try {
earliestSnapshot = tryGetChangelogOrSnapshot(earliest);
break;
} catch (FileNotFoundException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a log here?

earliest++;
}
}

if (earliestSnapshot == null || earliestSnapshot.timeMillis() >= timestampMills) {
return earliest - 1;
}

Expand All @@ -312,8 +363,9 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
return null;
}

Snapshot earliestSnapShot = snapshot(earliest);
if (earliestSnapShot.timeMillis() > timestampMills) {
Snapshot earliestSnapShot = tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest);

if (earliestSnapShot == null || earliestSnapShot.timeMillis() > timestampMills) {
return earliestSnapShot;
}
Snapshot finalSnapshot = null;
Expand Down Expand Up @@ -375,12 +427,23 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
if (earliest == null || latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) {
return null;
}

Snapshot earliestSnapShot = tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest);

if (earliestSnapShot == null) {
return null;
}

Long earliestWatermark = null;
// find the first snapshot with watermark
if ((earliestWatermark = snapshot(earliest).watermark()) == null) {
if ((earliestWatermark = earliestSnapShot.watermark()) == null) {
while (earliest < latest) {
earliest++;
earliestWatermark = snapshot(earliest).watermark();
Snapshot snapshot = tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest);
if (snapshot == null) {
continue;
}
earliestWatermark = snapshot.watermark();
if (earliestWatermark != null) {
break;
}
Expand All @@ -391,7 +454,7 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
}

if (earliestWatermark >= watermark) {
return snapshot(earliest);
return tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest);
}
Snapshot finalSnapshot = null;

Expand Down Expand Up @@ -434,9 +497,16 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
if (earliest == null || latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) {
return null;
}

Snapshot earliestSnapShot = tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest);

if (earliestSnapShot == null) {
return null;
}

Long earliestWatermark = null;
// find the first snapshot with watermark
if ((earliestWatermark = snapshot(earliest).watermark()) == null) {
if ((earliestWatermark = earliestSnapShot.watermark()) == null) {
while (earliest < latest) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there are some risks. Earliest should be re-assigned.

Maybe you should just optimize earliestSnapshot. And use this method in many places.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that some earliestSnapshot methods use binary search while others use linear, I extracted and optimized two kinds of paths. Please take a look.

earliest++;
earliestWatermark = snapshot(earliest).watermark();
Expand Down
Loading
Loading