
Commit 8bf41eb

[Feat] flink 1.15 support. (#1007)
* [Feat] flink 1.15 support.
1 parent fb5e67c commit 8bf41eb


7 files changed, +205 -18 lines changed


streamx-flink/streamx-flink-shims/streamx-flink-shims-base/pom.xml

Lines changed: 7 additions & 0 deletions
@@ -34,6 +34,13 @@
       <scope>provided</scope>
     </dependency>

+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-clients_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2019 The StreamX Project
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.streamxhub.streamx.flink.core
+
+import org.apache.flink.api.common.{JobID, JobStatus}
+import org.apache.flink.client.program.ClusterClient
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.client.JobStatusMessage
+import org.apache.flink.runtime.jobgraph.{JobGraph, OperatorID}
+import org.apache.flink.runtime.jobmaster.JobResult
+import org.apache.flink.runtime.messages.Acknowledge
+import org.apache.flink.runtime.operators.coordination.{CoordinationRequest, CoordinationResponse}
+
+import java.util
+import java.util.concurrent.CompletableFuture
+
+abstract class ClusterClientTrait[T](clusterClient: ClusterClient[T]) extends ClusterClient[T] {
+
+  override def close(): Unit = clusterClient.close()
+
+  override def getClusterId = clusterClient.getClusterId
+
+  override def getFlinkConfiguration: Configuration = clusterClient.getFlinkConfiguration
+
+  override def shutDownCluster(): Unit = clusterClient.shutDownCluster()
+
+  override def getWebInterfaceURL: String = clusterClient.getWebInterfaceURL
+
+  override def listJobs(): CompletableFuture[util.Collection[JobStatusMessage]] = clusterClient.listJobs()
+
+  override def disposeSavepoint(s: String): CompletableFuture[Acknowledge] = clusterClient.disposeSavepoint(s)
+
+  override def submitJob(jobGraph: JobGraph): CompletableFuture[JobID] = clusterClient.submitJob(jobGraph)
+
+  override def getJobStatus(jobID: JobID): CompletableFuture[JobStatus] = clusterClient.getJobStatus(jobID)
+
+  override def requestJobResult(jobID: JobID): CompletableFuture[JobResult] = clusterClient.requestJobResult(jobID)
+
+  override def getAccumulators(jobID: JobID, classLoader: ClassLoader): CompletableFuture[util.Map[String, AnyRef]] = clusterClient.getAccumulators(jobID, classLoader)
+
+  override def cancel(jobID: JobID): CompletableFuture[Acknowledge] = clusterClient.cancel(jobID)
+
+  override def sendCoordinationRequest(jobID: JobID, operatorID: OperatorID, coordinationRequest: CoordinationRequest): CompletableFuture[CoordinationResponse] = clusterClient.sendCoordinationRequest(jobID, operatorID, coordinationRequest)
+}
+

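A note on the new ClusterClientTrait above (the shims-base pom.xml change at the top presumably exists to supply org.apache.flink.client.program.ClusterClient, which ships in flink-clients): the trait wraps a Flink ClusterClient and delegates every method whose signature is stable across the supported Flink releases, so each version-specific shim below only overrides the savepoint methods that changed. A minimal usage sketch, assuming a hypothetical helper name, parameters and timeout handling that are not part of this commit:

import com.streamxhub.streamx.flink.core.{ClusterClient => ClusterClientWrapper}
import org.apache.flink.api.common.JobID
import org.apache.flink.client.program.{ClusterClient => FlinkClusterClient}

import java.util.concurrent.TimeUnit

object ShimUsageSketch {
  // Illustrative only, not part of this commit: the concrete wrapper implementation
  // comes from whichever streamx-flink-shims_flink-1.1x module is on the classpath.
  def cancelJobWithSavepoint[T](client: FlinkClusterClient[T],
                                jobID: JobID,
                                savepointDir: String,
                                timeoutMillis: Long): String = {
    val clientWrapper = new ClusterClientWrapper(client)
    // Same two-argument call on every Flink line; each shim maps it onto the
    // signature its Flink version actually exposes.
    clientWrapper.cancelWithSavepoint(jobID, savepointDir).get(timeoutMillis, TimeUnit.MILLISECONDS)
  }
}
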
streamx-flink/streamx-flink-shims/streamx-flink-shims_flink-1.15/src/main/scala/com/streamxhub/streamx/flink/core/package.scala renamed to streamx-flink/streamx-flink-shims/streamx-flink-shims_flink-1.12/src/main/scala/com/streamxhub/streamx/flink/core/ClusterClient.scala

Lines changed: 9 additions & 14 deletions
@@ -16,23 +16,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package com.streamxhub.streamx.flink
-
+package com.streamxhub.streamx.flink.core
 import org.apache.flink.api.common.JobID
-import org.apache.flink.client.program.ClusterClient
-import org.apache.flink.core.execution.SavepointFormatType
+import org.apache.flink.client.program.{ClusterClient => FlinkClusterClient}

 import java.util.concurrent.CompletableFuture

-package object core {
-  implicit class EnhanceClusterClient(client: ClusterClient[_]) {
-    def cancelWithSavepoint(jobID: JobID, savePointDir: String): CompletableFuture[String] = {
-      client.cancelWithSavepoint(jobID, savePointDir, SavepointFormatType.DEFAULT)
-    }
+class ClusterClient[T](clusterClient: FlinkClusterClient[T]) extends ClusterClientTrait[T](clusterClient) {
+
+  override def cancelWithSavepoint(jobID: JobID, s: String): CompletableFuture[String] = clusterClient.cancelWithSavepoint(jobID, s)
+
+  override def stopWithSavepoint(jobID: JobID, b: Boolean, s: String): CompletableFuture[String] = clusterClient.stopWithSavepoint(jobID, b, s)
+
+  override def triggerSavepoint(jobID: JobID, s: String): CompletableFuture[String] = clusterClient.triggerSavepoint(jobID, s)

-    def stopWithSavepoint(jobID: JobID, advanceToEndOfEventTime: Boolean, savePointDir: String): CompletableFuture[String] = {
-      client.stopWithSavepoint(jobID, advanceToEndOfEventTime, savePointDir, SavepointFormatType.DEFAULT)
-    }
-  }
 }
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2019 The StreamX Project
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.streamxhub.streamx.flink.core
+import org.apache.flink.api.common.JobID
+import org.apache.flink.client.program.{ClusterClient => FlinkClusterClient}
+
+import java.util.concurrent.CompletableFuture
+
+class ClusterClient[T](clusterClient: FlinkClusterClient[T]) extends ClusterClientTrait[T](clusterClient) {
+
+  override def cancelWithSavepoint(jobID: JobID, s: String): CompletableFuture[String] = clusterClient.cancelWithSavepoint(jobID, s)
+
+  override def stopWithSavepoint(jobID: JobID, b: Boolean, s: String): CompletableFuture[String] = clusterClient.stopWithSavepoint(jobID, b, s)
+
+  override def triggerSavepoint(jobID: JobID, s: String): CompletableFuture[String] = clusterClient.triggerSavepoint(jobID, s)
+
+}
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2019 The StreamX Project
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.streamxhub.streamx.flink.core
+
+import org.apache.flink.api.common.JobID
+import org.apache.flink.client.program.{ClusterClient => FlinkClusterClient}
+
+import java.util.concurrent.CompletableFuture
+
+class ClusterClient[T](clusterClient: FlinkClusterClient[T]) extends ClusterClientTrait[T](clusterClient) {
+
+  override def cancelWithSavepoint(jobID: JobID, s: String): CompletableFuture[String] = clusterClient.cancelWithSavepoint(jobID, s)
+
+  override def stopWithSavepoint(jobID: JobID, b: Boolean, s: String): CompletableFuture[String] = clusterClient.stopWithSavepoint(jobID, b, s)
+
+  override def triggerSavepoint(jobID: JobID, s: String): CompletableFuture[String] = clusterClient.triggerSavepoint(jobID, s)
+
+}
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2019 The StreamX Project
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.streamxhub.streamx.flink.core
+
+import org.apache.flink.api.common.JobID
+import org.apache.flink.client.program.{ClusterClient => FlinkClusterClient}
+import org.apache.flink.core.execution.SavepointFormatType
+
+import java.util.concurrent.CompletableFuture
+import javax.annotation.Nullable
+
+class ClusterClient[T](clusterClient: FlinkClusterClient[T]) extends ClusterClientTrait[T](clusterClient) {
+
+  override def cancelWithSavepoint(jobId: JobID, @Nullable savepointDirectory: String, formatType: SavepointFormatType): CompletableFuture[String] = {
+    clusterClient.cancelWithSavepoint(jobId, savepointDirectory, formatType)
+  }
+
+  override def stopWithSavepoint(jobId: JobID,
+                                 advanceToEndOfEventTime: Boolean,
+                                 @Nullable savepointDir: String,
+                                 formatType: SavepointFormatType): CompletableFuture[String] = {
+    clusterClient.stopWithSavepoint(jobId, advanceToEndOfEventTime, savepointDir, formatType)
+  }
+
+  override def triggerSavepoint(jobId: JobID, @Nullable savepointDir: String, formatType: SavepointFormatType): CompletableFuture[String] = {
+    clusterClient.triggerSavepoint(jobId, savepointDir, formatType)
+  }
+
+  def cancelWithSavepoint(jobID: JobID, savepointDirectory: String): CompletableFuture[String] = {
+    clusterClient.cancelWithSavepoint(jobID, savepointDirectory, SavepointFormatType.DEFAULT)
+  }
+
+  def stopWithSavepoint(jobID: JobID, advanceToEndOfEventTime: Boolean, savepointDirectory: String): CompletableFuture[String] = {
+    clusterClient.stopWithSavepoint(jobID, advanceToEndOfEventTime, savepointDirectory, SavepointFormatType.DEFAULT)
+  }
+
+}

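The shim above targets Flink 1.15, where the ClusterClient savepoint methods gained a SavepointFormatType parameter. It therefore overrides the new signatures and also exposes the older two- and three-argument forms, forwarding them with SavepointFormatType.DEFAULT. A small sketch of the two equivalent call forms, assuming compilation against this 1.15 shim; wrapper, jobID and savepointDir are placeholders:

import com.streamxhub.streamx.flink.core.{ClusterClient => ClusterClientWrapper}
import org.apache.flink.api.common.JobID
import org.apache.flink.core.execution.SavepointFormatType

import java.util.concurrent.CompletableFuture

object SavepointFormatSketch {
  // Illustrative only: both calls reach the same underlying Flink 1.15 API, the first one
  // through the shim's convenience overload that fills in SavepointFormatType.DEFAULT.
  def stopBothWays(wrapper: ClusterClientWrapper[_], jobID: JobID, savepointDir: String): (CompletableFuture[String], CompletableFuture[String]) = {
    val legacyStyle = wrapper.stopWithSavepoint(jobID, false, savepointDir)
    val formatAware = wrapper.stopWithSavepoint(jobID, false, savepointDir, SavepointFormatType.DEFAULT)
    (legacyStyle, formatAware)
  }
}
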
streamx-plugin/streamx-flink-submit/streamx-flink-submit-core/src/main/scala/com/streamxhub/streamx/flink/submit/trait/FlinkSubmitTrait.scala

Lines changed: 7 additions & 4 deletions
@@ -20,12 +20,12 @@
 package com.streamxhub.streamx.flink.submit.`trait`

 import com.google.common.collect.Lists
-import com.streamxhub.streamx.flink.core._
 import com.streamxhub.streamx.common.conf.ConfigConst._
 import com.streamxhub.streamx.common.conf.Workspace
 import com.streamxhub.streamx.common.enums.{ApplicationType, DevelopmentMode, ExecutionMode, ResolveOrder}
 import com.streamxhub.streamx.common.util.{Logger, SystemPropertyUtils, Utils}
 import com.streamxhub.streamx.flink.core.conf.FlinkRunOption
+import com.streamxhub.streamx.flink.core.{ClusterClient => ClusterClientWrapper}
 import com.streamxhub.streamx.flink.submit.bean._
 import org.apache.commons.cli.{CommandLine, Options}
 import org.apache.commons.collections.MapUtils
@@ -579,12 +579,15 @@ trait FlinkSubmitTrait extends Logger {
     }

     val clientTimeout = getOptionFromDefaultFlinkConfig(stopRequest.flinkVersion.flinkHome, ClientOptions.CLIENT_TIMEOUT)
+
+    val clientWrapper = new ClusterClientWrapper(client)
+
     (Try(stopRequest.withSavePoint).getOrElse(false), Try(stopRequest.withDrain).getOrElse(false)) match {
       case (false, false) =>
-        client.cancel(jobID).get()
+        clientWrapper.cancel(jobID).get()
         null
-      case (true, false) => client.cancelWithSavepoint(jobID, savePointDir).get(clientTimeout.toMillis, TimeUnit.MILLISECONDS)
-      case (_, _) => client.stopWithSavepoint(jobID, stopRequest.withDrain, savePointDir).get(clientTimeout.toMillis, TimeUnit.MILLISECONDS)
+      case (true, false) => clientWrapper.cancelWithSavepoint(jobID, savePointDir).get(clientTimeout.toMillis, TimeUnit.MILLISECONDS)
+      case (_, _) => clientWrapper.stopWithSavepoint(jobID, stopRequest.withDrain, savePointDir).get(clientTimeout.toMillis, TimeUnit.MILLISECONDS)
     }
   }

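With the change above, FlinkSubmitTrait no longer calls the savepoint methods on Flink's ClusterClient directly (their signatures differ between Flink 1.14 and 1.15) but on the version-specific shim wrapper. A simplified, self-contained restatement of that cancel/stop decision, with the parameter list and fixed timeout assumed for the sketch rather than taken from the real stopRequest and client objects:

import com.streamxhub.streamx.flink.core.{ClusterClient => ClusterClientWrapper}
import org.apache.flink.api.common.JobID
import org.apache.flink.client.program.{ClusterClient => FlinkClusterClient}

import java.util.concurrent.TimeUnit

object StopJobSketch {
  // Illustrative restatement of the (withSavePoint, withDrain) decision in FlinkSubmitTrait.
  def stopJob[T](client: FlinkClusterClient[T],
                 jobID: JobID,
                 savePointDir: String,
                 withSavePoint: Boolean,
                 withDrain: Boolean,
                 timeoutMillis: Long): String = {
    val clientWrapper = new ClusterClientWrapper(client)
    (withSavePoint, withDrain) match {
      case (false, false) =>
        clientWrapper.cancel(jobID).get() // plain cancel, no savepoint taken
        null
      case (true, false) =>
        clientWrapper.cancelWithSavepoint(jobID, savePointDir).get(timeoutMillis, TimeUnit.MILLISECONDS)
      case (_, _) =>
        // as in the original, the drain flag is passed as advanceToEndOfEventTime
        clientWrapper.stopWithSavepoint(jobID, withDrain, savePointDir).get(timeoutMillis, TimeUnit.MILLISECONDS)
    }
  }
}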