Skip to content

Commit

Permalink
fix bug when read changelog's index manifest
Browse files Browse the repository at this point in the history
  • Loading branch information
hongli.wwj committed Feb 11, 2025
1 parent 6facb71 commit 8920493
Showing 1 changed file with 31 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,14 @@ private List<DataSplit> generateSplits(
Map<BinaryRow, Map<Integer, List<DataFileMeta>>> groupedDataFiles) {
List<DataSplit> splits = new ArrayList<>();
// Read deletion indexes at once to reduce file IO
Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> deletionIndexFilesMap =
deletionVectors && snapshot != null
? indexFileHandler.scan(
snapshot, DELETION_VECTORS_INDEX, groupedDataFiles.keySet())
: Collections.emptyMap();
Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> deletionIndexFilesMap = null;
if (!isStreaming) {
deletionIndexFilesMap =
deletionVectors && snapshot != null
? indexFileHandler.scan(
snapshot, DELETION_VECTORS_INDEX, groupedDataFiles.keySet())
: Collections.emptyMap();
}
for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entry :
groupedDataFiles.entrySet()) {
BinaryRow partition = entry.getKey();
Expand All @@ -347,18 +350,19 @@ private List<DataSplit> generateSplits(
isStreaming
? splitGenerator.splitForStreaming(bucketFiles)
: splitGenerator.splitForBatch(bucketFiles);
List<IndexFileMeta> deletionIndexFiles =
deletionIndexFilesMap.getOrDefault(
Pair.of(partition, bucket), Collections.emptyList());
for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
List<DataFileMeta> dataFiles = splitGroup.files;
String bucketPath = pathFactory.bucketPath(partition, bucket).toString();
builder.withDataFiles(dataFiles)
.rawConvertible(splitGroup.rawConvertible)
.withBucketPath(bucketPath);
if (deletionVectors) {
if (deletionVectors && deletionIndexFilesMap != null) {
builder.withDataDeletionFiles(
getDeletionFiles(dataFiles, deletionIndexFiles));
getDeletionFiles(
dataFiles,
deletionIndexFilesMap.getOrDefault(
Pair.of(partition, bucket),
Collections.emptyList())));
}

splits.add(builder.build());
Expand Down Expand Up @@ -419,16 +423,20 @@ private Plan toChangesPlan(
buckets.computeIfAbsent(part, k -> new HashSet<>())
.addAll(bucketMap.keySet()));
// Read deletion indexes at once to reduce file IO
Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> beforDeletionIndexFilesMap =
deletionVectors
? indexFileHandler.scan(
beforeSnapshot, DELETION_VECTORS_INDEX, beforeFiles.keySet())
: Collections.emptyMap();
Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> deletionIndexFilesMap =
deletionVectors
? indexFileHandler.scan(
snapshot, DELETION_VECTORS_INDEX, dataFiles.keySet())
: Collections.emptyMap();
Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> beforDeletionIndexFilesMap = null;
Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> deletionIndexFilesMap = null;
if (!isStreaming) {
beforDeletionIndexFilesMap =
deletionVectors
? indexFileHandler.scan(
beforeSnapshot, DELETION_VECTORS_INDEX, beforeFiles.keySet())
: Collections.emptyMap();
deletionIndexFilesMap =
deletionVectors
? indexFileHandler.scan(
snapshot, DELETION_VECTORS_INDEX, dataFiles.keySet())
: Collections.emptyMap();
}

for (Map.Entry<BinaryRow, Set<Integer>> entry : buckets.entrySet()) {
BinaryRow part = entry.getKey();
Expand All @@ -454,7 +462,9 @@ private Plan toChangesPlan(
.withDataFiles(data)
.isStreaming(isStreaming)
.withBucketPath(pathFactory.bucketPath(part, bucket).toString());
if (deletionVectors) {
if (deletionVectors
&& beforDeletionIndexFilesMap != null
&& deletionIndexFilesMap != null) {
builder.withBeforeDeletionFiles(
getDeletionFiles(
before,
Expand Down

0 comments on commit 8920493

Please sign in to comment.