refactor to simplify
liyubin117 committed Feb 11, 2025
1 parent abcd024 commit 41f3370
Showing 7 changed files with 57 additions and 140 deletions.
@@ -24,13 +24,9 @@
import org.apache.paimon.utils.Int2ShortHashMap;
import org.apache.paimon.utils.IntIterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
@@ -43,7 +39,6 @@

/** Bucket Index Per Partition. */
public class PartitionIndex {
private static final Logger LOG = LoggerFactory.getLogger(PartitionIndex.class);

public final Int2ShortHashMap hash2Bucket;

@@ -101,24 +96,31 @@ public int assign(int hash, IntPredicate bucketFilter, int maxBucketsNum) {
// 3. create a new bucket
for (int i = 0; i < Short.MAX_VALUE; i++) {
if (bucketFilter.test(i) && !totalBucket.contains(i)) {
nonFullBucketInformation.put(i, 1L);
totalBucket.add(i);
hash2Bucket.put(hash, (short) i);
return i;
// The new bucketId may still be larger than the upper bound
if (-1 == maxBucketsNum || i <= maxBucketsNum - 1) {
nonFullBucketInformation.put(i, 1L);
totalBucket.add(i);
hash2Bucket.put(hash, (short) i);
return i;
} else {
// No need to enter the next iteration when upper bound exceeded
break;
}
}
}

throw new RuntimeException(
String.format(
"Too more bucket %s, you should increase target bucket row number %s.",
maxBucketId, targetBucketRowNumber));
} else {
// exceed buckets upper bound
int bucket =
KeyAndBucketExtractor.bucketWithUpperBound(totalBucket, hash, maxBucketsNum);
hash2Bucket.put(hash, (short) bucket);
return bucket;
if (-1 == maxBucketsNum) {
throw new RuntimeException(
String.format(
"Too more bucket %s, you should increase target bucket row number %s.",
maxBucketId, targetBucketRowNumber));
}
}

// exceed buckets upper bound
int bucket =
KeyAndBucketExtractor.bucketWithUpperBound(totalBucket, hash, totalBucket.size());
hash2Bucket.put(hash, (short) bucket);
return bucket;
}

public static PartitionIndex loadIndex(
@@ -153,42 +155,4 @@ public static PartitionIndex loadIndex(
}
return new PartitionIndex(mapBuilder.build(), buckets, targetBucketRowNumber);
}

public static int[] getMaxBucketsPerAssigner(int maxBuckets, int assigners) {
int[] maxBucketsArr = new int[assigners];
if (-1 == maxBuckets) {
Arrays.fill(maxBucketsArr, -1);
return maxBucketsArr;
}
if (0 >= maxBuckets) {
throw new IllegalArgumentException(
"Max-buckets should either be equal to -1 (unlimited), or it must be greater than 0 (fixed upper bound).");
}
int avg = maxBuckets / assigners;
int remainder = maxBuckets % assigners;
for (int i = 0; i < assigners; i++) {
maxBucketsArr[i] = avg;
if (remainder > 0) {
maxBucketsArr[i]++;
remainder--;
}
}
LOG.info(
"After distributing max-buckets '{}' to '{}' assigners evenly, maxBuckets layout: '{}'.",
maxBuckets,
assigners,
Arrays.toString(maxBucketsArr));
return maxBucketsArr;
}

public static int getSpecifiedMaxBuckets(int[] maxBucketsArr, int assignerId) {
int length = maxBucketsArr.length;
if (length == 0) {
throw new IllegalStateException("maxBuckets layout should exist!");
} else if (assignerId < length) {
return maxBucketsArr[assignerId];
} else {
return -1 == maxBucketsArr[0] ? -1 : 0;
}
}
}
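
After this change, a single maxBucketsNum bound governs bucket creation directly inside assign(): a new bucket is only opened while its id stays below the bound, and once the bound is reached the record is rehashed into the buckets that already exist. A minimal, self-contained sketch of that flow (simplified types, no hash2Bucket cache, and a made-up modulo stand-in for KeyAndBucketExtractor.bucketWithUpperBound, so not the real Paimon API):

import java.util.HashSet;
import java.util.Set;
import java.util.function.IntPredicate;

/** Illustration only; field and method names are simplified relative to PartitionIndex. */
public class BucketUpperBoundSketch {

    private final Set<Integer> totalBucket = new HashSet<>();

    /** Stand-in for KeyAndBucketExtractor.bucketWithUpperBound: rehash into the existing range. */
    private static int bucketWithUpperBound(int hash, int bound) {
        return Math.floorMod(hash, Math.max(bound, 1));
    }

    public int assign(int hash, IntPredicate bucketFilter, int maxBucketsNum) {
        // Try to open a new bucket owned by this assigner.
        for (int i = 0; i < Short.MAX_VALUE; i++) {
            if (bucketFilter.test(i) && !totalBucket.contains(i)) {
                // The candidate id may already be larger than the configured upper bound.
                if (maxBucketsNum == -1 || i <= maxBucketsNum - 1) {
                    totalBucket.add(i);
                    return i;
                }
                break; // No point scanning further once the bound is exceeded.
            }
        }
        if (maxBucketsNum == -1) {
            // Unlimited buckets, but all Short.MAX_VALUE ids are taken.
            throw new RuntimeException("Too many buckets, increase the target bucket row number.");
        }
        // Upper bound reached: fall back to the buckets that already exist.
        return bucketWithUpperBound(hash, totalBucket.size());
    }
}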
@@ -87,15 +87,23 @@ public int assign(int hash) {
}

Long num = bucketInformation.computeIfAbsent(currentBucket, i -> 0L);

if (num >= targetBucketRowNumber) {
if (-1 != maxBucketsNum && bucketInformation.size() >= maxBucketsNum) {
int bucket =
KeyAndBucketExtractor.bucketWithUpperBound(
bucketInformation.keySet(), hash, maxBucketsNum);
hash2Bucket.put(hash, (short) bucket);
return bucket;
} else {
int maxBucketId =
bucketInformation.isEmpty()
? 0
: bucketInformation.keySet().stream()
.mapToInt(Integer::intValue)
.max()
.getAsInt();
if (-1 == maxBucketsNum
|| bucketInformation.isEmpty()
|| maxBucketId < maxBucketsNum - 1) {
loadNewBucket();
} else {
currentBucket =
KeyAndBucketExtractor.bucketWithUpperBound(
bucketInformation.keySet(), hash, bucketInformation.size());
}
}
bucketInformation.compute(currentBucket, (i, l) -> l == null ? 1L : l + 1);
@@ -106,7 +114,12 @@
private void loadNewBucket() {
for (int i = 0; i < Short.MAX_VALUE; i++) {
if (isMyBucket(i) && !bucketInformation.containsKey(i)) {
currentBucket = i;
// The new bucketId may still be larger than the upper bound
if (-1 == maxBucketsNum || i <= maxBucketsNum - 1) {
currentBucket = i;
return;
}
// No need to enter the next iteration when upper bound exceeded
return;
}
}
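The simple assigner now receives the global max-buckets value directly instead of a pre-split share. A usage sketch consistent with the constructor and assign calls in the updated tests further down (the org.apache.paimon.index package for SimpleHashBucketAssigner is an assumption, and the loop values are arbitrary):

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.index.SimpleHashBucketAssigner;

public class SimpleAssignerUsageSketch {
    public static void main(String[] args) {
        // 2 assigners, this is task 0, 100 rows per bucket, at most 3 buckets in total.
        SimpleHashBucketAssigner assigner = new SimpleHashBucketAssigner(2, 0, 100, 3);
        for (int hash = 0; hash < 500; hash++) {
            // Task 0 owns buckets 0 and 2; once they are full, further records are
            // rehashed into them rather than opening bucket 4, whose id would exceed the bound.
            int bucket = assigner.assign(BinaryRow.EMPTY_ROW, hash);
        }
    }
}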
@@ -37,8 +37,6 @@
import java.util.Arrays;
import java.util.Collections;

import static org.apache.paimon.index.PartitionIndex.getMaxBucketsPerAssigner;
import static org.apache.paimon.index.PartitionIndex.getSpecifiedMaxBuckets;
import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -122,11 +120,11 @@ public void testAssignWithUpperBound() {
assertThat(assigner.assign(row(1), 8)).isEqualTo(0);

// full
assertThat(assigner.assign(row(1), 10)).isEqualTo(2);
assertThat(assigner.assign(row(1), 12)).isEqualTo(2);
assertThat(assigner.assign(row(1), 14)).isEqualTo(2);
assertThat(assigner.assign(row(1), 16)).isEqualTo(2);
assertThat(assigner.assign(row(1), 18)).isEqualTo(2);
assertThat(assigner.assign(row(1), 10)).isEqualTo(0);
assertThat(assigner.assign(row(1), 12)).isEqualTo(0);
assertThat(assigner.assign(row(1), 14)).isEqualTo(0);
assertThat(assigner.assign(row(1), 16)).isEqualTo(0);
assertThat(assigner.assign(row(1), 18)).isEqualTo(0);

// another partition
assertThat(assigner.assign(row(2), 12)).isEqualTo(0);
@@ -153,40 +151,10 @@ public void testAssignWithUpperBound() {
}
}

@Test
public void testMultiAssigners() {
int[] maxBucketsArr = getMaxBucketsPerAssigner(4, 2);
Assertions.assertThat(maxBucketsArr).isEqualTo(new int[] {2, 2});

maxBucketsArr = getMaxBucketsPerAssigner(8, 3);
Assertions.assertThat(maxBucketsArr).isEqualTo(new int[] {3, 3, 2});

maxBucketsArr = getMaxBucketsPerAssigner(3, 2);
Assertions.assertThat(maxBucketsArr).isEqualTo(new int[] {2, 1});

Assertions.assertThat(getSpecifiedMaxBuckets(maxBucketsArr, 0)).isEqualTo(2);
Assertions.assertThat(getSpecifiedMaxBuckets(maxBucketsArr, 1)).isEqualTo(1);
Assertions.assertThat(getSpecifiedMaxBuckets(maxBucketsArr, 2)).isEqualTo(0);

maxBucketsArr = getMaxBucketsPerAssigner(-1, 2);
Assertions.assertThat(maxBucketsArr).isEqualTo(new int[] {-1, -1});

Assertions.assertThat(getSpecifiedMaxBuckets(maxBucketsArr, 0)).isEqualTo(-1);
Assertions.assertThat(getSpecifiedMaxBuckets(maxBucketsArr, 1)).isEqualTo(-1);
Assertions.assertThat(getSpecifiedMaxBuckets(maxBucketsArr, 2)).isEqualTo(-1);

assertThatThrownBy(() -> getMaxBucketsPerAssigner(-10, 2))
.hasMessageContaining(
"Max-buckets should either be equal to -1 (unlimited), or it must be greater than 0 (fixed upper bound).");
}

@Test
public void testAssignWithUpperBoundMultiAssigners() {
int[] maxBucketsArr = getMaxBucketsPerAssigner(3, 2);
Assertions.assertThat(maxBucketsArr).isEqualTo(new int[] {2, 1});

HashBucketAssigner assigner0 = createAssigner(2, 2, 0, maxBucketsArr[0]);
HashBucketAssigner assigner1 = createAssigner(2, 2, 1, maxBucketsArr[1]);
HashBucketAssigner assigner0 = createAssigner(2, 2, 0, 3);
HashBucketAssigner assigner1 = createAssigner(2, 2, 1, 3);

// assigner0: assign
assertThat(assigner0.assign(row(1), 0)).isEqualTo(0);
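The updated expectations reflect the removal of the per-assigner split from PartitionIndex above: previously a global bound of 3 across 2 assigners was distributed as {2, 1}, whereas now every assigner is handed the full bound. For comparison only, a standalone re-statement of the removed helper (not part of the codebase after this commit):

import java.util.Arrays;

public class MaxBucketsSplitSketch {
    /** Re-statement of the removed PartitionIndex#getMaxBucketsPerAssigner. */
    static int[] maxBucketsPerAssigner(int maxBuckets, int assigners) {
        int[] result = new int[assigners];
        if (maxBuckets == -1) {
            Arrays.fill(result, -1); // unlimited stays unlimited for every assigner
            return result;
        }
        if (maxBuckets <= 0) {
            throw new IllegalArgumentException("maxBuckets must be -1 (unlimited) or greater than 0");
        }
        int avg = maxBuckets / assigners;
        int remainder = maxBuckets % assigners;
        for (int i = 0; i < assigners; i++) {
            result[i] = avg + (remainder > 0 ? 1 : 0);
            if (remainder > 0) {
                remainder--;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Old behaviour: a global bound of 3 split across 2 assigners prints [2, 1].
        System.out.println(Arrays.toString(maxBucketsPerAssigner(3, 2)));
        // After this commit each assigner simply receives the full bound (3), as the tests now do.
    }
}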
@@ -25,7 +25,6 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.apache.paimon.index.PartitionIndex.getMaxBucketsPerAssigner;
import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;

@@ -72,27 +71,19 @@ public void testAssignWithUpperBound() {
Assertions.assertThat(bucket).isEqualTo(2);
}

for (int i = 0; i < 100; i++) {
int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++);
Assertions.assertThat(bucket).isEqualTo(4);
}

// exceed upper bound
for (int i = 0; i < 200; i++) {
int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++);
Assertions.assertThat(bucket).isIn(0, 2, 4);
Assertions.assertThat(bucket).isIn(0, 2);
}
}

@Test
public void testAssignWithUpperBoundMultiAssigners() {
int[] maxBucketsArr = getMaxBucketsPerAssigner(3, 2);
Assertions.assertThat(maxBucketsArr).isEqualTo(new int[] {2, 1});

SimpleHashBucketAssigner simpleHashBucketAssigner0 =
new SimpleHashBucketAssigner(2, 0, 100, maxBucketsArr[0]);
new SimpleHashBucketAssigner(2, 0, 100, 3);
SimpleHashBucketAssigner simpleHashBucketAssigner1 =
new SimpleHashBucketAssigner(2, 1, 100, maxBucketsArr[1]);
new SimpleHashBucketAssigner(2, 1, 100, 3);

BinaryRow binaryRow = BinaryRow.EMPTY_ROW;
int hash = 0;
@@ -37,9 +37,6 @@
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import static org.apache.paimon.index.PartitionIndex.getMaxBucketsPerAssigner;
import static org.apache.paimon.index.PartitionIndex.getSpecifiedMaxBuckets;

/** Assign bucket for the input record, output record with bucket. */
public class HashBucketAssignerOperator<T> extends AbstractStreamOperator<Tuple2<T, Integer>>
implements OneInputStreamOperator<T, Tuple2<T, Integer>> {
@@ -85,20 +82,10 @@ public void initializeState(StateInitializationContext context) throws Exception
int taskId = RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
long targetRowNum = table.coreOptions().dynamicBucketTargetRowNum();
Integer maxBucketsNum = table.coreOptions().dynamicBucketMaxBuckets();
if (maxBucketsArr == null) {
this.maxBucketsArr =
overwrite
? getMaxBucketsPerAssigner(maxBucketsNum, numberTasks)
: getMaxBucketsPerAssigner(
maxBucketsNum, MathUtils.min(numAssigners, numberTasks));
}
this.assigner =
overwrite
? new SimpleHashBucketAssigner(
numberTasks,
taskId,
targetRowNum,
getSpecifiedMaxBuckets(maxBucketsArr, taskId))
numberTasks, taskId, targetRowNum, maxBucketsNum)
: new HashBucketAssigner(
table.snapshotManager(),
commitUser,
Expand All @@ -107,7 +94,7 @@ public void initializeState(StateInitializationContext context) throws Exception
MathUtils.min(numAssigners, numberTasks),
taskId,
targetRowNum,
getSpecifiedMaxBuckets(maxBucketsArr, taskId));
maxBucketsNum);
this.extractor = extractorFunction.apply(table.schema());
}

@@ -101,9 +101,6 @@ case class DynamicBucketProcessor(
private val targetBucketRowNumber = fileStoreTable.coreOptions.dynamicBucketTargetRowNum
private val rowType = fileStoreTable.rowType
private val commitUser = UUID.randomUUID.toString
private val maxBucketsArr = PartitionIndex.getMaxBucketsPerAssigner(
fileStoreTable.coreOptions.dynamicBucketMaxBuckets,
numAssigners)

def processPartition(rowIterator: Iterator[Row]): Iterator[Row] = {
val rowPartitionKeyExtractor = new RowPartitionKeyExtractor(fileStoreTable.schema)
Expand All @@ -115,7 +112,7 @@ case class DynamicBucketProcessor(
numAssigners,
TaskContext.getPartitionId(),
targetBucketRowNumber,
PartitionIndex.getSpecifiedMaxBuckets(maxBucketsArr, TaskContext.getPartitionId)
fileStoreTable.coreOptions.dynamicBucketMaxBuckets
)

new Iterator[Row]() {
@@ -176,9 +176,6 @@ case class PaimonSparkWriter(table: FileStoreTable) {
val numAssigners = Option(table.coreOptions.dynamicBucketInitialBuckets)
.map(initialBuckets => Math.min(initialBuckets.toInt, assignerParallelism))
.getOrElse(assignerParallelism)
val maxBucketsArr = PartitionIndex.getMaxBucketsPerAssigner(
table.coreOptions.dynamicBucketMaxBuckets,
numAssigners)

def partitionByKey(): DataFrame = {
repartitionByKeyPartitionHash(
Expand All @@ -200,7 +197,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
numAssigners,
TaskContext.getPartitionId(),
table.coreOptions.dynamicBucketTargetRowNum,
PartitionIndex.getSpecifiedMaxBuckets(maxBucketsArr, TaskContext.getPartitionId)
table.coreOptions.dynamicBucketMaxBuckets
)
row => {
val sparkRow = new SparkRow(rowType, row)
