Skip to content

Commit 16bfa38

Browse files
Li-GLliguoliang1
andauthored
[Bug]Fix FlinkJobStatusWatcher deadlock & NullPointerException (#4327)
* Fix FlinkJobStatusWatcher deadlock * Fix the issue where completed task status is not being updated --------- Co-authored-by: liguoliang1 <liguoliang1@hisense.com>
1 parent 8dd9217 commit 16bfa38

File tree

2 files changed

+4
-4
lines changed

2 files changed

+4
-4
lines changed

streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,8 +314,8 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
314314
)
315315
val jobState = trackId match {
316316
case id
317-
if watchController.canceling.has(id) || latest.jobState.equals(
318-
FlinkJobState.CANCELLING) =>
317+
if watchController.canceling.has(id) || Option(latest).exists(
318+
_.jobState == FlinkJobState.CANCELLING) =>
319319
logger.info(s"trackId ${trackId.toString} is canceling")
320320
if (deployExists) FlinkJobState.CANCELLING else FlinkJobState.CANCELED
321321
case _ =>

streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@ import scala.language.implicitConversions
2828
trait FlinkWatcher extends AutoCloseable {
2929

3030
// see org.apache.flink.client.cli.ClientOptions.CLIENT_TIMEOUT}
31-
lazy val FLINK_CLIENT_TIMEOUT_SEC: Timeout =
31+
val FLINK_CLIENT_TIMEOUT_SEC: Timeout =
3232
Timeout.ofMilliseconds(Duration.ofSeconds(60).toMillis).toTimeout
3333

3434
// see org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT
35-
lazy val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout = Timeout.ofMilliseconds(30000L)
35+
val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout = Timeout.ofMilliseconds(30000L)
3636

3737
private[this] val started: AtomicBoolean = new AtomicBoolean(false)
3838

0 commit comments

Comments
 (0)