Skip to content

Commit

Permalink
Ignore jobs in CANCELLING status when checking for duplicate jobs (#7481
Browse files Browse the repository at this point in the history
)
  • Loading branch information
piotrp authored Jan 22, 2025
1 parent e4ea480 commit 4862d71
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class InconsistentStateDetector extends LazyLogging {
private def doExtractAtMostOneStatus(
statusDetails: List[StatusDetails]
): Either[StatusDetails, Option[StatusDetails]] = {
val notFinalStatuses = statusDetails.filterNot(isFinalStatus)
val notFinalStatuses = statusDetails.filterNot(isFinalOrTransitioningToFinalStatus)
(statusDetails, notFinalStatuses) match {
case (Nil, Nil) => Right(None)
case (_, singleNotFinished :: Nil) => Right(Some(singleNotFinished))
Expand Down Expand Up @@ -144,8 +144,8 @@ class InconsistentStateDetector extends LazyLogging {
SimpleStateStatus.DefaultFollowingDeployStatuses.contains(state.status)
}

protected def isFinalStatus(state: StatusDetails): Boolean =
SimpleStateStatus.isFinalStatus(state.status)
protected def isFinalOrTransitioningToFinalStatus(state: StatusDetails): Boolean =
SimpleStateStatus.isFinalOrTransitioningToFinalStatus(state.status)

protected def isFinishedStatus(state: StatusDetails): Boolean = {
state.status == SimpleStateStatus.Finished
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,10 @@ object SimpleStateStatus {

val DefaultFollowingDeployStatuses: Set[StateStatus] = Set(DuringDeploy, Running)

def isFinalStatus(status: StateStatus): Boolean =
List(SimpleStateStatus.Finished, SimpleStateStatus.Canceled).contains(status) || ProblemStateStatus.isProblemStatus(
def isFinalOrTransitioningToFinalStatus(status: StateStatus): Boolean =
List(SimpleStateStatus.Finished, SimpleStateStatus.DuringCancel, SimpleStateStatus.Canceled).contains(
status
) || ProblemStateStatus.isProblemStatus(
status
)

Expand Down
1 change: 1 addition & 0 deletions docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
* [#7443](https://github.com/TouK/nussknacker/pull/7443) Indexing on record is more similar to indexing on map. The change lets us access record values dynamically. For example now spel expression "{a: 5, b: 10}[#input.field]" compiles and has type "Integer" inferred from types of values of the record. This lets us access record value based on user input, for instance if user passes "{"field": "b"}" to scenario we will get value "10", whereas input {"field": "c"} would result in "null". Expression "{a: 5}["b"]" still does not compile because it is known at compile time that record does not have property "b".
* [#7324](https://github.com/TouK/nussknacker/pull/7324) Fix: Passing Flink Job Global Params
* [#7335](https://github.com/TouK/nussknacker/pull/7335) introduced `managersDirs` config to configure deployment managers directory paths (you can use `MANAGERS_DIR` env in case of docker-based deployments). The default is `./managers`.
* [#7481](https://github.com/TouK/nussknacker/pull/7481) Ignore jobs in CANCELLING status when checking for duplicate jobs on Flink

## 1.18

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,10 @@ class FlinkRestManager(
deploymentId: Option[DeploymentId],
statuses: List[StatusDetails]
) = {
statuses.filterNot(details => SimpleStateStatus.isFinalStatus(details.status)) match {
statuses.filterNot(details => SimpleStateStatus.isFinalOrTransitioningToFinalStatus(details.status)) match {
case Nil =>
logger.warn(
s"Trying to cancel $processName${deploymentId.map(" with id: " + _).getOrElse("")} which is not present or finished on Flink."
s"Trying to cancel $processName${deploymentId.map(" with id: " + _).getOrElse("")} which is not active on Flink."
)
Future.successful(())
case single :: Nil => cancelFlinkJob(single)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,8 @@ class FlinkRestManagerSpec extends AnyFunSuite with Matchers with PatientScalaFu
test("return running status if cancelled job has last-modification date later then running job") {
statuses = List(
JobOverview("2343", "p1", 20L, 10L, JobStatus.RUNNING.name(), tasksOverview(running = 1)),
JobOverview("1111", "p1", 30L, 5L, JobStatus.CANCELED.name(), tasksOverview(canceled = 1))
JobOverview("1111", "p1", 30L, 5L, JobStatus.CANCELED.name(), tasksOverview(canceled = 1)),
JobOverview("2222", "p1", 30L, 5L, JobStatus.CANCELLING.name(), tasksOverview(canceling = 1))
)

val manager = createManager(statuses)
Expand Down

0 comments on commit 4862d71

Please sign in to comment.