From 1396cffbc41a98f547b40d878089eaffc0c4a772 Mon Sep 17 00:00:00 2001
From: Yubin Li
Date: Tue, 11 Feb 2025 15:04:33 +0800
Subject: [PATCH] refactor to simplify

---
 .../apache/paimon/index/PartitionIndex.java   | 77 ++++++-------
 .../index/SimpleHashBucketAssigner.java       | 29 +++++--
 .../paimon/index/HashBucketAssignerTest.java  | 46 ++---
 .../index/SimpleHashBucketAssignerTest.java   | 15 +---
 .../sink/HashBucketAssignerOperator.java      | 17 +---
 .../spark/commands/BucketProcessor.scala      |  5 +-
 .../spark/commands/PaimonSparkWriter.scala    |  5 +-
 7 files changed, 60 insertions(+), 134 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
index e9047c1eec65..583c1cc63bda 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
@@ -30,7 +30,6 @@
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.UncheckedIOException;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
@@ -101,24 +100,32 @@ public int assign(int hash, IntPredicate bucketFilter, int maxBucketsNum) {
             // 3. create a new bucket
             for (int i = 0; i < Short.MAX_VALUE; i++) {
                 if (bucketFilter.test(i) && !totalBucket.contains(i)) {
-                    nonFullBucketInformation.put(i, 1L);
-                    totalBucket.add(i);
-                    hash2Bucket.put(hash, (short) i);
-                    return i;
+                    // The new bucketId may still be larger than the upper bound
+                    if (-1 == maxBucketsNum || i <= maxBucketsNum - 1) {
+                        nonFullBucketInformation.put(i, 1L);
+                        totalBucket.add(i);
+                        hash2Bucket.put(hash, (short) i);
+                        return i;
+                    } else {
+                        // No need to enter the next iteration because the upper bound has been
+                        // exceeded
+                        break;
+                    }
                 }
             }
-
-            throw new RuntimeException(
-                    String.format(
-                            "Too more bucket %s, you should increase target bucket row number %s.",
-                            maxBucketId, targetBucketRowNumber));
-        } else {
-            // exceed buckets upper bound
-            int bucket =
-                    KeyAndBucketExtractor.bucketWithUpperBound(totalBucket, hash, maxBucketsNum);
-            hash2Bucket.put(hash, (short) bucket);
-            return bucket;
+            if (-1 == maxBucketsNum) {
+                throw new RuntimeException(
+                        String.format(
+                                "Too many buckets %s, you should increase target bucket row number %s.",
+                                maxBucketId, targetBucketRowNumber));
+            }
         }
+
+        // exceed buckets upper bound
+        int bucket =
+                KeyAndBucketExtractor.bucketWithUpperBound(totalBucket, hash, totalBucket.size());
+        hash2Bucket.put(hash, (short) bucket);
+        return bucket;
     }

     public static PartitionIndex loadIndex(
@@ -153,42 +160,4 @@ public static PartitionIndex loadIndex(
         }
         return new PartitionIndex(mapBuilder.build(), buckets, targetBucketRowNumber);
     }
-
-    public static int[] getMaxBucketsPerAssigner(int maxBuckets, int assigners) {
-        int[] maxBucketsArr = new int[assigners];
-        if (-1 == maxBuckets) {
-            Arrays.fill(maxBucketsArr, -1);
-            return maxBucketsArr;
-        }
-        if (0 >= maxBuckets) {
-            throw new IllegalArgumentException(
-                    "Max-buckets should either be equal to -1 (unlimited), or it must be greater than 0 (fixed upper bound).");
-        }
-        int avg = maxBuckets / assigners;
-        int remainder = maxBuckets % assigners;
-        for (int i = 0; i < assigners; i++) {
-            maxBucketsArr[i] = avg;
-            if (remainder > 0) {
-                maxBucketsArr[i]++;
-                remainder--;
-            }
-        }
-        LOG.info(
-                "After distributing max-buckets '{}' to '{}' assigners evenly, maxBuckets layout: '{}'.",
-                maxBuckets,
-                assigners,
-                Arrays.toString(maxBucketsArr));
-        return maxBucketsArr;
-    }
-
-    public static int getSpecifiedMaxBuckets(int[] maxBucketsArr, int assignerId) {
-        int length = maxBucketsArr.length;
-        if (length == 0) {
-            throw new IllegalStateException("maxBuckets layout should exists!");
-        } else if (assignerId < length) {
-            return maxBucketsArr[assignerId];
-        } else {
-            return -1 == maxBucketsArr[0] ? -1 : 0;
-        }
-    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java b/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java
index e9160d22454c..d577cdfa6242 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java
@@ -87,15 +87,25 @@ public int assign(int hash) {
         }

         Long num = bucketInformation.computeIfAbsent(currentBucket, i -> 0L);
+
         if (num >= targetBucketRowNumber) {
-            if (-1 != maxBucketsNum && bucketInformation.size() >= maxBucketsNum) {
+            int maxBucketId =
+                    bucketInformation.isEmpty()
+                            ? 0
+                            : bucketInformation.keySet().stream()
+                                    .mapToInt(Integer::intValue)
+                                    .max()
+                                    .getAsInt();
+            if (-1 == maxBucketsNum
+                    || bucketInformation.isEmpty()
+                    || maxBucketId < maxBucketsNum - 1) {
+                loadNewBucket();
+            } else {
                 int bucket =
                         KeyAndBucketExtractor.bucketWithUpperBound(
-                                bucketInformation.keySet(), hash, maxBucketsNum);
+                                bucketInformation.keySet(), hash, bucketInformation.size());
                 hash2Bucket.put(hash, (short) bucket);
                 return bucket;
-            } else {
-                loadNewBucket();
             }
         }
         bucketInformation.compute(currentBucket, (i, l) -> l == null ? 1L : l + 1);
@@ -106,8 +116,15 @@ public int assign(int hash) {
     private void loadNewBucket() {
         for (int i = 0; i < Short.MAX_VALUE; i++) {
             if (isMyBucket(i) && !bucketInformation.containsKey(i)) {
-                currentBucket = i;
-                return;
+                // The new bucketId may still be larger than the upper bound
+                if (-1 == maxBucketsNum || i <= maxBucketsNum - 1) {
+                    currentBucket = i;
+                    return;
+                } else {
+                    // No need to enter the next iteration because the upper bound has been
+                    // exceeded
+                    return;
+                }
             }
         }
         throw new RuntimeException(
diff --git a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
index 481a89995a91..b9c6a28378db 100644
--- a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
@@ -37,8 +37,6 @@
 import java.util.Arrays;
 import java.util.Collections;

-import static org.apache.paimon.index.PartitionIndex.getMaxBucketsPerAssigner;
-import static org.apache.paimon.index.PartitionIndex.getSpecifiedMaxBuckets;
 import static org.apache.paimon.io.DataFileTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -122,11 +120,11 @@ public void testAssignWithUpperBound() {
         assertThat(assigner.assign(row(1), 8)).isEqualTo(0);

         // full
-        assertThat(assigner.assign(row(1), 10)).isEqualTo(2);
-        assertThat(assigner.assign(row(1), 12)).isEqualTo(2);
-        assertThat(assigner.assign(row(1), 14)).isEqualTo(2);
-        assertThat(assigner.assign(row(1), 16)).isEqualTo(2);
-        assertThat(assigner.assign(row(1), 18)).isEqualTo(2);
+        assertThat(assigner.assign(row(1), 10)).isEqualTo(0);
+        assertThat(assigner.assign(row(1), 12)).isEqualTo(0);
+        assertThat(assigner.assign(row(1), 14)).isEqualTo(0);
+        assertThat(assigner.assign(row(1), 16)).isEqualTo(0);
+        assertThat(assigner.assign(row(1), 18)).isEqualTo(0);

         // another partition
         assertThat(assigner.assign(row(2), 12)).isEqualTo(0);
@@ -153,40 +151,10 @@ public void testAssignWithUpperBound() {
         }
     }

-    @Test
-    public void testMultiAssigners() {
-        int[] maxBucketsArr = getMaxBucketsPerAssigner(4, 2);
-        Assertions.assertThat(maxBucketsArr).isEqualTo(new int[] {2, 2});
-
-        maxBucketsArr = getMaxBucketsPerAssigner(8, 3);
-        Assertions.assertThat(maxBucketsArr).isEqualTo(new int[] {3, 3, 2});
-
-        maxBucketsArr = getMaxBucketsPerAssigner(3, 2);
-        Assertions.assertThat(maxBucketsArr).isEqualTo(new int[] {2, 1});
-
-        Assertions.assertThat(getSpecifiedMaxBuckets(maxBucketsArr, 0)).isEqualTo(2);
-        Assertions.assertThat(getSpecifiedMaxBuckets(maxBucketsArr, 1)).isEqualTo(1);
-        Assertions.assertThat(getSpecifiedMaxBuckets(maxBucketsArr, 2)).isEqualTo(0);
-
-        maxBucketsArr = getMaxBucketsPerAssigner(-1, 2);
-        Assertions.assertThat(maxBucketsArr).isEqualTo(new int[] {-1, -1});
-
-        Assertions.assertThat(getSpecifiedMaxBuckets(maxBucketsArr, 0)).isEqualTo(-1);
-        Assertions.assertThat(getSpecifiedMaxBuckets(maxBucketsArr, 1)).isEqualTo(-1);
-        Assertions.assertThat(getSpecifiedMaxBuckets(maxBucketsArr, 2)).isEqualTo(-1);
-
-        assertThatThrownBy(() -> getMaxBucketsPerAssigner(-10, 2))
-                .hasMessageContaining(
-                        "Max-buckets should either be equal to -1 (unlimited), or it must be greater than 0 (fixed upper bound).");
-    }
-
     @Test
     public void testAssignWithUpperBoundMultiAssigners() {
-        int[] maxBucketsArr = getMaxBucketsPerAssigner(3, 2);
-        Assertions.assertThat(maxBucketsArr).isEqualTo(new int[] {2, 1});
-
-        HashBucketAssigner assigner0 = createAssigner(2, 2, 0, maxBucketsArr[0]);
-        HashBucketAssigner assigner1 = createAssigner(2, 2, 1, maxBucketsArr[1]);
+        HashBucketAssigner assigner0 = createAssigner(2, 2, 0, 3);
+        HashBucketAssigner assigner1 = createAssigner(2, 2, 1, 3);

         // assigner0: assign
         assertThat(assigner0.assign(row(1), 0)).isEqualTo(0);
diff --git a/paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java
index ae2e198a58fb..2e2e53b7eff0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java
@@ -25,7 +25,6 @@
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;

-import static org.apache.paimon.index.PartitionIndex.getMaxBucketsPerAssigner;
 import static org.apache.paimon.io.DataFileTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;

@@ -72,27 +71,19 @@ public void testAssignWithUpperBound() {
             Assertions.assertThat(bucket).isEqualTo(2);
         }

-        for (int i = 0; i < 100; i++) {
-            int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++);
-            Assertions.assertThat(bucket).isEqualTo(4);
-        }
-
         // exceed upper bound
         for (int i = 0; i < 200; i++) {
             int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++);
-            Assertions.assertThat(bucket).isIn(0, 2, 4);
+            Assertions.assertThat(bucket).isIn(0, 2);
         }
     }

     @Test
     public void testAssignWithUpperBoundMultiAssigners() {
-        int[] maxBucketsArr = getMaxBucketsPerAssigner(3, 2);
-        Assertions.assertThat(maxBucketsArr).isEqualTo(new int[] {2, 1});
-
         SimpleHashBucketAssigner simpleHashBucketAssigner0 =
-                new SimpleHashBucketAssigner(2, 0, 100, maxBucketsArr[0]);
+                new SimpleHashBucketAssigner(2, 0, 100, 3);
         SimpleHashBucketAssigner simpleHashBucketAssigner1 =
-                new SimpleHashBucketAssigner(2, 1, 100, maxBucketsArr[1]);
+                new SimpleHashBucketAssigner(2, 1, 100, 3);

         BinaryRow binaryRow = BinaryRow.EMPTY_ROW;
         int hash = 0;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
index a86a292ee608..5839fc98c23e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
@@ -37,9 +37,6 @@
 import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

-import static org.apache.paimon.index.PartitionIndex.getMaxBucketsPerAssigner;
-import static org.apache.paimon.index.PartitionIndex.getSpecifiedMaxBuckets;
-
 /** Assign bucket for the input record, output record with bucket. */
 public class HashBucketAssignerOperator<T> extends AbstractStreamOperator<Tuple2<T, Integer>>
         implements OneInputStreamOperator<T, Tuple2<T, Integer>> {
@@ -85,20 +82,10 @@ public void initializeState(StateInitializationContext context) throws Exception
         int taskId = RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
         long targetRowNum = table.coreOptions().dynamicBucketTargetRowNum();
         Integer maxBucketsNum = table.coreOptions().dynamicBucketMaxBuckets();
-        if (maxBucketsArr == null) {
-            this.maxBucketsArr =
-                    overwrite
-                            ? getMaxBucketsPerAssigner(maxBucketsNum, numberTasks)
-                            : getMaxBucketsPerAssigner(
-                                    maxBucketsNum, MathUtils.min(numAssigners, numberTasks));
-        }
         this.assigner =
                 overwrite
                         ? new SimpleHashBucketAssigner(
-                                numberTasks,
-                                taskId,
-                                targetRowNum,
-                                getSpecifiedMaxBuckets(maxBucketsArr, taskId))
+                                numberTasks, taskId, targetRowNum, maxBucketsNum)
                         : new HashBucketAssigner(
                                 table.snapshotManager(),
                                 commitUser,
@@ -107,7 +94,7 @@ public void initializeState(StateInitializationContext context) throws Exception
                                 MathUtils.min(numAssigners, numberTasks),
                                 taskId,
                                 targetRowNum,
-                                getSpecifiedMaxBuckets(maxBucketsArr, taskId));
+                                maxBucketsNum);

         this.extractor = extractorFunction.apply(table.schema());
     }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala
index 03420ca741bc..19494fc88dfc 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala
@@ -101,9 +101,6 @@ case class DynamicBucketProcessor(
   private val targetBucketRowNumber = fileStoreTable.coreOptions.dynamicBucketTargetRowNum
   private val rowType = fileStoreTable.rowType
   private val commitUser = UUID.randomUUID.toString
-  private val maxBucketsArr = PartitionIndex.getMaxBucketsPerAssigner(
-    fileStoreTable.coreOptions.dynamicBucketMaxBuckets,
-    numAssigners)

   def processPartition(rowIterator: Iterator[Row]): Iterator[Row] = {
     val rowPartitionKeyExtractor = new RowPartitionKeyExtractor(fileStoreTable.schema)
@@ -115,7 +112,7 @@
       numAssigners,
       TaskContext.getPartitionId(),
       targetBucketRowNumber,
-      PartitionIndex.getSpecifiedMaxBuckets(maxBucketsArr, TaskContext.getPartitionId)
+      fileStoreTable.coreOptions.dynamicBucketMaxBuckets
     )

     new Iterator[Row]() {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 8baa1f73346f..061337b56faa 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -176,9 +176,6 @@ case class PaimonSparkWriter(table: FileStoreTable) {
     val numAssigners = Option(table.coreOptions.dynamicBucketInitialBuckets)
       .map(initialBuckets => Math.min(initialBuckets.toInt, assignerParallelism))
       .getOrElse(assignerParallelism)
-    val maxBucketsArr = PartitionIndex.getMaxBucketsPerAssigner(
-      table.coreOptions.dynamicBucketMaxBuckets,
-      numAssigners)

     def partitionByKey(): DataFrame = {
       repartitionByKeyPartitionHash(
@@ -200,7 +197,7 @@
           numAssigners,
           TaskContext.getPartitionId(),
           table.coreOptions.dynamicBucketTargetRowNum,
-          PartitionIndex.getSpecifiedMaxBuckets(maxBucketsArr, TaskContext.getPartitionId)
+          table.coreOptions.dynamicBucketMaxBuckets
        )
        row => {
          val sparkRow = new SparkRow(rowType, row)
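--

Note: a minimal usage sketch of the simplified upper-bound behavior, mirroring the
updated SimpleHashBucketAssignerTest above. The constructor argument order
(numAssigners, taskId, targetBucketRowNumber, maxBucketsNum) and the
assign(partition, hash) call are taken from the test code in this patch; the
standalone Main harness is hypothetical, and the exact overflow bucket returned by
KeyAndBucketExtractor.bucketWithUpperBound is assumed only to be one of the
already-created buckets.

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.index.SimpleHashBucketAssigner;

public class Main {
    public static void main(String[] args) {
        // Assigner 0 of 2 assigners, 100 rows per bucket, at most 3 buckets in total.
        // Task 0 owns the even bucket ids, so the new bound check (id <= maxBucketsNum - 1)
        // only lets it create buckets 0 and 2; bucket 4 is no longer reachable.
        SimpleHashBucketAssigner assigner = new SimpleHashBucketAssigner(2, 0, 100, 3);
        BinaryRow partition = BinaryRow.EMPTY_ROW;

        int hash = 0;
        for (int i = 0; i < 100; i++) {
            assert assigner.assign(partition, hash++) == 0; // bucket 0 fills up first
        }
        for (int i = 0; i < 100; i++) {
            assert assigner.assign(partition, hash++) == 2; // next owned id under the bound
        }
        for (int i = 0; i < 200; i++) {
            // Bound reached: overflow rows are spread over the existing buckets by
            // KeyAndBucketExtractor.bucketWithUpperBound instead of opening bucket 4,
            // which the pre-patch code would have created.
            int bucket = assigner.assign(partition, hash++);
            assert bucket == 0 || bucket == 2;
        }
    }
}

Run with assertions enabled (java -ea Main) against a build that contains this patch.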