Skip to content

Commit 7a8c242

Browse files
[KERNEL] Read the _last_checkpoint file for latest queries with maxRatifiedVersion (#5529)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [X] Kernel - [ ] Other (fill in here) ## Description Based off of #5443 Review the commits in this PR [here](https://github.com/delta-io/delta/pull/5529/files/d3ac1fbbf6c1b4ba89729375067fddfc822075ca..0783f0d8a99339c3e13c99032d90c3de37ebba96) Adds the logic in log segment construction to correctly use the _last_checkpoint file for latest queries for catalog managed tables. ## How was this patch tested? Adds tests to check we use the _last_checkpoint file. Tests in the previous PR test that we correctly load the right version. ## Does this PR introduce _any_ user-facing changes? No
1 parent 3b32b6e commit 7a8c242

File tree

3 files changed

+262
-39
lines changed

3 files changed

+262
-39
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java

Lines changed: 98 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -200,28 +200,45 @@ private SnapshotImpl createSnapshot(
200200
* </ol>
201201
*/
202202
public LogSegment getLogSegmentForVersion(Engine engine, Optional<Long> versionToLoadOpt) {
203-
return getLogSegmentForVersion(engine, versionToLoadOpt, Collections.emptyList());
203+
return getLogSegmentForVersion(
204+
engine,
205+
versionToLoadOpt,
206+
Collections.emptyList() /* parsedLogDatas */,
207+
Optional.empty() /* maxCatalogVersionOpt */);
204208
}
205209

206210
/**
207211
* [delta-io/delta#4765]: Right now, we only support sorted and contiguous ratified commit log
208212
* data.
213+
*
214+
* @param timeTravelVersionOpt the version to time-travel to for a time-travel query
215+
* @param parsedLogDatas the parsed log data from the catalog
216+
* @param maxCatalogVersionOpt the maximum version ratified by the catalog for catalog managed
217+
* tables. Empty for file-system managed tables.
209218
*/
210219
public LogSegment getLogSegmentForVersion(
211-
Engine engine, Optional<Long> versionToLoadOpt, List<ParsedLogData> parsedLogDatas) {
220+
Engine engine,
221+
Optional<Long> timeTravelVersionOpt,
222+
List<ParsedLogData> parsedLogDatas,
223+
Optional<Long> maxCatalogVersionOpt) {
224+
// This is the actual version we want to load. For "latest" (aka non-time-travel) queries for
225+
// catalogManaged tables we want to load the maxCatalogVersion
226+
final Optional<Long> versionToLoadOpt =
227+
timeTravelVersionOpt.isPresent() ? timeTravelVersionOpt : maxCatalogVersionOpt;
212228
final long versionToLoad = versionToLoadOpt.orElse(Long.MAX_VALUE);
213229
final String versionToLoadStr = versionToLoadOpt.map(String::valueOf).orElse("latest");
214230
logger.info("Loading log segment for version {}", versionToLoadStr);
215231
final long logSegmentBuildingStartTimeMillis = System.currentTimeMillis();
216232

217-
////////////////////////////////////////////////////////////////////////////////////////////////
218-
// Step 1: Find the latest checkpoint version. If $versionToLoadOpt is empty, use the version //
219-
// referenced by the _LAST_CHECKPOINT file. If $versionToLoad is present, search for //
220-
// the previous latest complete checkpoint at or before $versionToLoad. //
221-
////////////////////////////////////////////////////////////////////////////////////////////////
233+
///////////////////////////////////////////////////////////////////////////////////////////
234+
// Step 1: Find the latest checkpoint version. If timeTravelVersionOpt is empty, use the //
235+
// version referenced by the _LAST_CHECKPOINT file. If timeTravelVersionOpt is //
236+
// present, search for the previous latest complete checkpoint at or before the //
237+
// version to load //
238+
///////////////////////////////////////////////////////////////////////////////////////////
222239

223240
final Optional<Long> startCheckpointVersionOpt =
224-
getStartCheckpointVersion(engine, versionToLoadOpt);
241+
getStartCheckpointVersion(engine, timeTravelVersionOpt, maxCatalogVersionOpt);
225242

226243
/////////////////////////////////////////////////////////////////
227244
// Step 2: Determine the actual version to start listing from. //
@@ -593,39 +610,82 @@ private List<ParsedDeltaData> getAllDeltasAfterCheckpointWithCatalogPriority(
593610
}
594611

595612
/**
596-
* Determine the starting checkpoint version that is at or before `versionToLoadOpt`. If no
597-
* `versionToLoadOpt` is provided, will use the checkpoint pointed to by the _last_checkpoint
598-
* file.
613+
* Determine the starting checkpoint version that is at or before the version to load.
614+
*
615+
* <p>Version to load: For time-travel queries, this is the time-travel version. For latest
616+
* queries on catalog managed tables, this is the max ratified catalog version. For latest queries
617+
* on file-system managed tables, this is the latest available version on the file-system.
618+
*
619+
* <p>For non-time travel queries we will use the checkpoint pointed to by the _last_checkpoint
620+
* file (except for when it is after the maxRatifiedCatalogVersion, in which case we will search
621+
* backwards for a checkpoint).
599622
*/
600-
private Optional<Long> getStartCheckpointVersion(Engine engine, Optional<Long> versionToLoadOpt) {
601-
return versionToLoadOpt
623+
private Optional<Long> getStartCheckpointVersion(
624+
Engine engine, Optional<Long> timeTravelVersionOpt, Optional<Long> maxCatalogVersionOpt) {
625+
// This is a "latest" query, let's try to use the _last_checkpoint file if possible
626+
if (!timeTravelVersionOpt.isPresent()) {
627+
logger.info("Reading the _last_checkpoint file for 'latest' query");
628+
Optional<Long> lastCheckpointFileVersionOpt =
629+
new Checkpointer(logPath).readLastCheckpointFile(engine).map(x -> x.version);
630+
631+
if (!lastCheckpointFileVersionOpt.isPresent()) {
632+
logger.info("No _last_checkpoint file found, default to listing from 0");
633+
return Optional.empty();
634+
}
635+
636+
long lastCheckpointFileVersion = lastCheckpointFileVersionOpt.get();
637+
638+
if (!maxCatalogVersionOpt.isPresent()) {
639+
// If there is no maxCatalogVersion we don't have to do anything special --> just return
640+
return Optional.of(lastCheckpointFileVersion);
641+
} else {
642+
// When there is a maxCatalogVersion we only want to return the version from the
643+
// _last_checkpoint file if it is less than or equal to the maxCatalogVersion. Otherwise,
644+
// we should revert to listing backwards from the version to load.
645+
646+
// This situation is possible due to race conditions. Since fetching the maxCatalogVersion
647+
// from the catalog, it is possible that a concurrent writer has committed, published
648+
// and checkpointed before this listing code is executed. Thus, it is possible that the
649+
// _last_checkpoint file points to a checkpoint later than the maxCatalogVersion.
650+
if (lastCheckpointFileVersion <= maxCatalogVersionOpt.get()) {
651+
return Optional.of(lastCheckpointFileVersion);
652+
}
653+
logger.info(
654+
"Found checkpoint at version {} in _last_checkpoint file but cannot be used because "
655+
+ "maxCatalogVersion = {}.",
656+
lastCheckpointFileVersion,
657+
maxCatalogVersionOpt.get());
658+
}
659+
}
660+
661+
long versionToLoad =
662+
timeTravelVersionOpt.orElseGet(
663+
() ->
664+
maxCatalogVersionOpt.orElseThrow(
665+
() ->
666+
new IllegalStateException(
667+
"Impossible state: If timeTravelToVersionOpt and maxCatalogVersion "
668+
+ "is empty we should always have returned earlier")));
669+
logger.info("Finding last complete checkpoint at or before version {}", versionToLoad);
670+
final long startTimeMillis = System.currentTimeMillis();
671+
return Checkpointer.findLastCompleteCheckpointBefore(engine, logPath, versionToLoad + 1)
672+
.map(checkpointInstance -> checkpointInstance.version)
602673
.map(
603-
versionToLoad -> {
674+
checkpointVersion -> {
675+
checkArgument(
676+
checkpointVersion <= versionToLoad,
677+
"Last complete checkpoint version %s was not <= targetVersion %s",
678+
checkpointVersion,
679+
versionToLoad);
680+
604681
logger.info(
605-
"Finding last complete checkpoint at or before version {}", versionToLoad);
606-
final long startTimeMillis = System.currentTimeMillis();
607-
return Checkpointer.findLastCompleteCheckpointBefore(
608-
engine, logPath, versionToLoad + 1)
609-
.map(checkpointInstance -> checkpointInstance.version)
610-
.map(
611-
checkpointVersion -> {
612-
checkArgument(
613-
checkpointVersion <= versionToLoad,
614-
"Last complete checkpoint version %s was not <= targetVersion %s",
615-
checkpointVersion,
616-
versionToLoad);
617-
618-
logger.info(
619-
"{}: Took {}ms to find last complete checkpoint <= targetVersion {}",
620-
tablePath,
621-
System.currentTimeMillis() - startTimeMillis,
622-
versionToLoad);
623-
624-
return checkpointVersion;
625-
});
626-
})
627-
.orElseGet(
628-
() -> new Checkpointer(logPath).readLastCheckpointFile(engine).map(x -> x.version));
682+
"{}: Took {}ms to find last complete checkpoint <= targetVersion {}",
683+
tablePath,
684+
System.currentTimeMillis() - startTimeMillis,
685+
versionToLoad);
686+
687+
return checkpointVersion;
688+
});
629689
}
630690

631691
private void logDebugFileStatuses(String varName, List<FileStatus> fileStatuses) {

kernel/kernel-api/src/main/java/io/delta/kernel/internal/table/SnapshotFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,8 @@ private Lazy<LogSegment> getLazyLogSegment(
248248
.time(
249249
() ->
250250
new SnapshotManager(tablePath)
251-
.getLogSegmentForVersion(engine, versionToLoad, ctx.logDatas));
251+
.getLogSegmentForVersion(
252+
engine, versionToLoad, ctx.logDatas, ctx.maxCatalogVersion));
252253

253254
snapshotCtx.setResolvedVersion(logSegment.getVersion());
254255
snapshotCtx.setCheckpointVersion(logSegment.getCheckpointVersionOpt());

kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -738,6 +738,168 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {
738738
assert(exMsg.contains("Missing checkpoint at version 1"))
739739
}
740740

741+
/* ------------------- CATALOG MANAGED TABLE TESTS ------------------ */
742+
743+
test("catalog managed: latest query, we load the maxCatalogVersion even if other deltas exist") {
744+
val deltas = deltaFileStatuses(0L to 20)
745+
val checkpoints = singularCheckpointFileStatuses(Seq(10))
746+
747+
val logSegment = snapshotManager.getLogSegmentForVersion(
748+
createMockFSListFromEngine(deltas ++ checkpoints),
749+
Optional.empty(), // timeTravelVersionOpt
750+
Collections.emptyList(), // parsedLogDatas
751+
Optional.of(15L) // maxCatalogVersionOpt
752+
)
753+
754+
checkLogSegment(
755+
logSegment,
756+
expectedVersion = 15,
757+
expectedDeltas = deltaFileStatuses(11L to 15),
758+
expectedCompactions = Seq.empty,
759+
expectedCheckpoints = singularCheckpointFileStatuses(Seq(10)),
760+
expectedCheckpointVersion = Some(10),
761+
expectedLastCommitTimestamp = 150L)
762+
}
763+
764+
test("catalog managed: latest query, _last_checkpoint does not exist") {
765+
val deltas = deltaFileStatuses(0L to 20)
766+
val checkpoints = singularCheckpointFileStatuses(Seq(10))
767+
768+
val logSegment = snapshotManager.getLogSegmentForVersion(
769+
createMockFSListFromEngine(deltas ++ checkpoints),
770+
Optional.empty(), // timeTravelVersionOpt
771+
Collections.emptyList(), // parsedLogDatas
772+
Optional.of(20L) // maxCatalogVersionOpt
773+
)
774+
775+
// No _last_checkpoint file, so we default to listing from 0; checkpoint 10 should still be used
776+
checkLogSegment(
777+
logSegment,
778+
expectedVersion = 20,
779+
expectedDeltas = deltaFileStatuses(11L to 20),
780+
expectedCompactions = Seq.empty,
781+
expectedCheckpoints = singularCheckpointFileStatuses(Seq(10)),
782+
expectedCheckpointVersion = Some(10),
783+
expectedLastCommitTimestamp = 200L)
784+
}
785+
786+
test("catalog managed: latest query, when _last_checkpoint exists and " +
787+
"is <= maxCatalogVersion we use it") {
788+
val deltas = deltaFileStatuses(0L to 30)
789+
val checkpoints = singularCheckpointFileStatuses(Seq(10, 20, 25))
790+
val lastCheckpointFileStatus = FileStatus.of(s"$logPath/_last_checkpoint", 2, 2)
791+
val files = deltas ++ checkpoints ++ Seq(lastCheckpointFileStatus)
792+
793+
// Create mocked engine that fails if we try to list before the version stored in
794+
// _last_checkpoint
795+
def listFrom(filePath: String): Seq[FileStatus] = {
796+
if (filePath < FileNames.listingPrefix(logPath, 25)) {
797+
throw new RuntimeException(
798+
s"Listing from before the checkpoint version referenced by _last_checkpoint.")
799+
}
800+
listFromProvider(files)(filePath)
801+
}
802+
val mockedEngine = mockEngine(
803+
jsonHandler = new MockReadLastCheckpointFileJsonHandler(
804+
lastCheckpointFileStatus.getPath,
805+
25
806+
), // _last_checkpoint points to version 25
807+
fileSystemClient = new MockListFromFileSystemClient(listFrom))
808+
809+
// Latest query with catalog managed table (maxCatalogVersion = 30)
810+
val logSegment = snapshotManager.getLogSegmentForVersion(
811+
mockedEngine,
812+
Optional.empty(), // timeTravelVersionOpt
813+
Collections.emptyList(), // parsedLogDatas
814+
Optional.of(30L) // maxCatalogVersionOpt
815+
)
816+
817+
checkLogSegment(
818+
logSegment,
819+
expectedVersion = 30,
820+
expectedDeltas = deltaFileStatuses(26L to 30),
821+
expectedCompactions = Seq.empty,
822+
expectedCheckpoints = singularCheckpointFileStatuses(Seq(25)),
823+
expectedCheckpointVersion = Some(25),
824+
expectedLastCommitTimestamp = 300L)
825+
}
826+
827+
test("catalog managed:" +
828+
"latest query, ignore _last_checkpoint if it's newer than maxCatalogVersion") {
829+
val deltas = deltaFileStatuses(0L to 26)
830+
val checkpoints = singularCheckpointFileStatuses(Seq(10, 20, 25))
831+
val lastCheckpointFileStatus = FileStatus.of(s"$logPath/_last_checkpoint", 2, 2)
832+
val files = deltas ++ checkpoints ++ Seq(lastCheckpointFileStatus)
833+
834+
// Latest query with catalog managed table where maxCatalogVersion < _last_checkpoint version
835+
val logSegment = snapshotManager.getLogSegmentForVersion(
836+
mockEngine(
837+
jsonHandler = new MockReadLastCheckpointFileJsonHandler(
838+
lastCheckpointFileStatus.getPath,
839+
25
840+
), // _last_checkpoint points to version 25
841+
fileSystemClient = new MockListFromFileSystemClient(listFromProvider(files))),
842+
Optional.empty(), // timeTravelVersionOpt
843+
Collections.emptyList(), // parsedLogDatas
844+
Optional.of(24L) // maxCatalogVersionOpt is 24, which is < 25
845+
)
846+
847+
// Should use checkpoint at version 20 not 25, and should load maxCatalogVersion
848+
checkLogSegment(
849+
logSegment,
850+
expectedVersion = 24,
851+
expectedDeltas = deltaFileStatuses(21L to 24),
852+
expectedCompactions = Seq.empty,
853+
expectedCheckpoints = singularCheckpointFileStatuses(Seq(20)),
854+
expectedCheckpointVersion = Some(20),
855+
expectedLastCommitTimestamp = 240L)
856+
}
857+
858+
test("catalog managed: time travel query ignores _last_checkpoint") {
859+
val deltas = deltaFileStatuses(0L to 30)
860+
val checkpoints = singularCheckpointFileStatuses(Seq(10, 20, 25))
861+
val lastCheckpointFileStatus = FileStatus.of(s"$logPath/_last_checkpoint", 2, 2)
862+
val files = deltas ++ checkpoints ++ Seq(lastCheckpointFileStatus)
863+
864+
val jsonHandler = new BaseMockJsonHandler {
865+
override def readJsonFiles(
866+
fileIter: CloseableIterator[FileStatus],
867+
physicalSchema: StructType,
868+
predicate: Optional[Predicate]): CloseableIterator[ColumnarBatch] = {
869+
assert(fileIter.hasNext)
870+
if (fileIter.next.getPath == lastCheckpointFileStatus.getPath) {
871+
throw new RuntimeException(
872+
"We should not be reading the _last_checkpoint file for time-travel queries")
873+
} else {
874+
throw new RuntimeException("We should not be reading JSON files besides " +
875+
"_last_checkpoint during log segment construction")
876+
}
877+
}
878+
}
879+
880+
// Time travel query with catalog managed table
881+
val logSegment = snapshotManager.getLogSegmentForVersion(
882+
mockEngine(
883+
jsonHandler = jsonHandler,
884+
fileSystemClient = new MockListFromFileSystemClient(listFromProvider(files))),
885+
Optional.of(15L), // timeTravelVersionOpt = 15
886+
Collections.emptyList(), // parsedLogDatas
887+
Optional.of(30L) // maxCatalogVersionOpt
888+
)
889+
890+
// Should use checkpoint at version 10 for time travel to version 15
891+
checkLogSegment(
892+
logSegment,
893+
expectedVersion = 15,
894+
expectedDeltas = deltaFileStatuses(11L to 15),
895+
expectedCompactions = Seq.empty,
896+
expectedCheckpoints = singularCheckpointFileStatuses(Seq(10)),
897+
expectedCheckpointVersion = Some(10),
898+
expectedLastCommitTimestamp = 150L)
899+
}
900+
901+
/* ------------------- Compaction tests ------------------ */
902+
741903
test("One compaction") {
742904
testWithCompactionsNoCheckpoint(
743905
deltaVersions = 0L until 5L,

0 commit comments

Comments
 (0)