Skip to content

Commit

Permalink
fix: transforming workflow failover config (#2250)
Browse files Browse the repository at this point in the history
* fix: transforming workflow failover config

* removing docs

* vale config

* vale config

* fixing docs
  • Loading branch information
aludwiko authored Dec 16, 2024
1 parent 866561a commit ccbb581
Show file tree
Hide file tree
Showing 12 changed files with 125 additions and 11 deletions.
2 changes: 1 addition & 1 deletion docs/src/modules/java-protobuf/pages/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ NOTE: Lightbend provides Tier 1 support for the [.group-java]#Java# [.group-scal
Your development project needs to include the Kalix [.group-java]#Java# [.group-scala]#Scala# Protobuf SDK and logic to start the gRPC server. You define your components in gRPC descriptors and use `protoc` to compile them. Finally, you implement business logic for service components.
To save the work of starting from scratch, the Java xref:java-protobuf:project-template.adoc[code generation tool] creates a project from a template, complete with descriptors and implementations. Or, you can start from one of our fully implemented https://docs.kalix.io/samples/index.html[sample applications].
To save the work of starting from scratch, the Java xref:java-protobuf:project-template.adoc[code generation tool] creates a project from a template, complete with descriptors and implementations.
== Prerequisites
Expand Down
2 changes: 2 additions & 0 deletions docs/src/modules/java-protobuf/pages/workflows.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ include::example$java-protobuf-transfer-workflow-compensation/src/main/java/com/
<1> Sets a failover transition in case of a workflow timeout.
<2> Sets a default failover transition for all steps with maximum number of retries.
<3> Overrides the step recovery strategy for the `deposit` step.
<4> Failover steps should be added like any other steps.

Scala::
+
Expand All @@ -307,6 +308,7 @@ include::example$scala-protobuf-transfer-workflow-compensation/src/main/scala/co
<1> Sets a failover transition in case of a workflow timeout.
<2> Sets a default failover transition for all steps with maximum number of retries.
<3> Overrides the step recovery strategy for the `deposit` step.
<4> Failover steps should be added like any other steps.


NOTE: In case of a workflow timeout one last failover step can be performed. Transitions from that failover step will be ignored.
Expand Down
2 changes: 1 addition & 1 deletion docs/src/modules/java/pages/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Docker:: Kalix requires https://docs.docker.com/get-docker/[Docker {tab-icon}, w
== Getting Started
You can start a new Kalix service using our xref:java:getting-started.adoc[Getting started] guide. If you prefer to first explore a fully implemented Kalix service, you can try one of our https://docs.kalix.io/samples/index.html[beginner samples].
You can start a new Kalix service using our xref:java:getting-started.adoc[Getting started] guide.
On the other hand, if you would rather spend some time exploring our documentation, here are some main features you will find in this section:
Expand Down
1 change: 1 addition & 0 deletions docs/src/modules/java/pages/workflows.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ include::example$java-spring-transfer-workflow-compensation/src/main/java/com/ex
<1> Sets a failover transition in case of a workflow timeout.
<2> Sets a default failover transition for all steps with maximum number of retries.
<3> Overrides the step recovery strategy for the `deposit` step.
<4> Failover steps should be added like any other steps.


NOTE: In case of a workflow timeout one last failover step can be performed. Transitions from that failover step will be ignored.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,10 @@ public WorkflowDef<TransferState> definition() {
.defaultStepRecoverStrategy(maxRetries(1).failoverTo("failover-handler")) // <2>
.addStep(withdraw)
.addStep(deposit, maxRetries(2).failoverTo("compensate-withdraw")) // <3>
// end::recover-strategy[]
.addStep(compensateWithdraw)
.addStep(waitForAcceptation)
.addStep(compensateWithdraw) // <4>
.addStep(failoverHandler);
// end::recover-strategy[]
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,10 @@ public WorkflowDef<TransferState> definition() {
.defaultStepRecoverStrategy(maxRetries(1).failoverTo("failover-handler")) // <2>
.addStep(withdraw)
.addStep(deposit, maxRetries(2).failoverTo("compensate-withdraw")) // <3>
// end::recover-strategy[]
.addStep(compensateWithdraw)
.addStep(waitForAcceptation)
.addStep(compensateWithdraw) // <4>
.addStep(failoverHandler);
// end::recover-strategy[]
}

@PutMapping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,10 @@ class TransferWorkflow(context: WorkflowContext) extends AbstractTransferWorkflo
.defaultStepRecoverStrategy(maxRetries(1).failoverTo("failover-handler")) // <2>
.addStep(withdraw)
.addStep(deposit, maxRetries(2).failoverTo("compensate-withdraw")) // <3>
// end::recover-strategy[]
.addStep(compensateWithdraw)
.addStep(waitForAcceptation)
.addStep(compensateWithdraw) // <4>
.addStep(failoverHandler);
// end::recover-strategy[]
}

override def start(currentState: TransferState, transfer: Transfer): Effect[Empty] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ public Optional<Step> findByName(String name) {
*/
public WorkflowDef<S> addStep(Step step) {
addStepWithValidation(step);
step.timeout().ifPresent(timeout -> stepConfigs.add(new StepConfig(step.name(), Optional.of(timeout), Optional.empty())));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,16 @@ private[scalasdk] final class JavaWorkflowAdapter[S >: Null](scalaSdkWorkflow: A
case kalix.scalasdk.impl.workflow.WorkflowEffectImpl.TransitionalEffectImpl(javaEffect) => javaEffect
}
})
javaWorkflowDef.addStep(javaCallStep)
scalaDefinition.stepConfigs
.find(_.stepName == callStep.name)
.flatMap { stepConfig =>
stepConfig.timeout.map(_.toJava).foreach(javaCallStep.timeout)
stepConfig.recoverStrategy
} match {
case Some(recoverStrategy) => javaWorkflowDef.addStep(javaCallStep, convertToJava(recoverStrategy))
case None => javaWorkflowDef.addStep(javaCallStep)
}

case asyncCallStep: AsyncCallStep[Any @unchecked, Any @unchecked, Any @unchecked] =>
val javaAsyncCallStep = new javasdk.workflow.AbstractWorkflow.AsyncCallStep(
asyncCallStep.name,
Expand All @@ -95,7 +104,16 @@ private[scalasdk] final class JavaWorkflowAdapter[S >: Null](scalaSdkWorkflow: A
case kalix.scalasdk.impl.workflow.WorkflowEffectImpl.TransitionalEffectImpl(javaEffect) => javaEffect
}
})
javaWorkflowDef.addStep(javaAsyncCallStep)

scalaDefinition.stepConfigs
.find(_.stepName == asyncCallStep.name)
.flatMap { stepConfig =>
stepConfig.timeout.map(_.toJava).foreach(javaAsyncCallStep.timeout)
stepConfig.recoverStrategy
} match {
case Some(recoverStrategy) => javaWorkflowDef.addStep(javaAsyncCallStep, convertToJava(recoverStrategy))
case None => javaWorkflowDef.addStep(javaAsyncCallStep)
}
}
scalaDefinition.workflowTimeout.map(_.toJava).foreach(javaWorkflowDef.timeout)
scalaDefinition.stepTimeout.map(_.toJava).foreach(javaWorkflowDef.defaultStepTimeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ object AbstractWorkflow {
*/
def addStep(step: AbstractWorkflow.Step): AbstractWorkflow.WorkflowDef[S] = {
addStepWithValidation(step)
step.timeout.foreach(timeout =>
_stepConfigs.addOne(AbstractWorkflow.StepConfig(step.name, Option(timeout), None)))
this
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package kalix.scalasdk.impl.workflow

import java.time.Duration
import java.util.concurrent.CompletableFuture

import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import scala.jdk.CollectionConverters.CollectionHasAsScala

import com.google.protobuf.empty.Empty
import kalix.javasdk.impl.GrpcDeferredCall
import kalix.javasdk.impl.MetadataImpl
import kalix.scalasdk.impl.ScalaDeferredCallAdapter
import kalix.scalasdk.workflow.AbstractWorkflow
import kalix.scalasdk.workflow.AbstractWorkflow.maxRetries
import kalix.scalasdk.workflow.ProtoWorkflow
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class JavaWorkflowAdapterSpec extends AnyWordSpec with Matchers {

"JavaWorkflowAdapter" should {
"convert scala workflow definition to java" in {
val adapted = new JavaWorkflowAdapter(new DummyWorkflow)

val definition = adapted.definition()
val steps = definition.getSteps.asScala
steps should have size 2
val step1 = steps.find(_.name() == "step1").get
val step2 = steps.find(_.name() == "step2").get
step1.timeout() shouldBe empty
step2.timeout().get() shouldBe Duration.ofSeconds(10)

val stepConfigs = definition.getStepConfigs.asScala
val step1Config = stepConfigs.find(_.stepName == "step1").get
val step2Config = stepConfigs.find(_.stepName == "step2").get

step1Config.recoverStrategy.get().maxRetries shouldBe 2
step1Config.recoverStrategy.get().failoverStepName shouldBe "step2"
step1Config.timeout.isPresent shouldBe false

step2Config.recoverStrategy.isPresent shouldBe false
step2Config.timeout.get() shouldBe Duration.ofSeconds(10)

definition.getStepTimeout().get() shouldBe Duration.ofSeconds(3)
definition.getStepRecoverStrategy.get().maxRetries shouldBe 3
definition.getStepRecoverStrategy.get().failoverStepName shouldBe "step1"
definition.getFailoverStepName.get() shouldBe "step3"
definition.getFailoverMaxRetries.get().maxRetries shouldBe 10
definition.getWorkflowTimeout.get() shouldBe Duration.ofSeconds(7)
}
}
}

class DummyWorkflow extends ProtoWorkflow[Empty] {
override def emptyState: Empty = Empty()

override def definition: AbstractWorkflow.WorkflowDef[Empty] = {
val step1 = step("step1")
.call { _: Empty =>
ScalaDeferredCallAdapter(
GrpcDeferredCall(
Empty(),
MetadataImpl.Empty,
"service1",
"method1",
_ => CompletableFuture.completedFuture(Empty())))
}
.andThen(_ => effects.end)

val step2 = step("step2")
.asyncCall { _: Empty =>
Future.successful(Empty())
}
.andThen(_ => effects.end)
.timeout(10.seconds)

workflow
.timeout(7.seconds)
.defaultStepTimeout(3.seconds)
.defaultStepRecoverStrategy(maxRetries(3).failoverTo("step1"))
.failoverTo("step3", maxRetries(10))
.addStep(step1, maxRetries(2).failoverTo("step2"))
.addStep(step2)
}
}
2 changes: 1 addition & 1 deletion styles/config/vocabularies/Base/accept.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ dev
facto
enum
env
failover
[Ff]ailover
[Gg]itHub
googleCloud
grpcui
Expand Down

0 comments on commit ccbb581

Please sign in to comment.