-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[core] Support upper bound in dynamic bucket mode #4974
base: master
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,15 +20,20 @@ | |
|
||
import org.apache.paimon.data.BinaryRow; | ||
import org.apache.paimon.manifest.IndexManifestEntry; | ||
import org.apache.paimon.table.sink.KeyAndBucketExtractor; | ||
import org.apache.paimon.utils.Int2ShortHashMap; | ||
import org.apache.paimon.utils.IntIterator; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.io.EOFException; | ||
import java.io.IOException; | ||
import java.io.UncheckedIOException; | ||
import java.util.Arrays; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.Iterator; | ||
import java.util.LinkedHashSet; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
|
@@ -38,6 +43,7 @@ | |
|
||
/** Bucket Index Per Partition. */ | ||
public class PartitionIndex { | ||
private static final Logger LOG = LoggerFactory.getLogger(PartitionIndex.class); | ||
|
||
public final Int2ShortHashMap hash2Bucket; | ||
|
||
|
@@ -57,13 +63,13 @@ public PartitionIndex( | |
long targetBucketRowNumber) { | ||
this.hash2Bucket = hash2Bucket; | ||
this.nonFullBucketInformation = bucketInformation; | ||
this.totalBucket = new HashSet<>(bucketInformation.keySet()); | ||
this.totalBucket = new LinkedHashSet<>(bucketInformation.keySet()); | ||
this.targetBucketRowNumber = targetBucketRowNumber; | ||
this.lastAccessedCommitIdentifier = Long.MIN_VALUE; | ||
this.accessed = true; | ||
} | ||
|
||
public int assign(int hash, IntPredicate bucketFilter) { | ||
public int assign(int hash, IntPredicate bucketFilter, int maxBucketsNum) { | ||
accessed = true; | ||
|
||
// 1. is it a key that has appeared before | ||
|
@@ -80,29 +86,35 @@ public int assign(int hash, IntPredicate bucketFilter) { | |
Long number = entry.getValue(); | ||
if (number < targetBucketRowNumber) { | ||
entry.setValue(number + 1); | ||
hash2Bucket.put(hash, bucket.shortValue()); | ||
return bucket; | ||
return cacheBucketAndGet(hash2Bucket, hash, bucket); | ||
} else { | ||
iterator.remove(); | ||
} | ||
} | ||
|
||
// 3. create a new bucket | ||
for (int i = 0; i < Short.MAX_VALUE; i++) { | ||
if (bucketFilter.test(i) && !totalBucket.contains(i)) { | ||
hash2Bucket.put(hash, (short) i); | ||
nonFullBucketInformation.put(i, 1L); | ||
totalBucket.add(i); | ||
return i; | ||
if (-1 == maxBucketsNum || totalBucket.size() < maxBucketsNum) { | ||
// 3. create a new bucket | ||
for (int i = 0; i < Short.MAX_VALUE; i++) { | ||
if (bucketFilter.test(i) && !totalBucket.contains(i)) { | ||
nonFullBucketInformation.put(i, 1L); | ||
totalBucket.add(i); | ||
return cacheBucketAndGet(hash2Bucket, hash, i); | ||
} | ||
} | ||
} | ||
|
||
@SuppressWarnings("OptionalGetWithoutIsPresent") | ||
int maxBucket = totalBucket.stream().mapToInt(Integer::intValue).max().getAsInt(); | ||
throw new RuntimeException( | ||
String.format( | ||
"Too more bucket %s, you should increase target bucket row number %s.", | ||
maxBucket, targetBucketRowNumber)); | ||
@SuppressWarnings("OptionalGetWithoutIsPresent") | ||
int maxBucket = totalBucket.stream().mapToInt(Integer::intValue).max().getAsInt(); | ||
throw new RuntimeException( | ||
String.format( | ||
"Too more bucket %s, you should increase target bucket row number %s.", | ||
maxBucket, targetBucketRowNumber)); | ||
} else { | ||
// exceed buckets upper bound | ||
return cacheBucketAndGet( | ||
hash2Bucket, | ||
hash, | ||
KeyAndBucketExtractor.bucketWithUpperBound(totalBucket, hash, maxBucketsNum)); | ||
} | ||
} | ||
|
||
public static PartitionIndex loadIndex( | ||
|
@@ -137,4 +149,47 @@ public static PartitionIndex loadIndex( | |
} | ||
return new PartitionIndex(mapBuilder.build(), buckets, targetBucketRowNumber); | ||
} | ||
|
||
public static int cacheBucketAndGet(Int2ShortHashMap hash2Bucket, int hash, int bucket) { | ||
liyubin117 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
hash2Bucket.put(hash, (short) bucket); | ||
return bucket; | ||
} | ||
|
||
public static int[] getMaxBucketsPerAssigner(int maxBuckets, int assigners) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I cannot understand There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good question, After offline discussion, we have reached a consesus, done :) |
||
int[] maxBucketsArr = new int[assigners]; | ||
if (-1 == maxBuckets) { | ||
Arrays.fill(maxBucketsArr, -1); | ||
return maxBucketsArr; | ||
} | ||
if (0 >= maxBuckets) { | ||
throw new IllegalArgumentException( | ||
"Max-buckets should either be equal to -1 (unlimited), or it must be greater than 0 (fixed upper bound)."); | ||
} | ||
int avg = maxBuckets / assigners; | ||
int remainder = maxBuckets % assigners; | ||
for (int i = 0; i < assigners; i++) { | ||
maxBucketsArr[i] = avg; | ||
if (remainder > 0) { | ||
maxBucketsArr[i]++; | ||
remainder--; | ||
} | ||
} | ||
LOG.info( | ||
"After distributing max-buckets '{}' to '{}' assigners evenly, maxBuckets layout: '{}'.", | ||
maxBuckets, | ||
assigners, | ||
Arrays.toString(maxBucketsArr)); | ||
return maxBucketsArr; | ||
} | ||
|
||
public static int getSpecifiedMaxBuckets(int[] maxBucketsArr, int assignerId) { | ||
int length = maxBucketsArr.length; | ||
if (length == 0) { | ||
throw new IllegalStateException("maxBuckets layout should exists!"); | ||
} else if (assignerId < length) { | ||
return maxBucketsArr[assignerId]; | ||
} else { | ||
return -1 == maxBucketsArr[0] ? -1 : 0; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the condition here should be: the max bucket should be smaller than maxBucketsNum.
If the job is restarted from scratch, the buckets of each task keep increasing, so the previous check may be problematic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wise consideration! done :)