From 5b91fa41081c366a1606b4240a0208732478480c Mon Sep 17 00:00:00 2001 From: Andrzej Ludwikowski Date: Thu, 27 Jul 2023 17:51:35 +0300 Subject: [PATCH] fix: skipping state update for finished workflow (#1735) * fix: skipping state update for finished workflow * missing header --- .../javasdk/impl/workflow/WorkflowImpl.scala | 6 +-- .../impl/workflow/WorkflowRouter.scala | 8 ++- .../workflowentities/DummyWorkflow.java | 51 +++++++++++++++++++ .../SpringWorkflowIntegrationTest.java | 26 ++++++++-- 4 files changed, 82 insertions(+), 9 deletions(-) create mode 100644 sdk/java-sdk-spring/src/it/java/com/example/wiring/workflowentities/DummyWorkflow.java diff --git a/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/workflow/WorkflowImpl.scala b/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/workflow/WorkflowImpl.scala index 4fe7210b16..840b570d44 100644 --- a/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/workflow/WorkflowImpl.scala +++ b/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/workflow/WorkflowImpl.scala @@ -196,7 +196,7 @@ final class WorkflowImpl(system: ActorSystem, val services: Map[String, Workflow init.userState match { case Some(state) => val decoded = service.messageCodec.decodeMessage(state) - router._internalSetInitState(decoded) + router._internalSetInitState(decoded, finished = false) //TODO get this from the init message case None => // no initial state } @@ -207,7 +207,7 @@ final class WorkflowImpl(system: ActorSystem, val services: Map[String, Workflow val protoEffect = persistence match { case UpdateState(newState) => - router._internalSetInitState(newState) + router._internalSetInitState(newState, transition.isInstanceOf[End.type]) WorkflowEffect.defaultInstance.withUserState(service.messageCodec.encodeScala(newState)) // TODO: persistence should be optional, but we must ensure that we don't save it back to null // and preferably we should not even send it over the wire. @@ -307,7 +307,7 @@ final class WorkflowImpl(system: ActorSystem, val services: Map[String, Workflow val stepResponse = try { val decoded = service.messageCodec.decodeMessage(executeStep.userState.get) - router._internalSetInitState(decoded) + router._internalSetInitState(decoded, false) // here we know that workflow is still running router._internalHandleStep( executeStep.commandId, executeStep.input, diff --git a/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/workflow/WorkflowRouter.scala b/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/workflow/WorkflowRouter.scala index 9e72ef65da..eeb1d6582c 100644 --- a/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/workflow/WorkflowRouter.scala +++ b/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/workflow/WorkflowRouter.scala @@ -64,6 +64,7 @@ object WorkflowRouter { abstract class WorkflowRouter[S, W <: Workflow[S]](protected val workflow: W) { private var state: Option[S] = None + private var workflowFinished: Boolean = false; private def stateOrEmpty(): S = state match { case None => @@ -81,8 +82,11 @@ abstract class WorkflowRouter[S, W <: Workflow[S]](protected val workflow: W) { /** INTERNAL API */ // "public" api against the impl/testkit - def _internalSetInitState(s: Any): Unit = { - state = Some(s.asInstanceOf[S]) + def _internalSetInitState(s: Any, finished: Boolean): Unit = { + if (!workflowFinished) { + state = Some(s.asInstanceOf[S]) + workflowFinished = finished + } } /** INTERNAL API */ diff --git a/sdk/java-sdk-spring/src/it/java/com/example/wiring/workflowentities/DummyWorkflow.java b/sdk/java-sdk-spring/src/it/java/com/example/wiring/workflowentities/DummyWorkflow.java new file mode 100644 index 0000000000..36a1143bf4 --- /dev/null +++ b/sdk/java-sdk-spring/src/it/java/com/example/wiring/workflowentities/DummyWorkflow.java @@ -0,0 +1,51 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.wiring.workflowentities; + +import kalix.javasdk.annotations.Id; +import kalix.javasdk.annotations.TypeId; +import kalix.javasdk.workflow.Workflow; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PatchMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; + +@Id("id") +@TypeId("dummy-workflow") +@RequestMapping("/dummy-workflow/{id}") +public class DummyWorkflow extends Workflow { + + @Override + public WorkflowDef definition() { + return workflow(); + } + + @PostMapping + public Effect startAndFinish() { + return effects().updateState(10).end().thenReply("ok"); + } + + @PatchMapping("/test") + public Effect update() { + return effects().updateState(20).transitionTo("asd").thenReply("ok"); + } + + @GetMapping + public Effect get() { + return effects().reply(currentState()); + } +} diff --git a/sdk/java-sdk-spring/src/it/java/com/example/wiring/workflowentities/SpringWorkflowIntegrationTest.java b/sdk/java-sdk-spring/src/it/java/com/example/wiring/workflowentities/SpringWorkflowIntegrationTest.java index 0a2f43bdbf..1b64f9d5f3 100644 --- a/sdk/java-sdk-spring/src/it/java/com/example/wiring/workflowentities/SpringWorkflowIntegrationTest.java +++ b/sdk/java-sdk-spring/src/it/java/com/example/wiring/workflowentities/SpringWorkflowIntegrationTest.java @@ -402,15 +402,33 @@ public void failRequestWhenReqParamsIsNotPresent() { //when ResponseEntity response = webClient.put().uri(path) - .retrieve() - .toEntity(String.class) - .onErrorResume(WebClientResponseException.class, error -> Mono.just(ResponseEntity.status(error.getStatusCode()).body(error.getResponseBodyAsString()))) - .block(timeout); + .retrieve() + .toEntity(String.class) + .onErrorResume(WebClientResponseException.class, error -> Mono.just(ResponseEntity.status(error.getStatusCode()).body(error.getResponseBodyAsString()))) + .block(timeout); assertThat(response.getStatusCode()).isEqualTo(HttpStatus.BAD_REQUEST); assertThat(response.getBody()).isEqualTo("Required request parameter is missing: counterId"); } + @Test + public void shouldNotUpdateWorkflowStateAfterEndTransition() { + //given + var workflowId = randomId(); + execute(componentClient.forWorkflow(workflowId).call(DummyWorkflow::startAndFinish)); + assertThat(execute(componentClient.forWorkflow(workflowId).call(DummyWorkflow::get))).isEqualTo(10); + + //when + try { + execute(componentClient.forWorkflow(workflowId).call(DummyWorkflow::update)); + } catch (RuntimeException exception) { + // ignore "500 Internal Server Error" exception from the proxy + } + + //then + assertThat(execute(componentClient.forWorkflow(workflowId).call(DummyWorkflow::get))).isEqualTo(10); + } + private T execute(DeferredCall deferredCall) { return execute(deferredCall, timeout); }