Skip to content

Commit

Permalink
[flink] fix sink parallelism in case of AQE (#4397)
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub authored Oct 29, 2024
1 parent 1779a40 commit 69390bd
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

package org.apache.paimon.flink.sink;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/** Get adaptive config from Flink. Only work for Flink 1.17+. */
Expand All @@ -27,4 +29,20 @@ public class AdaptiveParallelism {
public static boolean isEnabled(StreamExecutionEnvironment env) {
return env.getConfiguration().get(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_ENABLED);
}

/**
* Get default max parallelism of AdaptiveBatchScheduler of Flink. See {@link
* org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerFactory#getDefaultMaxParallelism(Configuration,
* ExecutionConfig)}.
*/
public static int getDefaultMaxParallelism(
ReadableConfig configuration, ExecutionConfig executionConfig) {
return configuration
.getOptional(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM)
.orElse(
executionConfig.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT
? BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM
.defaultValue()
: executionConfig.getParallelism());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import java.util.List;
import java.util.Map;

import static org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM;
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_STRATEGY;
import static org.apache.paimon.flink.FlinkConnectorOptions.MIN_CLUSTERING_SAMPLE_FACTOR;
Expand Down Expand Up @@ -318,11 +317,11 @@ private void setParallelismIfAdaptiveConflict() {
parallelismSource = "input parallelism";
parallelism = input.getParallelism();
} else {
parallelismSource = DEFAULT_PARALLELISM.key();
parallelismSource = "AdaptiveBatchScheduler's default max parallelism";
parallelism =
input.getExecutionEnvironment()
.getConfiguration()
.get(DEFAULT_PARALLELISM);
AdaptiveParallelism.getDefaultMaxParallelism(
input.getExecutionEnvironment().getConfiguration(),
input.getExecutionConfig());
}
String msg =
String.format(
Expand Down

0 comments on commit 69390bd

Please sign in to comment.