Skip to content

Commit adaa22a

Browse files
author
chenyuzhi
committed
[FLINK-36876] Add proxy for restClient
1 parent 9eb3c38 commit adaa22a

File tree

10 files changed

+310
-11
lines changed

10 files changed

+310
-11
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public class FlinkOperatorConfiguration {
5353
Duration progressCheckInterval;
5454
Duration restApiReadyDelay;
5555
Duration flinkClientTimeout;
56+
int flinkClientIOThreads;
5657
String flinkServiceHostOverride;
5758
Set<String> watchedNamespaces;
5859
boolean dynamicNamespacesEnabled;
@@ -97,6 +98,10 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
9798
Duration flinkClientTimeout =
9899
operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_FLINK_CLIENT_TIMEOUT);
99100

101+
int flinkClientIOThreads =
102+
operatorConfig.getInteger(
103+
KubernetesOperatorConfigOptions.OPERATOR_FLINK_CLIENT_IO_THREADS);
104+
100105
Duration flinkCancelJobTimeout =
101106
operatorConfig.get(
102107
KubernetesOperatorConfigOptions.OPERATOR_FLINK_CLIENT_CANCEL_TIMEOUT);
@@ -201,6 +206,7 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
201206
progressCheckInterval,
202207
restApiReadyDelay,
203208
flinkClientTimeout,
209+
flinkClientIOThreads,
204210
flinkServiceHostOverride,
205211
watchedNamespaces,
206212
dynamicNamespacesEnabled,

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,14 @@ public static String operatorConfigKey(String key) {
115115
.withDescription(
116116
"The timeout for the observer to wait the flink rest client to return.");
117117

118+
@Documentation.Section(SECTION_SYSTEM)
119+
public static final ConfigOption<Integer> OPERATOR_FLINK_CLIENT_IO_THREADS =
120+
operatorConfig("kubernetes.operator.flink.client.io.threads")
121+
.intType()
122+
.defaultValue(60)
123+
.withDescription(
124+
"The maximum number of io threads used by the flink rest client.");
125+
118126
@Documentation.Section(SECTION_SYSTEM)
119127
public static final ConfigOption<Duration> OPERATOR_FLINK_CLIENT_CANCEL_TIMEOUT =
120128
operatorConfig("flink.client.cancel.timeout")

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.flink.api.java.tuple.Tuple2;
2424
import org.apache.flink.autoscaler.utils.JobStatusUtils;
2525
import org.apache.flink.client.program.rest.RestClusterClient;
26+
import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
27+
import org.apache.flink.client.program.rest.retry.WaitStrategy;
2628
import org.apache.flink.configuration.CheckpointingOptions;
2729
import org.apache.flink.configuration.Configuration;
2830
import org.apache.flink.configuration.RestOptions;
@@ -54,6 +56,7 @@
5456
import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
5557
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
5658
import org.apache.flink.runtime.client.JobStatusMessage;
59+
import org.apache.flink.runtime.highavailability.ClientHighAvailabilityServicesFactory;
5760
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
5861
import org.apache.flink.runtime.jobmaster.JobResult;
5962
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
@@ -104,6 +107,7 @@
104107
import org.apache.flink.util.Preconditions;
105108

106109
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
110+
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
107111
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
108112

109113
import io.fabric8.kubernetes.api.model.DeletionPropagation;
@@ -124,6 +128,7 @@
124128
import java.io.File;
125129
import java.io.FileOutputStream;
126130
import java.io.IOException;
131+
import java.lang.reflect.Constructor;
127132
import java.net.HttpURLConnection;
128133
import java.net.InetSocketAddress;
129134
import java.net.MalformedURLException;
@@ -169,17 +174,20 @@ public abstract class AbstractFlinkService implements FlinkService {
169174
protected final ExecutorService executorService;
170175
protected final FlinkOperatorConfiguration operatorConfig;
171176
protected final ArtifactManager artifactManager;
177+
private final EventLoopGroup flinkClientSharedEventLoopGroup;
172178
private static final String EMPTY_JAR = createEmptyJar();
173179

174180
public AbstractFlinkService(
175181
KubernetesClient kubernetesClient,
176182
ArtifactManager artifactManager,
177183
ExecutorService executorService,
178-
FlinkOperatorConfiguration operatorConfig) {
184+
FlinkOperatorConfiguration operatorConfig,
185+
EventLoopGroup flinkClientEventLoopGroup) {
179186
this.kubernetesClient = kubernetesClient;
180187
this.artifactManager = artifactManager;
181188
this.executorService = executorService;
182189
this.operatorConfig = operatorConfig;
190+
this.flinkClientSharedEventLoopGroup = flinkClientEventLoopGroup;
183191
}
184192

185193
protected abstract PodList getJmPodList(String namespace, String clusterId);
@@ -836,11 +844,34 @@ public RestClusterClient<String> getClusterClient(Configuration conf) throws Exc
836844
operatorConfig.getFlinkServiceHostOverride(),
837845
ExternalServiceDecorator.getNamespacedExternalServiceName(
838846
clusterId, namespace));
847+
839848
final String restServerAddress = String.format("http://%s:%s", host, port);
840-
return new RestClusterClient<>(
841-
operatorRestConf,
842-
clusterId,
843-
(c, e) -> new StandaloneClientHAServices(restServerAddress));
849+
RestClient restClient =
850+
new RestClientProxy(
851+
operatorRestConf,
852+
executorService,
853+
host,
854+
port,
855+
flinkClientSharedEventLoopGroup);
856+
857+
Constructor<?> clusterClientConstructor =
858+
RestClusterClient.class.getDeclaredConstructor(
859+
Configuration.class,
860+
RestClient.class,
861+
Object.class,
862+
WaitStrategy.class,
863+
ClientHighAvailabilityServicesFactory.class);
864+
865+
clusterClientConstructor.setAccessible(true);
866+
867+
return (RestClusterClient<String>)
868+
clusterClientConstructor.newInstance(
869+
operatorRestConf,
870+
restClient,
871+
clusterId,
872+
new ExponentialWaitStrategy(10L, 2000L),
873+
(ClientHighAvailabilityServicesFactory)
874+
(c, e) -> new StandaloneClientHAServices(restServerAddress));
844875
}
845876

846877
@VisibleForTesting

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
3737

3838
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
39+
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
40+
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
3941

4042
import io.javaoperatorsdk.operator.api.reconciler.Context;
4143
import io.javaoperatorsdk.operator.processing.event.ResourceID;
@@ -60,6 +62,7 @@ public class FlinkResourceContextFactory {
6062

6163
protected final Map<Tuple2<Class<?>, ResourceID>, KubernetesResourceMetricGroup>
6264
resourceMetricGroups = new ConcurrentHashMap<>();
65+
private final EventLoopGroup flinkClientEventLoopGroup;
6366

6467
public FlinkResourceContextFactory(
6568
FlinkConfigManager configManager,
@@ -73,6 +76,10 @@ public FlinkResourceContextFactory(
7376
Executors.newFixedThreadPool(
7477
configManager.getOperatorConfiguration().getReconcilerMaxParallelism(),
7578
new ExecutorThreadFactory("Flink-RestClusterClient-IO"));
79+
this.flinkClientEventLoopGroup =
80+
new NioEventLoopGroup(
81+
configManager.getOperatorConfiguration().getFlinkClientIOThreads(),
82+
new ExecutorThreadFactory("flink-rest-client-netty-shared"));
7683
}
7784

7885
public FlinkStateSnapshotContext getFlinkStateSnapshotContext(
@@ -123,13 +130,15 @@ protected FlinkService getFlinkService(FlinkResourceContext<?> ctx) {
123130
artifactManager,
124131
clientExecutorService,
125132
ctx.getOperatorConfig(),
126-
eventRecorder);
133+
eventRecorder,
134+
flinkClientEventLoopGroup);
127135
case STANDALONE:
128136
return new StandaloneFlinkService(
129137
ctx.getKubernetesClient(),
130138
artifactManager,
131139
clientExecutorService,
132-
ctx.getOperatorConfig());
140+
ctx.getOperatorConfig(),
141+
flinkClientEventLoopGroup);
133142
default:
134143
throw new UnsupportedOperationException(
135144
String.format("Unsupported deployment mode: %s", deploymentMode));

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders;
5252
import org.apache.flink.runtime.rest.messages.job.JobResourcesRequirementsUpdateHeaders;
5353

54+
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
55+
5456
import io.fabric8.kubernetes.api.model.DeletionPropagation;
5557
import io.fabric8.kubernetes.api.model.PodList;
5658
import io.fabric8.kubernetes.api.model.apps.Deployment;
@@ -89,7 +91,28 @@ public NativeFlinkService(
8991
ExecutorService executorService,
9092
FlinkOperatorConfiguration operatorConfig,
9193
EventRecorder eventRecorder) {
92-
super(kubernetesClient, artifactManager, executorService, operatorConfig);
94+
this(
95+
kubernetesClient,
96+
artifactManager,
97+
executorService,
98+
operatorConfig,
99+
eventRecorder,
100+
null);
101+
}
102+
103+
public NativeFlinkService(
104+
KubernetesClient kubernetesClient,
105+
ArtifactManager artifactManager,
106+
ExecutorService executorService,
107+
FlinkOperatorConfiguration operatorConfig,
108+
EventRecorder eventRecorder,
109+
EventLoopGroup flinkClientEventLoopGroup) {
110+
super(
111+
kubernetesClient,
112+
artifactManager,
113+
executorService,
114+
operatorConfig,
115+
flinkClientEventLoopGroup);
93116
this.eventRecorder = eventRecorder;
94117
}
95118

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.kubernetes.operator.service;
19+
20+
import org.apache.flink.api.common.time.Time;
21+
import org.apache.flink.configuration.Configuration;
22+
import org.apache.flink.runtime.rest.RestClient;
23+
import org.apache.flink.util.ConfigurationException;
24+
import org.apache.flink.util.Preconditions;
25+
26+
import org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap;
27+
import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
28+
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
29+
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
33+
import java.lang.reflect.Field;
34+
import java.util.concurrent.CompletableFuture;
35+
import java.util.concurrent.ExecutorService;
36+
37+
/** Proxy for the {@link RestClient}. */
38+
public class RestClientProxy extends RestClient {
39+
private static final Logger LOG = LoggerFactory.getLogger(RestClientProxy.class);
40+
41+
private final boolean useSharedEventLoopGroup;
42+
private Field groupField;
43+
private Bootstrap bootstrap;
44+
private CompletableFuture<Void> terminationFuture;
45+
46+
public RestClientProxy(
47+
Configuration configuration,
48+
ExecutorService executor,
49+
String host,
50+
int port,
51+
EventLoopGroup sharedGroup)
52+
throws ConfigurationException, NoSuchFieldException, IllegalAccessException {
53+
super(configuration, executor, host, port);
54+
55+
if (sharedGroup != null) {
56+
Preconditions.checkArgument(
57+
!sharedGroup.isShuttingDown() && !sharedGroup.isShutdown(),
58+
"provided eventLoopGroup is shut/shutting down");
59+
60+
// get private field
61+
Field bootstrapField = RestClient.class.getDeclaredField("bootstrap");
62+
Field terminationFutureField = RestClient.class.getDeclaredField("terminationFuture");
63+
Field groupField = AbstractBootstrap.class.getDeclaredField("group");
64+
65+
bootstrapField.setAccessible(true);
66+
terminationFutureField.setAccessible(true);
67+
groupField.setAccessible(true);
68+
69+
// TODO check null
70+
this.terminationFuture = (CompletableFuture<Void>) terminationFutureField.get(this);
71+
this.bootstrap = (Bootstrap) bootstrapField.get(this);
72+
this.groupField = groupField;
73+
74+
// close previous group
75+
bootstrap.config().group().shutdown();
76+
// setup share group
77+
groupField.set(bootstrap, sharedGroup);
78+
79+
useSharedEventLoopGroup = true;
80+
} else {
81+
useSharedEventLoopGroup = false;
82+
}
83+
}
84+
85+
@Override
86+
public CompletableFuture<Void> closeAsync() {
87+
if (useSharedEventLoopGroup) {
88+
this.shutdownInternal();
89+
}
90+
91+
return super.closeAsync();
92+
}
93+
94+
@Override
95+
public void shutdown(Time timeout) {
96+
if (useSharedEventLoopGroup) {
97+
this.shutdownInternal();
98+
}
99+
super.shutdown(timeout);
100+
}
101+
102+
private void shutdownInternal() {
103+
try {
104+
// replace bootstrap's group to null to avoid shutdown shared group
105+
groupField.set(bootstrap, null);
106+
terminationFuture.complete(null);
107+
} catch (IllegalAccessException e) {
108+
LOG.error("Failed to setup rest client event group .", e);
109+
}
110+
}
111+
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
4040
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
4141

42+
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
43+
4244
import io.fabric8.kubernetes.api.model.DeletionPropagation;
4345
import io.fabric8.kubernetes.api.model.PodList;
4446
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -61,7 +63,21 @@ public StandaloneFlinkService(
6163
ArtifactManager artifactManager,
6264
ExecutorService executorService,
6365
FlinkOperatorConfiguration operatorConfig) {
64-
super(kubernetesClient, artifactManager, executorService, operatorConfig);
66+
this(kubernetesClient, artifactManager, executorService, operatorConfig, null);
67+
}
68+
69+
public StandaloneFlinkService(
70+
KubernetesClient kubernetesClient,
71+
ArtifactManager artifactManager,
72+
ExecutorService executorService,
73+
FlinkOperatorConfiguration operatorConfig,
74+
EventLoopGroup flinkClientEventLoopGroup) {
75+
super(
76+
kubernetesClient,
77+
artifactManager,
78+
executorService,
79+
operatorConfig,
80+
flinkClientEventLoopGroup);
6581
}
6682

6783
@Override

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,10 @@
6969
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
7070
import org.apache.flink.runtime.rest.util.RestClientException;
7171
import org.apache.flink.util.SerializedThrowable;
72+
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
7273
import org.apache.flink.util.concurrent.Executors;
7374

75+
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
7476
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
7577

7678
import io.fabric8.kubernetes.api.model.DeletionPropagation;
@@ -165,7 +167,9 @@ public TestingFlinkService(KubernetesClient kubernetesClient) {
165167
kubernetesClient,
166168
null,
167169
Executors.newDirectExecutorService(),
168-
FlinkOperatorConfiguration.fromConfiguration(new Configuration()));
170+
FlinkOperatorConfiguration.fromConfiguration(new Configuration()),
171+
new NioEventLoopGroup(
172+
4, new ExecutorThreadFactory("flink-rest-client-netty-shared")));
169173
}
170174

171175
public <T extends HasMetadata> Context<T> getContext() {

0 commit comments

Comments
 (0)