From 4862d71c350f6aa6044d8504747e5e521e6c9240 Mon Sep 17 00:00:00 2001 From: Piotr Przybylski Date: Wed, 22 Jan 2025 21:21:25 +0100 Subject: [PATCH] Ignore jobs in CANCELLING status when checking for duplicate jobs (#7481) --- .../inconsistency/InconsistentStateDetector.scala | 6 +++--- .../engine/api/deployment/simple/SimpleStateStatus.scala | 6 ++++-- docs/Changelog.md | 1 + .../nussknacker/engine/management/FlinkRestManager.scala | 4 ++-- .../engine/management/FlinkRestManagerSpec.scala | 3 ++- 5 files changed, 12 insertions(+), 8 deletions(-) diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/inconsistency/InconsistentStateDetector.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/inconsistency/InconsistentStateDetector.scala index 087c8d9f32d..c2f713a020a 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/inconsistency/InconsistentStateDetector.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/inconsistency/InconsistentStateDetector.scala @@ -41,7 +41,7 @@ class InconsistentStateDetector extends LazyLogging { private def doExtractAtMostOneStatus( statusDetails: List[StatusDetails] ): Either[StatusDetails, Option[StatusDetails]] = { - val notFinalStatuses = statusDetails.filterNot(isFinalStatus) + val notFinalStatuses = statusDetails.filterNot(isFinalOrTransitioningToFinalStatus) (statusDetails, notFinalStatuses) match { case (Nil, Nil) => Right(None) case (_, singleNotFinished :: Nil) => Right(Some(singleNotFinished)) @@ -144,8 +144,8 @@ class InconsistentStateDetector extends LazyLogging { SimpleStateStatus.DefaultFollowingDeployStatuses.contains(state.status) } - protected def isFinalStatus(state: StatusDetails): Boolean = - SimpleStateStatus.isFinalStatus(state.status) + protected def isFinalOrTransitioningToFinalStatus(state: StatusDetails): Boolean = + SimpleStateStatus.isFinalOrTransitioningToFinalStatus(state.status) protected def isFinishedStatus(state: StatusDetails): Boolean = { state.status == SimpleStateStatus.Finished diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleStateStatus.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleStateStatus.scala index f18131e4c8a..2e2c59f67dd 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleStateStatus.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/simple/SimpleStateStatus.scala @@ -85,8 +85,10 @@ object SimpleStateStatus { val DefaultFollowingDeployStatuses: Set[StateStatus] = Set(DuringDeploy, Running) - def isFinalStatus(status: StateStatus): Boolean = - List(SimpleStateStatus.Finished, SimpleStateStatus.Canceled).contains(status) || ProblemStateStatus.isProblemStatus( + def isFinalOrTransitioningToFinalStatus(status: StateStatus): Boolean = + List(SimpleStateStatus.Finished, SimpleStateStatus.DuringCancel, SimpleStateStatus.Canceled).contains( + status + ) || ProblemStateStatus.isProblemStatus( status ) diff --git a/docs/Changelog.md b/docs/Changelog.md index d78ce2d903e..be27b7cb6af 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -61,6 +61,7 @@ * [#7443](https://github.com/TouK/nussknacker/pull/7443) Indexing on record is more similar to indexing on map. The change lets us access record values dynamically. For example now spel expression "{a: 5, b: 10}[#input.field]" compiles and has type "Integer" inferred from types of values of the record. This lets us access record value based on user input, for instance if user passes "{"field": "b"}" to scenario we will get value "10", whereas input {"field": "c"} would result in "null". Expression "{a: 5}["b"]" still does not compile because it is known at compile time that record does not have property "b". * [#7324](https://github.com/TouK/nussknacker/pull/7324) Fix: Passing Flink Job Global Params * [#7335](https://github.com/TouK/nussknacker/pull/7335) introduced `managersDirs` config to configure deployment managers directory paths (you can use `MANAGERS_DIR` env in case of docker-based deployments). The default is `./managers`. +* [#7481](https://github.com/TouK/nussknacker/pull/7481) Ignore jobs in CANCELLING status when checking for duplicate jobs on Flink ## 1.18 diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala index 3f22e5a8d17..8673bdc43ce 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala @@ -244,10 +244,10 @@ class FlinkRestManager( deploymentId: Option[DeploymentId], statuses: List[StatusDetails] ) = { - statuses.filterNot(details => SimpleStateStatus.isFinalStatus(details.status)) match { + statuses.filterNot(details => SimpleStateStatus.isFinalOrTransitioningToFinalStatus(details.status)) match { case Nil => logger.warn( - s"Trying to cancel $processName${deploymentId.map(" with id: " + _).getOrElse("")} which is not present or finished on Flink." + s"Trying to cancel $processName${deploymentId.map(" with id: " + _).getOrElse("")} which is not active on Flink." ) Future.successful(()) case single :: Nil => cancelFlinkJob(single) diff --git a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala index 200458a48ce..83ed7e8c791 100644 --- a/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala +++ b/engine/flink/management/src/test/scala/pl/touk/nussknacker/engine/management/FlinkRestManagerSpec.scala @@ -408,7 +408,8 @@ class FlinkRestManagerSpec extends AnyFunSuite with Matchers with PatientScalaFu test("return running status if cancelled job has last-modification date later then running job") { statuses = List( JobOverview("2343", "p1", 20L, 10L, JobStatus.RUNNING.name(), tasksOverview(running = 1)), - JobOverview("1111", "p1", 30L, 5L, JobStatus.CANCELED.name(), tasksOverview(canceled = 1)) + JobOverview("1111", "p1", 30L, 5L, JobStatus.CANCELED.name(), tasksOverview(canceled = 1)), + JobOverview("2222", "p1", 30L, 5L, JobStatus.CANCELLING.name(), tasksOverview(canceling = 1)) ) val manager = createManager(statuses)