diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java index f182c7f4ade3..450df5231439 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java @@ -21,6 +21,7 @@ import org.apache.paimon.KeyValue; import org.apache.paimon.codegen.RecordEqualiser; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; import org.apache.paimon.lookup.LookupStrategy; import org.apache.paimon.mergetree.LookupLevels.PositionedKeyValue; @@ -56,8 +57,7 @@ public class LookupChangelogMergeFunctionWrapper implements MergeFunctionWrapper { - private final LookupMergeFunction mergeFunction; - private final MergeFunction mergeFunction2; + private final MergeFunction mergeFunction; private final Function lookup; private final ChangelogResult reusedResult = new ChangelogResult(); @@ -68,6 +68,10 @@ public class LookupChangelogMergeFunctionWrapper private final @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer; private final Comparator comparator; + private final LinkedList candidates = new LinkedList<>(); + private final InternalRowSerializer keySerializer; + private final InternalRowSerializer valueSerializer; + public LookupChangelogMergeFunctionWrapper( MergeFunctionFactory mergeFunctionFactory, Function lookup, @@ -85,8 +89,10 @@ public LookupChangelogMergeFunctionWrapper( deletionVectorsMaintainer != null, "deletionVectorsMaintainer should not be null, there is a bug."); } - this.mergeFunction = (LookupMergeFunction) mergeFunction; - this.mergeFunction2 = mergeFunctionFactory.create(); + LookupMergeFunction lookupMergeFunction = (LookupMergeFunction) mergeFunction; + this.keySerializer = lookupMergeFunction.getKeySerializer(); + this.valueSerializer = lookupMergeFunction.getValueSerializer(); + this.mergeFunction = mergeFunctionFactory.create(); this.lookup = lookup; this.valueEqualiser = valueEqualiser; this.lookupStrategy = lookupStrategy; @@ -96,18 +102,17 @@ public LookupChangelogMergeFunctionWrapper( @Override public void reset() { - mergeFunction.reset(); + candidates.clear(); } @Override public void add(KeyValue kv) { - mergeFunction.add(kv); + candidates.add(kv.copy(keySerializer, valueSerializer)); } @Override public ChangelogResult getResult() { // 1. Compute the latest high level record and containLevel0 of candidates - LinkedList candidates = mergeFunction.candidates(); Iterator descending = candidates.descendingIterator(); KeyValue highLevel = null; boolean containLevel0 = false; @@ -152,20 +157,20 @@ public ChangelogResult getResult() { } private KeyValue calculateResult(List candidates, @Nullable KeyValue highLevel) { - mergeFunction2.reset(); + mergeFunction.reset(); for (KeyValue candidate : candidates) { if (highLevel != null && comparator.compare(highLevel, candidate) < 0) { - mergeFunction2.add(highLevel); - mergeFunction2.add(candidate); + mergeFunction.add(highLevel); + mergeFunction.add(candidate); highLevel = null; } else { - mergeFunction2.add(candidate); + mergeFunction.add(candidate); } } if (highLevel != null) { - mergeFunction2.add(highLevel); + mergeFunction.add(highLevel); } - return mergeFunction2.getResult(); + return mergeFunction.getResult(); } private void setChangelog(@Nullable KeyValue before, KeyValue after) { diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java index 71425ef50dc2..a3a6af23cb19 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java @@ -57,6 +57,14 @@ public void add(KeyValue kv) { candidates.add(kv.copy(keySerializer, valueSerializer)); } + public InternalRowSerializer getKeySerializer() { + return keySerializer; + } + + public InternalRowSerializer getValueSerializer() { + return valueSerializer; + } + @Override public KeyValue getResult() { // 1. Find the latest high level record @@ -79,10 +87,6 @@ public KeyValue getResult() { return mergeFunction.getResult(); } - LinkedList candidates() { - return candidates; - } - public static MergeFunctionFactory wrap( MergeFunctionFactory wrapped, RowType keyType, RowType valueType) { if (wrapped.create() instanceof FirstRowMergeFunction) {