Skip to content

Commit

Permalink
fix: skipping state update for finished workflow (#1735)
Browse files Browse the repository at this point in the history
* fix: skipping state update for finished workflow

* missing header
  • Loading branch information
aludwiko authored Jul 27, 2023
1 parent 8802c7d commit 5b91fa4
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ final class WorkflowImpl(system: ActorSystem, val services: Map[String, Workflow
init.userState match {
case Some(state) =>
val decoded = service.messageCodec.decodeMessage(state)
router._internalSetInitState(decoded)
router._internalSetInitState(decoded, finished = false) //TODO get this from the init message
case None => // no initial state
}

Expand All @@ -207,7 +207,7 @@ final class WorkflowImpl(system: ActorSystem, val services: Map[String, Workflow
val protoEffect =
persistence match {
case UpdateState(newState) =>
router._internalSetInitState(newState)
router._internalSetInitState(newState, transition.isInstanceOf[End.type])
WorkflowEffect.defaultInstance.withUserState(service.messageCodec.encodeScala(newState))
// TODO: persistence should be optional, but we must ensure that we don't save it back to null
// and preferably we should not even send it over the wire.
Expand Down Expand Up @@ -307,7 +307,7 @@ final class WorkflowImpl(system: ActorSystem, val services: Map[String, Workflow
val stepResponse =
try {
val decoded = service.messageCodec.decodeMessage(executeStep.userState.get)
router._internalSetInitState(decoded)
router._internalSetInitState(decoded, false) // here we know that workflow is still running
router._internalHandleStep(
executeStep.commandId,
executeStep.input,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ object WorkflowRouter {
abstract class WorkflowRouter[S, W <: Workflow[S]](protected val workflow: W) {

private var state: Option[S] = None
private var workflowFinished: Boolean = false;

private def stateOrEmpty(): S = state match {
case None =>
Expand All @@ -81,8 +82,11 @@ abstract class WorkflowRouter[S, W <: Workflow[S]](protected val workflow: W) {

/** INTERNAL API */
// "public" api against the impl/testkit
def _internalSetInitState(s: Any): Unit = {
state = Some(s.asInstanceOf[S])
def _internalSetInitState(s: Any, finished: Boolean): Unit = {
if (!workflowFinished) {
state = Some(s.asInstanceOf[S])
workflowFinished = finished
}
}

/** INTERNAL API */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2021 Lightbend Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example.wiring.workflowentities;

import kalix.javasdk.annotations.Id;
import kalix.javasdk.annotations.TypeId;
import kalix.javasdk.workflow.Workflow;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PatchMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;

@Id("id")
@TypeId("dummy-workflow")
@RequestMapping("/dummy-workflow/{id}")
public class DummyWorkflow extends Workflow<Integer> {

@Override
public WorkflowDef<Integer> definition() {
return workflow();
}

@PostMapping
public Effect<String> startAndFinish() {
return effects().updateState(10).end().thenReply("ok");
}

@PatchMapping("/test")
public Effect<String> update() {
return effects().updateState(20).transitionTo("asd").thenReply("ok");
}

@GetMapping
public Effect<Integer> get() {
return effects().reply(currentState());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -402,15 +402,33 @@ public void failRequestWhenReqParamsIsNotPresent() {

//when
ResponseEntity<String> response = webClient.put().uri(path)
.retrieve()
.toEntity(String.class)
.onErrorResume(WebClientResponseException.class, error -> Mono.just(ResponseEntity.status(error.getStatusCode()).body(error.getResponseBodyAsString())))
.block(timeout);
.retrieve()
.toEntity(String.class)
.onErrorResume(WebClientResponseException.class, error -> Mono.just(ResponseEntity.status(error.getStatusCode()).body(error.getResponseBodyAsString())))
.block(timeout);

assertThat(response.getStatusCode()).isEqualTo(HttpStatus.BAD_REQUEST);
assertThat(response.getBody()).isEqualTo("Required request parameter is missing: counterId");
}

@Test
public void shouldNotUpdateWorkflowStateAfterEndTransition() {
//given
var workflowId = randomId();
execute(componentClient.forWorkflow(workflowId).call(DummyWorkflow::startAndFinish));
assertThat(execute(componentClient.forWorkflow(workflowId).call(DummyWorkflow::get))).isEqualTo(10);

//when
try {
execute(componentClient.forWorkflow(workflowId).call(DummyWorkflow::update));
} catch (RuntimeException exception) {
// ignore "500 Internal Server Error" exception from the proxy
}

//then
assertThat(execute(componentClient.forWorkflow(workflowId).call(DummyWorkflow::get))).isEqualTo(10);
}

private <T> T execute(DeferredCall<Any, T> deferredCall) {
return execute(deferredCall, timeout);
}
Expand Down

0 comments on commit 5b91fa4

Please sign in to comment.