Skip to content

Commit

Permalink
Fix earliestSnapshotId and unify earliestSnapshot method
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub committed Feb 12, 2025
1 parent 741aba2 commit d8e82fe
Showing 1 changed file with 44 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,24 +163,6 @@ public Snapshot tryGetSnapshot(long snapshotId) throws FileNotFoundException {
return snapshot;
}

private @Nullable Snapshot tryGetEarliestSnapshotLaterThanOrEqualTo(
long snapshotId, long stopSnapshotId, boolean includeChangelog) {
FunctionWithException<Long, Snapshot, FileNotFoundException> snapshotFunction =
includeChangelog ? this::tryGetChangelogOrSnapshot : this::tryGetSnapshot;
while (snapshotId <= stopSnapshotId) {
try {
return snapshotFunction.apply(snapshotId);
} catch (FileNotFoundException e) {
LOG.warn(
"The earliest snapshot or changelog was once identified but disappeared. "
+ "It might have been expired by other jobs operating on this table. "
+ "Searching for the second earliest snapshot or changelog instead. ");
snapshotId++;
}
}
return null;
}

public Changelog changelog(long snapshotId) {
Path changelogPath = longLivedChangelogPath(snapshotId);
return Changelog.fromPath(fileIO, changelogPath);
Expand Down Expand Up @@ -231,12 +213,35 @@ public boolean longLivedChangelogExists(long snapshotId) {
}

public @Nullable Snapshot earliestSnapshot() {
return earliestSnapshot(false);
}

private @Nullable Snapshot earliestSnapshot(boolean includeChangelog) {
Long snapshotId = earliestSnapshotId();
if (snapshotId == null) {
return null;
}

return tryGetEarliestSnapshotLaterThanOrEqualTo(snapshotId, snapshotId + 1, false);
FunctionWithException<Long, Snapshot, FileNotFoundException> snapshotFunction =
includeChangelog ? this::tryGetChangelogOrSnapshot : this::tryGetSnapshot;

// The loss of the earliest snapshot is an event of small probability, so the retry number
// here need not be too large.
int retry = 0;
do {
try {
return snapshotFunction.apply(snapshotId);
} catch (FileNotFoundException e) {
if (retry++ >= 3) {
throw new RuntimeException(e);
}
LOG.warn(
"The earliest snapshot or changelog was once identified but disappeared. "
+ "It might have been expired by other jobs operating on this table. "
+ "Searching for the second earliest snapshot or changelog instead. ");
snapshotId++;
}
} while (true);
}

public @Nullable Long earliestSnapshotId() {
Expand Down Expand Up @@ -308,26 +313,21 @@ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundE
* returned if all snapshots are equal to or later than the timestamp mills.
*/
public @Nullable Long earlierThanTimeMills(long timestampMills, boolean startFromChangelog) {
Long earliestSnapshotId = earliestSnapshotId();
Long earliest;
if (startFromChangelog) {
Long earliestChangelogId = earliestLongLivedChangelogId();
earliest = earliestChangelogId == null ? earliestSnapshotId : earliestChangelogId;
} else {
earliest = earliestSnapshotId;
}
Long latest = latestSnapshotId();
if (earliest == null || latest == null) {
if (latest == null) {
return null;
}

Snapshot earliestSnapshot =
tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest, true);
Snapshot earliestSnapshot = earliestSnapshot(startFromChangelog);
if (earliestSnapshot == null) {
return null;
}

if (earliestSnapshot == null || earliestSnapshot.timeMillis() >= timestampMills) {
return earliest - 1;
if (earliestSnapshot.timeMillis() >= timestampMills) {
return earliestSnapshot.id() - 1;
}

long earliest = earliestSnapshot.id();
while (earliest < latest) {
long mid = (earliest + latest + 1) / 2;
if (changelogOrSnapshot(mid).timeMillis() < timestampMills) {
Expand All @@ -344,18 +344,17 @@ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundE
* mills. If there is no such a snapshot, returns null.
*/
public @Nullable Snapshot earlierOrEqualTimeMills(long timestampMills) {
Long earliest = earliestSnapshotId();
Long latest = latestSnapshotId();
if (earliest == null || latest == null) {
if (latest == null) {
return null;
}

Snapshot earliestSnapShot =
tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest, false);

Snapshot earliestSnapShot = earliestSnapshot();
if (earliestSnapShot == null || earliestSnapShot.timeMillis() > timestampMills) {
return earliestSnapShot;
}
long earliest = earliestSnapShot.id();

Snapshot finalSnapshot = null;
while (earliest <= latest) {
long mid = earliest + (latest - earliest) / 2; // Avoid overflow
Expand Down Expand Up @@ -408,31 +407,25 @@ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundE
}

public @Nullable Snapshot earlierOrEqualWatermark(long watermark) {
Long earliest = earliestSnapshotId();
Long latest = latestSnapshotId();
// If latest == Long.MIN_VALUE don't need next binary search for watermark
// which can reduce IO cost with snapshot
if (earliest == null || latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) {
if (latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) {
return null;
}

Snapshot earliestSnapShot =
tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest, false);

Snapshot earliestSnapShot = earliestSnapshot();
if (earliestSnapShot == null) {
return null;
}
long earliest = earliestSnapShot.id();

Long earliestWatermark = null;
// find the first snapshot with watermark
if ((earliestWatermark = earliestSnapShot.watermark()) == null) {
while (earliest < latest) {
earliest++;
Snapshot snapshot =
tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest, false);
if (snapshot == null) {
continue;
}
Snapshot snapshot = snapshot(earliest);
earliestWatermark = snapshot.watermark();
if (earliestWatermark != null) {
break;
Expand All @@ -444,7 +437,7 @@ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundE
}

if (earliestWatermark >= watermark) {
return tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest, false);
return snapshot(earliest);
}
Snapshot finalSnapshot = null;

Expand Down Expand Up @@ -480,20 +473,18 @@ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundE
}

public @Nullable Snapshot laterOrEqualWatermark(long watermark) {
Long earliest = earliestSnapshotId();
Long latest = latestSnapshotId();
// If latest == Long.MIN_VALUE don't need next binary search for watermark
// which can reduce IO cost with snapshot
if (earliest == null || latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) {
if (latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) {
return null;
}

Snapshot earliestSnapShot =
tryGetEarliestSnapshotLaterThanOrEqualTo(earliest, latest, false);

Snapshot earliestSnapShot = earliestSnapshot();
if (earliestSnapShot == null) {
return null;
}
long earliest = earliestSnapShot.id();

Long earliestWatermark = null;
// find the first snapshot with watermark
Expand Down

0 comments on commit d8e82fe

Please sign in to comment.