Skip to content

Commit a931e27

Browse files
author
chenyuzhi
committed
[FLINK-36876] Add proxy for restClient
1 parent a3f14ef commit a931e27

File tree

10 files changed

+310
-11
lines changed

10 files changed

+310
-11
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public class FlinkOperatorConfiguration {
5353
Duration progressCheckInterval;
5454
Duration restApiReadyDelay;
5555
Duration flinkClientTimeout;
56+
int flinkClientIOThreads;
5657
String flinkServiceHostOverride;
5758
Set<String> watchedNamespaces;
5859
boolean dynamicNamespacesEnabled;
@@ -99,6 +100,10 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
99100
Duration flinkClientTimeout =
100101
operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_FLINK_CLIENT_TIMEOUT);
101102

103+
int flinkClientIOThreads =
104+
operatorConfig.getInteger(
105+
KubernetesOperatorConfigOptions.OPERATOR_FLINK_CLIENT_IO_THREADS);
106+
102107
Duration flinkCancelJobTimeout =
103108
operatorConfig.get(
104109
KubernetesOperatorConfigOptions.OPERATOR_FLINK_CLIENT_CANCEL_TIMEOUT);
@@ -209,6 +214,7 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
209214
progressCheckInterval,
210215
restApiReadyDelay,
211216
flinkClientTimeout,
217+
flinkClientIOThreads,
212218
flinkServiceHostOverride,
213219
watchedNamespaces,
214220
dynamicNamespacesEnabled,

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,14 @@ public static String operatorConfigKey(String key) {
115115
.withDescription(
116116
"The timeout for the observer to wait the flink rest client to return.");
117117

118+
@Documentation.Section(SECTION_SYSTEM)
119+
public static final ConfigOption<Integer> OPERATOR_FLINK_CLIENT_IO_THREADS =
120+
operatorConfig("kubernetes.operator.flink.client.io.threads")
121+
.intType()
122+
.defaultValue(60)
123+
.withDescription(
124+
"The maximum number of io threads used by the flink rest client.");
125+
118126
@Documentation.Section(SECTION_SYSTEM)
119127
public static final ConfigOption<Duration> OPERATOR_FLINK_CLIENT_CANCEL_TIMEOUT =
120128
operatorConfig("flink.client.cancel.timeout")

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.apache.flink.autoscaler.config.AutoScalerOptions;
2525
import org.apache.flink.autoscaler.utils.JobStatusUtils;
2626
import org.apache.flink.client.program.rest.RestClusterClient;
27+
import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
28+
import org.apache.flink.client.program.rest.retry.WaitStrategy;
2729
import org.apache.flink.configuration.CheckpointingOptions;
2830
import org.apache.flink.configuration.Configuration;
2931
import org.apache.flink.configuration.RestOptions;
@@ -56,6 +58,7 @@
5658
import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
5759
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
5860
import org.apache.flink.runtime.client.JobStatusMessage;
61+
import org.apache.flink.runtime.highavailability.ClientHighAvailabilityServicesFactory;
5962
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
6063
import org.apache.flink.runtime.jobmaster.JobResult;
6164
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
@@ -109,6 +112,7 @@
109112
import org.apache.flink.util.Preconditions;
110113

111114
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
115+
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
112116
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
113117

114118
import io.fabric8.kubernetes.api.model.DeletionPropagation;
@@ -129,6 +133,7 @@
129133
import java.io.File;
130134
import java.io.FileOutputStream;
131135
import java.io.IOException;
136+
import java.lang.reflect.Constructor;
132137
import java.net.HttpURLConnection;
133138
import java.net.InetSocketAddress;
134139
import java.net.MalformedURLException;
@@ -175,17 +180,20 @@ public abstract class AbstractFlinkService implements FlinkService {
175180
protected final ExecutorService executorService;
176181
protected final FlinkOperatorConfiguration operatorConfig;
177182
protected final ArtifactManager artifactManager;
183+
private final EventLoopGroup flinkClientSharedEventLoopGroup;
178184
private static final String EMPTY_JAR = createEmptyJar();
179185

180186
public AbstractFlinkService(
181187
KubernetesClient kubernetesClient,
182188
ArtifactManager artifactManager,
183189
ExecutorService executorService,
184-
FlinkOperatorConfiguration operatorConfig) {
190+
FlinkOperatorConfiguration operatorConfig,
191+
EventLoopGroup flinkClientEventLoopGroup) {
185192
this.kubernetesClient = kubernetesClient;
186193
this.artifactManager = artifactManager;
187194
this.executorService = executorService;
188195
this.operatorConfig = operatorConfig;
196+
this.flinkClientSharedEventLoopGroup = flinkClientEventLoopGroup;
189197
}
190198

191199
protected abstract PodList getJmPodList(String namespace, String clusterId);
@@ -842,11 +850,34 @@ public RestClusterClient<String> getClusterClient(Configuration conf) throws Exc
842850
operatorConfig.getFlinkServiceHostOverride(),
843851
ExternalServiceDecorator.getNamespacedExternalServiceName(
844852
clusterId, namespace));
853+
845854
final String restServerAddress = String.format("http://%s:%s", host, port);
846-
return new RestClusterClient<>(
847-
operatorRestConf,
848-
clusterId,
849-
(c, e) -> new StandaloneClientHAServices(restServerAddress));
855+
RestClient restClient =
856+
new RestClientProxy(
857+
operatorRestConf,
858+
executorService,
859+
host,
860+
port,
861+
flinkClientSharedEventLoopGroup);
862+
863+
Constructor<?> clusterClientConstructor =
864+
RestClusterClient.class.getDeclaredConstructor(
865+
Configuration.class,
866+
RestClient.class,
867+
Object.class,
868+
WaitStrategy.class,
869+
ClientHighAvailabilityServicesFactory.class);
870+
871+
clusterClientConstructor.setAccessible(true);
872+
873+
return (RestClusterClient<String>)
874+
clusterClientConstructor.newInstance(
875+
operatorRestConf,
876+
restClient,
877+
clusterId,
878+
new ExponentialWaitStrategy(10L, 2000L),
879+
(ClientHighAvailabilityServicesFactory)
880+
(c, e) -> new StandaloneClientHAServices(restServerAddress));
850881
}
851882

852883
@Override

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
3737

3838
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
39+
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
40+
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
3941

4042
import io.javaoperatorsdk.operator.api.reconciler.Context;
4143
import io.javaoperatorsdk.operator.processing.event.ResourceID;
@@ -74,6 +76,7 @@ public static final class ExceptionCacheEntry {
7476

7577
protected final Map<Tuple2<Class<?>, ResourceID>, KubernetesResourceMetricGroup>
7678
resourceMetricGroups = new ConcurrentHashMap<>();
79+
private final EventLoopGroup flinkClientEventLoopGroup;
7780

7881
public FlinkResourceContextFactory(
7982
FlinkConfigManager configManager,
@@ -87,6 +90,10 @@ public FlinkResourceContextFactory(
8790
Executors.newFixedThreadPool(
8891
configManager.getOperatorConfiguration().getReconcilerMaxParallelism(),
8992
new ExecutorThreadFactory("Flink-RestClusterClient-IO"));
93+
this.flinkClientEventLoopGroup =
94+
new NioEventLoopGroup(
95+
configManager.getOperatorConfiguration().getFlinkClientIOThreads(),
96+
new ExecutorThreadFactory("flink-rest-client-netty-shared"));
9097
}
9198

9299
public FlinkStateSnapshotContext getFlinkStateSnapshotContext(
@@ -154,13 +161,15 @@ protected FlinkService getFlinkService(FlinkResourceContext<?> ctx) {
154161
artifactManager,
155162
clientExecutorService,
156163
ctx.getOperatorConfig(),
157-
eventRecorder);
164+
eventRecorder,
165+
flinkClientEventLoopGroup);
158166
case STANDALONE:
159167
return new StandaloneFlinkService(
160168
ctx.getKubernetesClient(),
161169
artifactManager,
162170
clientExecutorService,
163-
ctx.getOperatorConfig());
171+
ctx.getOperatorConfig(),
172+
flinkClientEventLoopGroup);
164173
default:
165174
throw new UnsupportedOperationException(
166175
String.format("Unsupported deployment mode: %s", deploymentMode));

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders;
5252
import org.apache.flink.runtime.rest.messages.job.JobResourcesRequirementsUpdateHeaders;
5353

54+
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
55+
5456
import io.fabric8.kubernetes.api.model.DeletionPropagation;
5557
import io.fabric8.kubernetes.api.model.PodList;
5658
import io.fabric8.kubernetes.api.model.apps.Deployment;
@@ -89,7 +91,28 @@ public NativeFlinkService(
8991
ExecutorService executorService,
9092
FlinkOperatorConfiguration operatorConfig,
9193
EventRecorder eventRecorder) {
92-
super(kubernetesClient, artifactManager, executorService, operatorConfig);
94+
this(
95+
kubernetesClient,
96+
artifactManager,
97+
executorService,
98+
operatorConfig,
99+
eventRecorder,
100+
null);
101+
}
102+
103+
public NativeFlinkService(
104+
KubernetesClient kubernetesClient,
105+
ArtifactManager artifactManager,
106+
ExecutorService executorService,
107+
FlinkOperatorConfiguration operatorConfig,
108+
EventRecorder eventRecorder,
109+
EventLoopGroup flinkClientEventLoopGroup) {
110+
super(
111+
kubernetesClient,
112+
artifactManager,
113+
executorService,
114+
operatorConfig,
115+
flinkClientEventLoopGroup);
93116
this.eventRecorder = eventRecorder;
94117
}
95118

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.kubernetes.operator.service;
19+
20+
import org.apache.flink.api.common.time.Time;
21+
import org.apache.flink.configuration.Configuration;
22+
import org.apache.flink.runtime.rest.RestClient;
23+
import org.apache.flink.util.ConfigurationException;
24+
import org.apache.flink.util.Preconditions;
25+
26+
import org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap;
27+
import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
28+
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
29+
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
33+
import java.lang.reflect.Field;
34+
import java.util.concurrent.CompletableFuture;
35+
import java.util.concurrent.ExecutorService;
36+
37+
/** Proxy for the {@link RestClient}. */
38+
public class RestClientProxy extends RestClient {
39+
private static final Logger LOG = LoggerFactory.getLogger(RestClientProxy.class);
40+
41+
private final boolean useSharedEventLoopGroup;
42+
private Field groupField;
43+
private Bootstrap bootstrap;
44+
private CompletableFuture<Void> terminationFuture;
45+
46+
public RestClientProxy(
47+
Configuration configuration,
48+
ExecutorService executor,
49+
String host,
50+
int port,
51+
EventLoopGroup sharedGroup)
52+
throws ConfigurationException, NoSuchFieldException, IllegalAccessException {
53+
super(configuration, executor, host, port);
54+
55+
if (sharedGroup != null) {
56+
Preconditions.checkArgument(
57+
!sharedGroup.isShuttingDown() && !sharedGroup.isShutdown(),
58+
"provided eventLoopGroup is shut/shutting down");
59+
60+
// get private field
61+
Field bootstrapField = RestClient.class.getDeclaredField("bootstrap");
62+
Field terminationFutureField = RestClient.class.getDeclaredField("terminationFuture");
63+
Field groupField = AbstractBootstrap.class.getDeclaredField("group");
64+
65+
bootstrapField.setAccessible(true);
66+
terminationFutureField.setAccessible(true);
67+
groupField.setAccessible(true);
68+
69+
// TODO check null
70+
this.terminationFuture = (CompletableFuture<Void>) terminationFutureField.get(this);
71+
this.bootstrap = (Bootstrap) bootstrapField.get(this);
72+
this.groupField = groupField;
73+
74+
// close previous group
75+
bootstrap.config().group().shutdown();
76+
// setup share group
77+
groupField.set(bootstrap, sharedGroup);
78+
79+
useSharedEventLoopGroup = true;
80+
} else {
81+
useSharedEventLoopGroup = false;
82+
}
83+
}
84+
85+
@Override
86+
public CompletableFuture<Void> closeAsync() {
87+
if (useSharedEventLoopGroup) {
88+
this.shutdownInternal();
89+
}
90+
91+
return super.closeAsync();
92+
}
93+
94+
@Override
95+
public void shutdown(Time timeout) {
96+
if (useSharedEventLoopGroup) {
97+
this.shutdownInternal();
98+
}
99+
super.shutdown(timeout);
100+
}
101+
102+
private void shutdownInternal() {
103+
try {
104+
// replace bootstrap's group to null to avoid shutdown shared group
105+
groupField.set(bootstrap, null);
106+
terminationFuture.complete(null);
107+
} catch (IllegalAccessException e) {
108+
LOG.error("Failed to setup rest client event group .", e);
109+
}
110+
}
111+
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
4040
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
4141

42+
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
43+
4244
import io.fabric8.kubernetes.api.model.DeletionPropagation;
4345
import io.fabric8.kubernetes.api.model.PodList;
4446
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -61,7 +63,21 @@ public StandaloneFlinkService(
6163
ArtifactManager artifactManager,
6264
ExecutorService executorService,
6365
FlinkOperatorConfiguration operatorConfig) {
64-
super(kubernetesClient, artifactManager, executorService, operatorConfig);
66+
this(kubernetesClient, artifactManager, executorService, operatorConfig, null);
67+
}
68+
69+
public StandaloneFlinkService(
70+
KubernetesClient kubernetesClient,
71+
ArtifactManager artifactManager,
72+
ExecutorService executorService,
73+
FlinkOperatorConfiguration operatorConfig,
74+
EventLoopGroup flinkClientEventLoopGroup) {
75+
super(
76+
kubernetesClient,
77+
artifactManager,
78+
executorService,
79+
operatorConfig,
80+
flinkClientEventLoopGroup);
6581
}
6682

6783
@Override

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,10 @@
7171
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
7272
import org.apache.flink.runtime.rest.util.RestClientException;
7373
import org.apache.flink.util.SerializedThrowable;
74+
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
7475
import org.apache.flink.util.concurrent.Executors;
7576

77+
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
7678
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
7779

7880
import io.fabric8.kubernetes.api.model.DeletionPropagation;
@@ -171,7 +173,9 @@ public TestingFlinkService(KubernetesClient kubernetesClient) {
171173
kubernetesClient,
172174
null,
173175
Executors.newDirectExecutorService(),
174-
FlinkOperatorConfiguration.fromConfiguration(new Configuration()));
176+
FlinkOperatorConfiguration.fromConfiguration(new Configuration()),
177+
new NioEventLoopGroup(
178+
4, new ExecutorThreadFactory("flink-rest-client-netty-shared")));
175179
}
176180

177181
public <T extends HasMetadata> Context<T> getContext() {

0 commit comments

Comments
 (0)