Skip to content

Commit ee111e1

Browse files
committed
[FLINK-31215] Backpropagate processing rate limits from non-scalable bottlenecks to upstream operators
1 parent 07a3f73 commit ee111e1

File tree

6 files changed

+76
-20
lines changed

6 files changed

+76
-20
lines changed

docs/layouts/shortcodes/generated/auto_scaler_configuration.html

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,18 @@
1414
<td>Duration</td>
1515
<td>Lag threshold which will prevent unnecessary scalings while removing the pending messages responsible for the lag.</td>
1616
</tr>
17+
<tr>
18+
<td><h5>job.autoscaler.bottleneck-propagation.allow-scale-down</h5></td>
19+
<td style="word-wrap: break-word;">false</td>
20+
<td>Boolean</td>
21+
<td>Allow vertices scale down during bottleneck propagation.</td>
22+
</tr>
23+
<tr>
24+
<td><h5>job.autoscaler.bottleneck-propagation.enabled</h5></td>
25+
<td style="word-wrap: break-word;">false</td>
26+
<td>Boolean</td>
27+
<td>Enable backpropagation of processing rate during autoscaling to reduce resources usage.</td>
28+
</tr>
1729
<tr>
1830
<td><h5>job.autoscaler.catch-up.duration</h5></td>
1931
<td style="word-wrap: break-word;">30 min</td>
@@ -116,12 +128,6 @@
116128
<td>Double</td>
117129
<td>Percentage threshold for switching to observed from busy time based true processing rate if the measurement is off by at least the configured fraction. For example 0.15 means we switch to observed if the busy time based computation is at least 15% higher during catchup.</td>
118130
</tr>
119-
<tr>
120-
<td><h5>job.autoscaler.processing.rate.backpropagation.enabled</h5></td>
121-
<td style="word-wrap: break-word;">false</td>
122-
<td>Boolean</td>
123-
<td>Enable backpropagation of processing rate during autoscaling to reduce resources usage.</td>
124-
</tr>
125131
<tr>
126132
<td><h5>job.autoscaler.quota.cpu</h5></td>
127133
<td style="word-wrap: break-word;">(none)</td>

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.Map;
4040
import java.util.SortedMap;
4141

42+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.BOTTLENECK_PROPAGATION_SCALE_DOWN_ENABLED;
4243
import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
4344
import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
4445
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
@@ -85,7 +86,7 @@ public VertexScalingResult computeScaleTargetParallelism(
8586
conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS).contains(vertex.toHexString());
8687
if (excluded) {
8788
LOG.debug(
88-
"Vertex {} is part of `vertex.exclude.ids` config, Check for bottleneck but not scale",
89+
"Vertex {} is part of `vertex.exclude.ids` config. Check for being a bottleneck but not scale",
8990
vertex);
9091
}
9192

@@ -112,8 +113,16 @@ public VertexScalingResult computeScaleTargetParallelism(
112113

113114
LOG.debug("Target processing capacity for {} is {}", vertex, targetCapacity);
114115
double scaleFactor = targetCapacity / averageTrueProcessingRate;
115-
double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
116116
double maxScaleFactor = 1 + conf.get(MAX_SCALE_UP_FACTOR);
117+
double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
118+
119+
// if bottleneck propagation is applied and scaling down is forbidden, limit minScaleFactor
120+
// with 1
121+
if (backpropagationScaleFactor < 1.0
122+
&& !conf.getBoolean(BOTTLENECK_PROPAGATION_SCALE_DOWN_ENABLED)) {
123+
minScaleFactor = 1.0;
124+
}
125+
117126
if (scaleFactor < minScaleFactor) {
118127
LOG.debug(
119128
"Computed scale factor of {} for {} is capped by maximum scale down factor to {}",

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@
4848
import java.util.Map;
4949
import java.util.SortedMap;
5050

51+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.BOTTLENECK_PROPAGATION_ENABLED;
5152
import static org.apache.flink.autoscaler.config.AutoScalerOptions.EXCLUDED_PERIODS;
52-
import static org.apache.flink.autoscaler.config.AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_ENABLED;
5353
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
5454
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
5555
import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_EXECUTION_DISABLED_REASON;
@@ -226,8 +226,7 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
226226
context, evaluatedMetrics, scalingHistory, restartTime, jobTopology, 1.0);
227227

228228
if (scalingResult.getBottlenecks().isEmpty()
229-
|| !context.getConfiguration()
230-
.getBoolean(PROCESSING_RATE_BACKPROPAGATION_ENABLED)) {
229+
|| !context.getConfiguration().getBoolean(BOTTLENECK_PROPAGATION_ENABLED)) {
231230
return scalingResult.getScalingSummaries();
232231
}
233232

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,22 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
5858
.withDescription(
5959
"Enable vertex scaling execution by the autoscaler. If disabled, the autoscaler will only collect metrics and evaluate the suggested parallelism for each vertex but will not upgrade the jobs.");
6060

61-
public static final ConfigOption<Boolean> PROCESSING_RATE_BACKPROPAGATION_ENABLED =
62-
autoScalerConfig("processing.rate.backpropagation.enabled")
61+
public static final ConfigOption<Boolean> BOTTLENECK_PROPAGATION_ENABLED =
62+
autoScalerConfig("bottleneck-propagation.enabled")
6363
.booleanType()
6464
.defaultValue(false)
65-
.withFallbackKeys(
66-
oldOperatorConfigKey("processing.rate.backpropagation.enabled"))
65+
.withFallbackKeys(oldOperatorConfigKey("bottleneck-propagation.enabled"))
6766
.withDescription(
6867
"Enable backpropagation of processing rate during autoscaling to reduce resources usage.");
6968

69+
public static final ConfigOption<Boolean> BOTTLENECK_PROPAGATION_SCALE_DOWN_ENABLED =
70+
autoScalerConfig("bottleneck-propagation.allow-scale-down")
71+
.booleanType()
72+
.defaultValue(false)
73+
.withFallbackKeys(
74+
oldOperatorConfigKey("bottleneck-propagation.allow-scale-down"))
75+
.withDescription("Allow vertices scale down during bottleneck propagation.");
76+
7077
public static final ConfigOption<Duration> METRICS_WINDOW =
7178
autoScalerConfig("metrics.window")
7279
.durationType()

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,41 @@ public void testBackPropagationScaleFactorAffectsScaling() {
517517
assertFalse(result.isBottleneck());
518518
}
519519

520+
@Test
521+
public void testBottleneckPropagationScaleDown() {
522+
var op = new JobVertexID();
523+
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
524+
conf.set(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 20);
525+
conf.set(AutoScalerOptions.BOTTLENECK_PROPAGATION_SCALE_DOWN_ENABLED, false);
526+
527+
var result =
528+
vertexScaler.computeScaleTargetParallelism(
529+
context,
530+
op,
531+
NOT_ADJUST_INPUTS,
532+
evaluated(4, 200, 200),
533+
Collections.emptySortedMap(),
534+
restartTime,
535+
0.5);
536+
537+
assertEquals(4, result.getParallelism());
538+
assertFalse(result.isBottleneck());
539+
540+
conf.set(AutoScalerOptions.BOTTLENECK_PROPAGATION_SCALE_DOWN_ENABLED, true);
541+
result =
542+
vertexScaler.computeScaleTargetParallelism(
543+
context,
544+
op,
545+
NOT_ADJUST_INPUTS,
546+
evaluated(4, 200, 200),
547+
Collections.emptySortedMap(),
548+
restartTime,
549+
0.5);
550+
551+
assertEquals(2, result.getParallelism());
552+
assertFalse(result.isBottleneck());
553+
}
554+
520555
@Test
521556
public void testScaleDownAfterScaleUpDetection() {
522557
var op = new JobVertexID();

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -661,7 +661,7 @@ public void testScalingWithBackPropEnabledSimpleGraph() throws Exception {
661661

662662
var conf = context.getConfiguration();
663663
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.d);
664-
conf.set(AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_ENABLED, true);
664+
conf.set(AutoScalerOptions.BOTTLENECK_PROPAGATION_ENABLED, true);
665665

666666
// If back propagation is enabled, then parallelism of all vertices is 5
667667
var metrics =
@@ -698,7 +698,7 @@ public void testScalingWithBackPropEnabledSimpleGraph() throws Exception {
698698
"a6b7102b8d3e3a9564998c1ffeb5e2b7",
699699
"5"));
700700

701-
conf.set(AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_ENABLED, false);
701+
conf.set(AutoScalerOptions.BOTTLENECK_PROPAGATION_ENABLED, false);
702702
now = Instant.now();
703703
assertThat(
704704
scalingExecutor.scaleResource(
@@ -722,7 +722,7 @@ public void testScalingWithBackPropEnabledSimpleGraph() throws Exception {
722722
"a6b7102b8d3e3a9564998c1ffeb5e2b7",
723723
"10"));
724724

725-
conf.set(AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_ENABLED, true);
725+
conf.set(AutoScalerOptions.BOTTLENECK_PROPAGATION_ENABLED, true);
726726

727727
jobTopology =
728728
new JobTopology(
@@ -804,7 +804,7 @@ public void testScalingWithBackPropEnabledComplexGraph() throws Exception {
804804

805805
var conf = context.getConfiguration();
806806
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.d);
807-
conf.set(AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_ENABLED, true);
807+
conf.set(AutoScalerOptions.BOTTLENECK_PROPAGATION_ENABLED, true);
808808

809809
// The expected new parallelism is 7 without adjustment by max parallelism.
810810
var metrics =
@@ -875,7 +875,7 @@ public void testScalingWithBackPropEnabledAndExcludedVerticesSimpleGraph() throw
875875

876876
var conf = context.getConfiguration();
877877
conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.d);
878-
conf.set(AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_ENABLED, true);
878+
conf.set(AutoScalerOptions.BOTTLENECK_PROPAGATION_ENABLED, true);
879879

880880
// If back propagation is enabled, then parallelism of all vertices is 5
881881
var metrics =

0 commit comments

Comments
 (0)