From b5b4b4c8ffb4003dd275ee58a72b67a1a54e5f55 Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Wed, 25 Jan 2023 21:03:11 +1300 Subject: [PATCH] doc: Samples for replicated event sourcing over gRPC (#769) Additionally: * Api to allow replicated behaviors to compose with setup behaviors --- .github/workflows/checks.yml | 9 + .../ReplicationIntegrationSpec.scala | 45 +- .../ReplicationJavaDSLIntegrationSpec.scala | 4 +- .../javadsl/ReplicatedBehaviors.scala | 23 + .../replication/javadsl/Replication.scala | 24 +- .../scaladsl/ReplicatedBehaviors.scala | 22 + .../replication/scaladsl/Replication.scala | 13 +- .../javdsl/ReplicationCompileTest.java | 16 +- ...rpc-replicated-event-sourcing-transport.md | 59 ++- docs/src/main/paradox/grpc.md | 4 +- .../shopping-cart-service-java/.gitignore | 16 + .../shopping-cart-service-java/LICENSE | 10 + .../shopping-cart-service-java/README.md | 79 ++++ .../ddl-scripts/create_tables.sql | 94 +++++ .../docker-compose.yml | 18 + .../shopping-cart-service-java/pom.xml | 272 ++++++++++++ .../java/shopping/cart/CborSerializable.java | 7 + .../src/main/java/shopping/cart/Main.java | 50 +++ .../main/java/shopping/cart/ShoppingCart.java | 395 ++++++++++++++++++ .../shopping/cart/ShoppingCartServer.java | 59 +++ .../cart/ShoppingCartServiceImpl.java | 120 ++++++ .../main/protobuf/ShoppingCartService.proto | 45 ++ .../src/main/resources/application.conf | 12 + .../src/main/resources/cluster.conf | 27 ++ .../src/main/resources/grpc.conf | 11 + .../src/main/resources/local-shared.conf | 18 + .../src/main/resources/logback.xml | 19 + .../src/main/resources/persistence.conf | 40 ++ .../main/resources/replica1-local-shared.conf | 15 + .../src/main/resources/replica1-local1.conf | 7 + .../src/main/resources/replica1-local2.conf | 7 + .../src/main/resources/replica1-local3.conf | 7 + .../main/resources/replica2-local-shared.conf | 15 + .../src/main/resources/replica2-local1.conf | 7 + .../src/main/resources/replica2-local2.conf | 7 + .../src/main/resources/replica2-local3.conf | 7 + .../src/main/resources/replication.conf | 30 ++ .../src/main/resources/serialization.conf | 3 + .../shopping-cart-service-scala/.gitignore | 16 + .../.scalafmt.conf | 49 +++ .../shopping-cart-service-scala/LICENSE | 10 + .../shopping-cart-service-scala/README.md | 73 ++++ .../shopping-cart-service-scala/build.sbt | 75 ++++ .../ddl-scripts/create_tables.sql | 94 +++++ .../docker-compose.yml | 18 + .../project/build.properties | 1 + .../project/plugins.sbt | 4 + .../main/protobuf/ShoppingCartService.proto | 45 ++ .../src/main/resources/application.conf | 12 + .../src/main/resources/cluster.conf | 27 ++ .../src/main/resources/grpc.conf | 10 + .../src/main/resources/local-shared.conf | 18 + .../src/main/resources/logback.xml | 19 + .../src/main/resources/persistence.conf | 39 ++ .../main/resources/replica1-local-shared.conf | 15 + .../src/main/resources/replica1-local1.conf | 7 + .../src/main/resources/replica1-local2.conf | 7 + .../src/main/resources/replica1-local3.conf | 7 + .../main/resources/replica2-local-shared.conf | 15 + .../src/main/resources/replica2-local1.conf | 7 + .../src/main/resources/replica2-local2.conf | 7 + .../src/main/resources/replica2-local3.conf | 7 + .../src/main/resources/replication.conf | 30 ++ .../src/main/resources/serialization.conf | 3 + .../shopping/cart/CborSerializable.scala | 8 + .../src/main/scala/shopping/cart/Main.scala | 41 ++ .../scala/shopping/cart/ShoppingCart.scala | 264 ++++++++++++ .../shopping/cart/ShoppingCartServer.scala | 53 +++ .../cart/ShoppingCartServiceImpl.scala | 82 ++++ 69 files changed, 2620 insertions(+), 59 deletions(-) create mode 100644 akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/ReplicatedBehaviors.scala create mode 100644 akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/ReplicatedBehaviors.scala create mode 100644 samples/replicated/shopping-cart-service-java/.gitignore create mode 100644 samples/replicated/shopping-cart-service-java/LICENSE create mode 100644 samples/replicated/shopping-cart-service-java/README.md create mode 100644 samples/replicated/shopping-cart-service-java/ddl-scripts/create_tables.sql create mode 100644 samples/replicated/shopping-cart-service-java/docker-compose.yml create mode 100644 samples/replicated/shopping-cart-service-java/pom.xml create mode 100644 samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/CborSerializable.java create mode 100644 samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/Main.java create mode 100644 samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java create mode 100644 samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCartServer.java create mode 100644 samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCartServiceImpl.java create mode 100644 samples/replicated/shopping-cart-service-java/src/main/protobuf/ShoppingCartService.proto create mode 100644 samples/replicated/shopping-cart-service-java/src/main/resources/application.conf create mode 100644 samples/replicated/shopping-cart-service-java/src/main/resources/cluster.conf create mode 100644 samples/replicated/shopping-cart-service-java/src/main/resources/grpc.conf create mode 100644 samples/replicated/shopping-cart-service-java/src/main/resources/local-shared.conf create mode 100644 samples/replicated/shopping-cart-service-java/src/main/resources/logback.xml create mode 100644 samples/replicated/shopping-cart-service-java/src/main/resources/persistence.conf create mode 100644 samples/replicated/shopping-cart-service-java/src/main/resources/replica1-local-shared.conf create mode 100644 samples/replicated/shopping-cart-service-java/src/main/resources/replica1-local1.conf create mode 100644 samples/replicated/shopping-cart-service-java/src/main/resources/replica1-local2.conf create mode 100644 samples/replicated/shopping-cart-service-java/src/main/resources/replica1-local3.conf create mode 100644 samples/replicated/shopping-cart-service-java/src/main/resources/replica2-local-shared.conf create mode 100644 samples/replicated/shopping-cart-service-java/src/main/resources/replica2-local1.conf create mode 100644 samples/replicated/shopping-cart-service-java/src/main/resources/replica2-local2.conf create mode 100644 samples/replicated/shopping-cart-service-java/src/main/resources/replica2-local3.conf create mode 100644 samples/replicated/shopping-cart-service-java/src/main/resources/replication.conf create mode 100644 samples/replicated/shopping-cart-service-java/src/main/resources/serialization.conf create mode 100644 samples/replicated/shopping-cart-service-scala/.gitignore create mode 100644 samples/replicated/shopping-cart-service-scala/.scalafmt.conf create mode 100644 samples/replicated/shopping-cart-service-scala/LICENSE create mode 100644 samples/replicated/shopping-cart-service-scala/README.md create mode 100644 samples/replicated/shopping-cart-service-scala/build.sbt create mode 100644 samples/replicated/shopping-cart-service-scala/ddl-scripts/create_tables.sql create mode 100644 samples/replicated/shopping-cart-service-scala/docker-compose.yml create mode 100644 samples/replicated/shopping-cart-service-scala/project/build.properties create mode 100644 samples/replicated/shopping-cart-service-scala/project/plugins.sbt create mode 100644 samples/replicated/shopping-cart-service-scala/src/main/protobuf/ShoppingCartService.proto create mode 100644 samples/replicated/shopping-cart-service-scala/src/main/resources/application.conf create mode 100644 samples/replicated/shopping-cart-service-scala/src/main/resources/cluster.conf create mode 100644 samples/replicated/shopping-cart-service-scala/src/main/resources/grpc.conf create mode 100644 samples/replicated/shopping-cart-service-scala/src/main/resources/local-shared.conf create mode 100644 samples/replicated/shopping-cart-service-scala/src/main/resources/logback.xml create mode 100644 samples/replicated/shopping-cart-service-scala/src/main/resources/persistence.conf create mode 100644 samples/replicated/shopping-cart-service-scala/src/main/resources/replica1-local-shared.conf create mode 100644 samples/replicated/shopping-cart-service-scala/src/main/resources/replica1-local1.conf create mode 100644 samples/replicated/shopping-cart-service-scala/src/main/resources/replica1-local2.conf create mode 100644 samples/replicated/shopping-cart-service-scala/src/main/resources/replica1-local3.conf create mode 100644 samples/replicated/shopping-cart-service-scala/src/main/resources/replica2-local-shared.conf create mode 100644 samples/replicated/shopping-cart-service-scala/src/main/resources/replica2-local1.conf create mode 100644 samples/replicated/shopping-cart-service-scala/src/main/resources/replica2-local2.conf create mode 100644 samples/replicated/shopping-cart-service-scala/src/main/resources/replica2-local3.conf create mode 100644 samples/replicated/shopping-cart-service-scala/src/main/resources/replication.conf create mode 100644 samples/replicated/shopping-cart-service-scala/src/main/resources/serialization.conf create mode 100644 samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/CborSerializable.scala create mode 100644 samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/Main.scala create mode 100644 samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala create mode 100644 samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServer.scala create mode 100644 samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 0fcf74fb6..021f1fa1d 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -139,3 +139,12 @@ jobs: cd samples/grpc/shopping-analytics-service-java mvn compile -nsu -Dakka-projection.version=`cat ~/.version` + - name: Compile Scala Replicated Event Sourcing over gRPC sample Shopping Cart + run: |- + cd samples/replicated/shopping-cart-service-scala + sbt compile -Dakka-projection.version=`cat ~/.version` + + - name: Compile Java Replicated Event Sourcing over gRPC sample Shopping Cart + run: |- + cd samples/replicated/shopping-cart-service-java + mvn compile -nsu -Dakka-projection.version=`cat ~/.version` diff --git a/akka-projection-grpc/src/it/scala/akka/projection/grpc/replication/ReplicationIntegrationSpec.scala b/akka-projection-grpc/src/it/scala/akka/projection/grpc/replication/ReplicationIntegrationSpec.scala index cc67529be..f69a39cf1 100644 --- a/akka-projection-grpc/src/it/scala/akka/projection/grpc/replication/ReplicationIntegrationSpec.scala +++ b/akka-projection-grpc/src/it/scala/akka/projection/grpc/replication/ReplicationIntegrationSpec.scala @@ -22,12 +22,12 @@ import akka.persistence.typed.ReplicaId import akka.persistence.typed.crdt.LwwTime import akka.persistence.typed.scaladsl.Effect import akka.persistence.typed.scaladsl.EventSourcedBehavior -import akka.persistence.typed.scaladsl.ReplicationContext import akka.projection.grpc.TestContainerConf import akka.projection.grpc.TestDbLifecycle import akka.projection.grpc.producer.EventProducerSettings import akka.projection.grpc.replication import akka.projection.grpc.replication.scaladsl.Replica +import akka.projection.grpc.replication.scaladsl.ReplicatedBehaviors import akka.projection.grpc.replication.scaladsl.Replication import akka.projection.grpc.replication.scaladsl.ReplicationProjectionProvider import akka.projection.grpc.replication.scaladsl.ReplicationSettings @@ -98,27 +98,28 @@ object ReplicationIntegrationSpec { case class State(greeting: String, timestamp: LwwTime) - // FIXME needing to return/pass an EventSourcedBehavior means it is impossible to compose for logging/setup etc - def apply(replicationContext: ReplicationContext) = - EventSourcedBehavior[Command, Event, State]( - replicationContext.persistenceId, - State.initial, { - case (State(greeting, _), Get(replyTo)) => - replyTo ! greeting - Effect.none - case (state, SetGreeting(greeting, replyTo)) => - Effect - .persist( - GreetingChanged( - greeting, - state.timestamp.increase(replicationContext.currentTimeMillis(), replicationContext.replicaId))) - .thenRun((_: State) => replyTo ! Done) - }, { - case (currentState, GreetingChanged(newGreeting, newTimestamp)) => - if (newTimestamp.isAfter(currentState.timestamp)) - State(newGreeting, newTimestamp) - else currentState - }) + def apply(replicatedBehaviors: ReplicatedBehaviors[Command, Event, State]) = + replicatedBehaviors.setup { replicationContext => + EventSourcedBehavior[Command, Event, State]( + replicationContext.persistenceId, + State.initial, { + case (State(greeting, _), Get(replyTo)) => + replyTo ! greeting + Effect.none + case (state, SetGreeting(greeting, replyTo)) => + Effect + .persist( + GreetingChanged( + greeting, + state.timestamp.increase(replicationContext.currentTimeMillis(), replicationContext.replicaId))) + .thenRun((_: State) => replyTo ! Done) + }, { + case (currentState, GreetingChanged(newGreeting, newTimestamp)) => + if (newTimestamp.isAfter(currentState.timestamp)) + State(newGreeting, newTimestamp) + else currentState + }) + } } } diff --git a/akka-projection-grpc/src/it/scala/akka/projection/grpc/replication/ReplicationJavaDSLIntegrationSpec.scala b/akka-projection-grpc/src/it/scala/akka/projection/grpc/replication/ReplicationJavaDSLIntegrationSpec.scala index 25b944172..8d22bd5bb 100644 --- a/akka-projection-grpc/src/it/scala/akka/projection/grpc/replication/ReplicationJavaDSLIntegrationSpec.scala +++ b/akka-projection-grpc/src/it/scala/akka/projection/grpc/replication/ReplicationJavaDSLIntegrationSpec.scala @@ -30,6 +30,7 @@ import akka.projection.grpc.TestDbLifecycle import akka.projection.grpc.producer.EventProducerSettings import akka.projection.grpc.replication import akka.projection.grpc.replication.javadsl.Replica +import akka.projection.grpc.replication.javadsl.ReplicatedBehaviors import akka.projection.grpc.replication.javadsl.Replication import akka.projection.grpc.replication.javadsl.ReplicationProjectionProvider import akka.projection.grpc.replication.javadsl.ReplicationSettings @@ -104,7 +105,8 @@ object ReplicationJavaDSLIntegrationSpec { case class State(greeting: String, timestamp: LwwTime) - def create(replicationContext: ReplicationContext) = new LWWHelloWorldBehavior(replicationContext) + def create(replicatedBehaviors: ReplicatedBehaviors[Command, Event, State]) = + replicatedBehaviors.setup { replicationContext => new LWWHelloWorldBehavior(replicationContext) } class LWWHelloWorldBehavior(replicationContext: ReplicationContext) extends EventSourcedBehavior[Command, Event, State](replicationContext.persistenceId) { diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/ReplicatedBehaviors.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/ReplicatedBehaviors.scala new file mode 100644 index 000000000..c92269d85 --- /dev/null +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/ReplicatedBehaviors.scala @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2009-2022 Lightbend Inc. + */ + +package akka.projection.grpc.replication.javadsl + +import akka.actor.typed.Behavior +import akka.annotation.ApiMayChange +import akka.japi.function.{ Function => JFunction } +import akka.persistence.typed.javadsl.EventSourcedBehavior +import akka.persistence.typed.javadsl.ReplicationContext + +/** + * Dynamically provides factory methods for creating replicated event sourced behaviors. + * + * Must be used to create an event sourced behavior to be replicated with [[Replication.grpcReplication]]. + * + * Can optionally be composed with other Behavior factories, to get access to actor context or timers. + */ +@ApiMayChange +abstract class ReplicatedBehaviors[Command, Event, State] { + def setup(factory: JFunction[ReplicationContext, EventSourcedBehavior[Command, Event, State]]): Behavior[Command] +} diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/Replication.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/Replication.scala index b973f2586..007efd83f 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/Replication.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/Replication.scala @@ -5,6 +5,7 @@ package akka.projection.grpc.replication.javadsl import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior import akka.annotation.ApiMayChange import akka.annotation.DoNotInherit import akka.cluster.sharding.typed.ReplicatedEntity @@ -17,7 +18,6 @@ import akka.http.javadsl.model.HttpResponse import akka.japi.function.{ Function => JFunction } import akka.persistence.typed.ReplicationId import akka.persistence.typed.internal.ReplicationContextImpl -import akka.persistence.typed.javadsl.EventSourcedBehavior import akka.persistence.typed.javadsl.ReplicationContext import akka.persistence.typed.scaladsl.ReplicatedEventSourcing import akka.projection.grpc.producer.javadsl.EventProducer @@ -74,7 +74,7 @@ object Replication { */ def grpcReplication[Command, Event, State]( settings: ReplicationSettings[Command], - replicatedBehaviorFactory: JFunction[ReplicationContext, EventSourcedBehavior[Command, Event, State]], + replicatedBehaviorFactory: JFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]], system: ActorSystem[_]): Replication[Command] = { val scalaReplicationSettings = settings.toScala @@ -88,15 +88,17 @@ object Replication { settings.entityTypeKey, { entityContext: EntityContext[Command] => val replicationId = ReplicationId(entityContext.getEntityTypeKey.name, entityContext.getEntityId, settings.selfReplicaId) - ReplicatedEventSourcing.externalReplication( - replicationId, - scalaReplicationSettings.otherReplicas.map(_.replicaId) + settings.selfReplicaId)( - replicationContext => - replicatedBehaviorFactory - .apply(replicationContext.asInstanceOf[ReplicationContext]) - .createEventSourcedBehavior() - // MEH - .withReplication(replicationContext.asInstanceOf[ReplicationContextImpl])) + replicatedBehaviorFactory.apply( + factory => + ReplicatedEventSourcing.externalReplication( + replicationId, + scalaReplicationSettings.otherReplicas.map(_.replicaId) + settings.selfReplicaId)( + replicationContext => + factory + .apply(replicationContext.asInstanceOf[ReplicationContext]) + .createEventSourcedBehavior() + // MEH + .withReplication(replicationContext.asInstanceOf[ReplicationContextImpl]))) })) .toScala) diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/ReplicatedBehaviors.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/ReplicatedBehaviors.scala new file mode 100644 index 000000000..657a3da6e --- /dev/null +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/ReplicatedBehaviors.scala @@ -0,0 +1,22 @@ +/* + * Copyright (C) 2009-2022 Lightbend Inc. + */ + +package akka.projection.grpc.replication.scaladsl + +import akka.actor.typed.Behavior +import akka.annotation.ApiMayChange +import akka.persistence.typed.scaladsl.EventSourcedBehavior +import akka.persistence.typed.scaladsl.ReplicationContext + +/** + * Dynamically provides factory methods for creating replicated event sourced behaviors. + * + * Must be used to create an event sourced behavior to be replicated with [[Replication.grpcReplication]]. + * + * Can optionally be composed with other Behavior factories, to get access to actor context or timers. + */ +@ApiMayChange +abstract class ReplicatedBehaviors[Command, Event, State] { + def setup(factory: ReplicationContext => EventSourcedBehavior[Command, Event, State]): Behavior[Command] +} diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/Replication.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/Replication.scala index 7760c1165..456896288 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/Replication.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/Replication.scala @@ -5,6 +5,7 @@ package akka.projection.grpc.replication.scaladsl import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior import akka.annotation.ApiMayChange import akka.annotation.DoNotInherit import akka.cluster.sharding.typed.ReplicatedEntity @@ -14,9 +15,7 @@ import akka.cluster.sharding.typed.scaladsl.EntityTypeKey import akka.http.scaladsl.model.HttpRequest import akka.http.scaladsl.model.HttpResponse import akka.persistence.typed.ReplicationId -import akka.persistence.typed.scaladsl.EventSourcedBehavior import akka.persistence.typed.scaladsl.ReplicatedEventSourcing -import akka.persistence.typed.scaladsl.ReplicationContext import akka.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource import akka.projection.grpc.replication.internal.ReplicationImpl @@ -69,7 +68,7 @@ object Replication { * Important: Note that this does not publish the endpoint, additional steps are needed! */ def grpcReplication[Command, Event, State](settings: ReplicationSettings[Command])( - replicatedBehaviorFactory: ReplicationContext => EventSourcedBehavior[Command, Event, State])( + replicatedBehaviorFactory: ReplicatedBehaviors[Command, Event, State] => Behavior[Command])( implicit system: ActorSystem[_]): Replication[Command] = { val replicatedEntity = @@ -78,9 +77,11 @@ object Replication { settings.configureEntity.apply(Entity(settings.entityTypeKey) { entityContext => val replicationId = ReplicationId(entityContext.entityTypeKey.name, entityContext.entityId, settings.selfReplicaId) - ReplicatedEventSourcing.externalReplication( - replicationId, - settings.otherReplicas.map(_.replicaId) + settings.selfReplicaId)(replicatedBehaviorFactory) + replicatedBehaviorFactory { factory => + ReplicatedEventSourcing.externalReplication( + replicationId, + settings.otherReplicas.map(_.replicaId) + settings.selfReplicaId)(factory) + } })) ReplicationImpl.grpcReplication[Command, Event, State](settings, replicatedEntity) diff --git a/akka-projection-grpc/src/test/java/akka/projection/grpc/replication/javdsl/ReplicationCompileTest.java b/akka-projection-grpc/src/test/java/akka/projection/grpc/replication/javdsl/ReplicationCompileTest.java index 1f0c9a2b7..e512f26b4 100644 --- a/akka-projection-grpc/src/test/java/akka/projection/grpc/replication/javdsl/ReplicationCompileTest.java +++ b/akka-projection-grpc/src/test/java/akka/projection/grpc/replication/javdsl/ReplicationCompileTest.java @@ -7,6 +7,7 @@ import akka.Done; import akka.NotUsed; import akka.actor.typed.ActorSystem; +import akka.actor.typed.Behavior; import akka.grpc.GrpcClientSettings; import akka.grpc.javadsl.ServiceHandler; import akka.http.javadsl.Http; @@ -17,12 +18,11 @@ import akka.persistence.query.Offset; import akka.persistence.query.typed.EventEnvelope; import akka.persistence.typed.ReplicaId; -import akka.persistence.typed.javadsl.ReplicatedEventSourcedBehavior; -import akka.persistence.typed.javadsl.ReplicationContext; import akka.projection.ProjectionContext; import akka.projection.ProjectionId; import akka.projection.grpc.producer.EventProducerSettings; import akka.projection.grpc.replication.javadsl.Replica; +import akka.projection.grpc.replication.javadsl.ReplicatedBehaviors; import akka.projection.grpc.replication.javadsl.Replication; import akka.projection.grpc.replication.javadsl.ReplicationProjectionProvider; import akka.projection.grpc.replication.javadsl.ReplicationSettings; @@ -40,11 +40,15 @@ public class ReplicationCompileTest { - interface MyCommand {} + interface MyCommand {} - static ReplicatedEventSourcedBehavior create(ReplicationContext context) { - throw new UnsupportedOperationException("just a dummy factory method"); - } + static Behavior create( + ReplicatedBehaviors replicatedBehaviors) { + return replicatedBehaviors.setup( + replicationContext -> { + throw new UnsupportedOperationException("just a dummy factory method"); + }); + } public static void start(ActorSystem system) { Set otherReplicas = new HashSet<>(); diff --git a/docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md b/docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md index f9955a906..3db8d5c91 100644 --- a/docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md +++ b/docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md @@ -29,13 +29,13 @@ For a basic overview of Replicated Event Sourcing see the @extref:[Akka Replicat Akka Replicated Event Sourcing over gRPC consists of the following three parts: -The Replicated Event Sourced Behavior is run in each replica as a sharded entity using @extref:[Akka Cluster Sharding](akka:typed/cluster-sharding.html). +* The Replicated Event Sourced Behavior is run in each replica as a sharded entity using @extref:[Akka Cluster + Sharding](akka:typed/cluster-sharding.html). -The events of each replica are published to the other replicas using @ref:[Akka Projection gRPC](grpc.md) endpoints. - -Each replica consumes a number of parallel slices of the events from each other replica by running Akka Projection gRPC -in @extref:[Akka Sharded Daemon Process](akka:typed/cluster-sharded-daemon-process.html). +* The events of each replica are published to the other replicas using @ref:[Akka Projection gRPC](grpc.md) endpoints. +* Each replica consumes a number of parallel slices of the events from each other replica by running Akka Projection + gRPC in @extref:[Akka Sharded Daemon Process](akka:typed/cluster-sharded-daemon-process.html). ## Dependencies @@ -46,6 +46,12 @@ The functionality is provided through the `akka-projection-grpc` module. To use the gRPC module of Akka Projections add the following dependency in your project: +@@dependency [sbt,Maven,Gradle] { + group=com.lightbend.akka + artifact=akka-projection-grpc_$scala.binary.version$ + version=$project.version$ +} + Akka Replicated Event Sourcing over gRPC requires Akka 2.8.0 or later and can only be run in an Akka cluster since it uses cluster components. It is currently only possible to use @extref:[akka-persistence-r2dbc](akka-persistence-r2dbc:projection.html) as the @@ -77,7 +83,23 @@ The table below shows `akka-projection-grpc`'s direct dependencies, and the seco @@dependencies{ projectId="akka-projection-grpc" } +## API and setup + +The same API as regular `EventSourcedBehavior`s is used to define the logic. See @extref:[Replicated Event Sourcing](akka:typed/replicated-eventsourcing.html) for more detail on designing an entity for replication. +To enable an entity for Replicated Event Sourcing over gRPC, use the @apidoc[Replication$] `grpcReplication` method, +which takes @apidoc[ReplicationSettings], a factory function for the behavior, and an actor system. + +The factory function will be passed a @apidoc[ReplicatedBehaviors] factory that must be used to set up the replicated +event sourced behavior. Its `setup` method provides a @apidoc[ReplicationContext] to create an `EventSourcedBehavior` +which will then be configured for replication. The behavior factory can be composed with other behavior factories, if +access to the actor context or timers are needed. + +Scala +: @@snip [ShoppingCart.scala](/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala) { #init } + +Java +: @@snip [ShoppingCart.java](/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java) { #init } ### Settings @@ -106,10 +128,22 @@ Binding the publisher is a manual step to allow arbitrary customization of the A with other HTTP and gRPC routes. When there is only a single replicated entity and no other usage of Akka gRPC Projections in an application a -convenience is provided through `createSingleServiceHandler` on @apidoc[akka.projection.grpc.replication.*.Replication] which -will create a single handler, this can then be bound: +convenience is provided through `createSingleServiceHandler` on @apidoc[akka.projection.grpc.replication.*.Replication] +which will create a single handler: -FIXME sample snippet +Scala +: @@snip [Main.scala](/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/Main.scala) { #single-service-handler } + +Java +: @@snip [Main.java](/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/Main.java) { #single-service-handler } + +This can then be bound: + +Scala +: @@snip [ShoppingCartServer.scala](/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServer.scala) { #bind } + +Java +: @@snip [ShoppingCartServer.java](/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCartServer.java) { #bind } When multiple producers exist, all instances of @apidoc[akka.projection.grpc.producer.EventProducerSettings] need to be passed at once to `EventProducer.grpcServiceHandler` to create a single producer service handling each of the event @@ -137,9 +171,12 @@ new versions of the application to the replicas. FIXME something more here - serialization can fail and stop the replication, but it could also silently lose data in new fields before the consuming side has a new version. -### Sample projects +## Sample projects + +Source code and build files for complete sample projects can be found in the `akka/akka-projection` GitHub repository. -FIXME should we have one? The shopping cart? +* [Replicated shopping cart service in Scala](https://github.com/akka/akka-projection/tree/main/samples/replicated/shopping-cart-service-scala) +* [Replicated shopping cart service in Java](https://github.com/akka/akka-projection/tree/main/samples/replicated/shopping-cart-service-java) ## Access control @@ -151,4 +188,4 @@ The consumer can pass metadata, such as auth headers, in each request to the pro Authentication and authorization for the producer can be done by implementing an @apidoc[EventProducerInterceptor] and passing it to the `grpcServiceHandler` method during producer bootstrap. The interceptor is invoked with the stream id and -gRPC request metadata for each incoming request and can return a suitable error through @apidoc[GrpcServiceException] \ No newline at end of file +gRPC request metadata for each incoming request and can return a suitable error through @apidoc[GrpcServiceException] diff --git a/docs/src/main/paradox/grpc.md b/docs/src/main/paradox/grpc.md index cccbec9df..e784bacc4 100644 --- a/docs/src/main/paradox/grpc.md +++ b/docs/src/main/paradox/grpc.md @@ -139,12 +139,12 @@ but it illustrates how to combine the `EventProducer` service with other gRPC se Source code and build files for complete sample projects can be found in `akka/akka-projection` GitHub repository. -Scala: +Java: * [Producer service: shopping-cart-service-java](https://github.com/akka/akka-projection/tree/main/samples/grpc/shopping-cart-service-java) * [Consumer service: shopping-analytics-service-java](https://github.com/akka/akka-projection/tree/main/samples/grpc/shopping-analytics-service-java) -Java: +Scala: * [Producer service: shopping-cart-service-scala](https://github.com/akka/akka-projection/tree/main/samples/grpc/shopping-cart-service-scala) * [Consumer service: shopping-analytics-service-scala](https://github.com/akka/akka-projection/tree/main/samples/grpc/shopping-analytics-service-scala) diff --git a/samples/replicated/shopping-cart-service-java/.gitignore b/samples/replicated/shopping-cart-service-java/.gitignore new file mode 100644 index 000000000..6751d0d28 --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/.gitignore @@ -0,0 +1,16 @@ +target/ +.bsp/ + +.settings +.project +.classpath + +.idea +*.iml + +.DS_Store + +.metals +.vscode +project/bloop +project/metals.sbt diff --git a/samples/replicated/shopping-cart-service-java/LICENSE b/samples/replicated/shopping-cart-service-java/LICENSE new file mode 100644 index 000000000..4239f09e0 --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/LICENSE @@ -0,0 +1,10 @@ +Akka sample by Lightbend + +Licensed under Public Domain (CC0) + +To the extent possible under law, the person who associated CC0 with +this Template has waived all copyright and related or neighboring +rights to this Template. + +You should have received a copy of the CC0 legalcode along with this +work. If not, see . diff --git a/samples/replicated/shopping-cart-service-java/README.md b/samples/replicated/shopping-cart-service-java/README.md new file mode 100644 index 000000000..aba95ba5b --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/README.md @@ -0,0 +1,79 @@ +## Running the sample code + +1. Start two local PostgresSQL servers, on ports 5101 and 5201. The included `docker-compose.yml` starts everything required for running locally. + + ```shell + docker-compose up -d + + # creates the tables needed for Akka Persistence + # as well as the offset store table for Akka Projection + docker exec -i postgres_db_1 psql -U postgres -t < ddl-scripts/create_tables.sql + docker exec -i postgres_db_2 psql -U postgres -t < ddl-scripts/create_tables.sql + ``` + +2. Make sure you have compiled the project + + ```shell + mvn compile + ``` + +3. Start a first node for each replica: + + ```shell + mvn compile exec:exec -DAPP_CONFIG=replica1-local1.conf + ``` + + ```shell + mvn compile exec:exec -DAPP_CONFIG=replica2-local1.conf + ``` + +4. (Optional) Start another node with different ports: + + ```shell + mvn compile exec:exec -DAPP_CONFIG=replica1-local2.conf + ``` + + ```shell + mvn compile exec:exec -DAPP_CONFIG=replica2-local2.conf + ``` + +5. (Optional) More can be started: + + ```shell + mvn compile exec:exec -DAPP_CONFIG=replica1-local3.conf + ``` + + ```shell + mvn compile exec:exec -DAPP_CONFIG=replica2-local3.conf + ``` + +6. Check for service readiness + + ```shell + curl http://localhost:9101/ready + ``` + + ```shell + curl http://localhost:9201/ready + ``` + +7. Try it with [grpcurl](https://github.com/fullstorydev/grpcurl): + + ```shell + # add item to cart on the first replica + grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":7}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem + + # get cart from first replica + grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetCart + + # get cart from second replica + grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8201 shoppingcart.ShoppingCartService.GetCart + + # update quantity of item on the second replica + grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":2}' -plaintext 127.0.0.1:8201 shoppingcart.ShoppingCartService.RemoveItem + + # check out cart + grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.Checkout + ``` + + or same `grpcurl` commands to port 8102/8202 to reach node 2. diff --git a/samples/replicated/shopping-cart-service-java/ddl-scripts/create_tables.sql b/samples/replicated/shopping-cart-service-java/ddl-scripts/create_tables.sql new file mode 100644 index 000000000..18496075d --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/ddl-scripts/create_tables.sql @@ -0,0 +1,94 @@ +CREATE TABLE IF NOT EXISTS event_journal( + slice INT NOT NULL, + entity_type VARCHAR(255) NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + seq_nr BIGINT NOT NULL, + db_timestamp timestamp with time zone NOT NULL, + + event_ser_id INTEGER NOT NULL, + event_ser_manifest VARCHAR(255) NOT NULL, + event_payload BYTEA NOT NULL, + + deleted BOOLEAN DEFAULT FALSE NOT NULL, + writer VARCHAR(255) NOT NULL, + adapter_manifest VARCHAR(255), + tags TEXT ARRAY, + + meta_ser_id INTEGER, + meta_ser_manifest VARCHAR(255), + meta_payload BYTEA, + + PRIMARY KEY(persistence_id, seq_nr) +); + +CREATE INDEX IF NOT EXISTS event_journal_slice_idx ON event_journal(slice, entity_type, db_timestamp, seq_nr); + +CREATE TABLE IF NOT EXISTS snapshot( + slice INT NOT NULL, + entity_type VARCHAR(255) NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + seq_nr BIGINT NOT NULL, + write_timestamp BIGINT NOT NULL, + ser_id INTEGER NOT NULL, + ser_manifest VARCHAR(255) NOT NULL, + snapshot BYTEA NOT NULL, + meta_ser_id INTEGER, + meta_ser_manifest VARCHAR(255), + meta_payload BYTEA, + + PRIMARY KEY(persistence_id) +); + +CREATE TABLE IF NOT EXISTS durable_state ( + slice INT NOT NULL, + entity_type VARCHAR(255) NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + revision BIGINT NOT NULL, + db_timestamp timestamp with time zone NOT NULL, + + state_ser_id INTEGER NOT NULL, + state_ser_manifest VARCHAR(255), + state_payload BYTEA NOT NULL, + tags TEXT ARRAY, + + PRIMARY KEY(persistence_id, revision) +); + +-- `durable_state_slice_idx` is only needed if the slice based queries are used +CREATE INDEX IF NOT EXISTS durable_state_slice_idx ON durable_state(slice, entity_type, db_timestamp, revision); + +-- Primitive offset types are stored in this table. +-- If only timestamp based offsets are used this table is optional. +-- Configure akka.projection.r2dbc.offset-store.offset-table="" if the table is not created. +CREATE TABLE IF NOT EXISTS akka_projection_offset_store ( + projection_name VARCHAR(255) NOT NULL, + projection_key VARCHAR(255) NOT NULL, + current_offset VARCHAR(255) NOT NULL, + manifest VARCHAR(32) NOT NULL, + mergeable BOOLEAN NOT NULL, + last_updated BIGINT NOT NULL, + PRIMARY KEY(projection_name, projection_key) +); + +-- Timestamp based offsets are stored in this table. +CREATE TABLE IF NOT EXISTS akka_projection_timestamp_offset_store ( + projection_name VARCHAR(255) NOT NULL, + projection_key VARCHAR(255) NOT NULL, + slice INT NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + seq_nr BIGINT NOT NULL, + -- timestamp_offset is the db_timestamp of the original event + timestamp_offset timestamp with time zone NOT NULL, + -- timestamp_consumed is when the offset was stored + -- the consumer lag is timestamp_consumed - timestamp_offset + timestamp_consumed timestamp with time zone NOT NULL, + PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr) +); + +CREATE TABLE IF NOT EXISTS akka_projection_management ( + projection_name VARCHAR(255) NOT NULL, + projection_key VARCHAR(255) NOT NULL, + paused BOOLEAN NOT NULL, + last_updated BIGINT NOT NULL, + PRIMARY KEY(projection_name, projection_key) +); \ No newline at end of file diff --git a/samples/replicated/shopping-cart-service-java/docker-compose.yml b/samples/replicated/shopping-cart-service-java/docker-compose.yml new file mode 100644 index 000000000..bf9c444bc --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/docker-compose.yml @@ -0,0 +1,18 @@ +version: '2.2' +services: + postgres-db-1: + image: postgres:latest + container_name: postgres_db_1 + ports: + - 5101:5432 + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + postgres-db-2: + image: postgres:latest + container_name: postgres_db_2 + ports: + - 5201:5432 + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres diff --git a/samples/replicated/shopping-cart-service-java/pom.xml b/samples/replicated/shopping-cart-service-java/pom.xml new file mode 100644 index 000000000..29f0ec770 --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/pom.xml @@ -0,0 +1,272 @@ + + + + 4.0.0 + shopping-cart-service + com.lightbend.akka.samples + 1.0 + + + + Public Domain (CC0) + http://creativecommons.org/publicdomain/zero/1.0/ + + + + + UTF-8 + 2.2.1 + 2.13 + + application.conf + + ${git.commit.time}-${git.commit.id.abbrev} + + 1.3.1 + 1.1.0-M4 + + + + + + + com.typesafe.akka + akka-bom_${scala.binary.version} + 2.8.0-M4 + pom + import + + + com.lightbend.akka + akka-dependencies_${scala.binary.version} + 22.10.4 + pom + import + + + + + + + com.typesafe.akka + akka-cluster-typed_${scala.binary.version} + + + com.typesafe.akka + akka-cluster-sharding-typed_${scala.binary.version} + + + com.typesafe.akka + akka-persistence-typed_${scala.binary.version} + + + com.typesafe.akka + akka-persistence-query_${scala.binary.version} + + + com.typesafe.akka + akka-serialization-jackson_${scala.binary.version} + + + com.typesafe.akka + akka-cluster-tools_${scala.binary.version} + + + com.lightbend.akka + akka-persistence-r2dbc_${scala.binary.version} + ${akka-persistence-r2dbc.version} + + + com.lightbend.akka + akka-projection-r2dbc_${scala.binary.version} + ${akka-persistence-r2dbc.version} + + + com.lightbend.akka + akka-projection-core_${scala.binary.version} + ${akka-projection.version} + + + com.lightbend.akka + akka-projection-grpc_${scala.binary.version} + ${akka-projection.version} + + + com.lightbend.akka + akka-projection-eventsourced_${scala.binary.version} + ${akka-projection.version} + + + com.lightbend.akka.management + akka-management-cluster-http_${scala.binary.version} + + + com.lightbend.akka.management + akka-management-cluster-bootstrap_${scala.binary.version} + + + com.lightbend.akka.grpc + akka-grpc-runtime_${scala.binary.version} + + + + ch.qos.logback + logback-classic + 1.2.11 + + + + com.typesafe.akka + akka-actor-testkit-typed_${scala.binary.version} + test + + + com.typesafe.akka + akka-persistence-testkit_${scala.binary.version} + test + + + com.lightbend.akka + akka-projection-testkit_${scala.binary.version} + ${akka-projection.version} + test + + + com.typesafe.akka + akka-stream-testkit_${scala.binary.version} + test + + + + junit + junit + 4.13.1 + test + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 11 + 11 + + -Xlint:unchecked + -Xlint:deprecation + -parameters + + + + + org.codehaus.mojo + exec-maven-plugin + 3.0.0 + + java + + -Djava.library.path=target/lib + -Dconfig.resource=${APP_CONFIG} + -classpath + + shopping.cart.Main + + + + + + com.lightbend.akka.grpc + akka-grpc-maven-plugin + ${akka-grpc-maven-plugin.version} + + Java + + + + + generate + + + + + + com.diffplug.spotless + spotless-maven-plugin + 2.4.1 + + + + 1.8 + + + + + + + pl.project13.maven + git-commit-id-plugin + 4.0.0 + + + validate + + revision + + + + + yyyyMMdd-HHmmss + + ${project.basedir}/.git + false + false + + + + io.fabric8 + docker-maven-plugin + 0.35.0 + + + + %a + + docker.io/library/adoptopenjdk:11-jre-hotspot + + ${version.number} + + + + java + -cp + /maven/* + shopping.cart.Main + + + + artifact-with-dependencies + + + + + + + + build-docker-image + package + + build + + + + + + + diff --git a/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/CborSerializable.java b/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/CborSerializable.java new file mode 100644 index 000000000..e07fe714c --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/CborSerializable.java @@ -0,0 +1,7 @@ +package shopping.cart; + +/** + * Marker trait for serialization with Jackson CBOR. Enabled in serialization.conf + * `akka.actor.serialization-bindings` (via application.conf). + */ +public interface CborSerializable {} diff --git a/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/Main.java b/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/Main.java new file mode 100644 index 000000000..5d1fd9ce2 --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/Main.java @@ -0,0 +1,50 @@ +package shopping.cart; + +import akka.actor.typed.ActorSystem; +import akka.actor.typed.javadsl.Behaviors; +import akka.http.javadsl.model.HttpRequest; +import akka.http.javadsl.model.HttpResponse; +import akka.japi.function.Function; +import akka.management.cluster.bootstrap.ClusterBootstrap; +import akka.management.javadsl.AkkaManagement; +import akka.projection.grpc.replication.javadsl.Replication; +import com.typesafe.config.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import shopping.cart.proto.ShoppingCartService; + +import java.util.concurrent.CompletionStage; + +public class Main { + + private static final Logger logger = LoggerFactory.getLogger(Main.class); + + public static void main(String[] args) { + ActorSystem system = ActorSystem.create(Behaviors.empty(), "ShoppingCartService"); + try { + init(system); + } catch (Exception e) { + logger.error("Terminating due to initialization failure.", e); + system.terminate(); + } + } + + public static void init(ActorSystem system) { + AkkaManagement.get(system).start(); + ClusterBootstrap.get(system).start(); + + // #single-service-handler + Replication replicatedShoppingCart = ShoppingCart.init(system); + Function> replicationService = + replicatedShoppingCart.createSingleServiceHandler(); + // #single-service-handler + + Config config = system.settings().config(); + String grpcInterface = config.getString("shopping-cart-service.grpc.interface"); + int grpcPort = config.getInt("shopping-cart-service.grpc.port"); + ShoppingCartService grpcService = + new ShoppingCartServiceImpl(system, replicatedShoppingCart.entityTypeKey()); + ShoppingCartServer.start(grpcInterface, grpcPort, system, grpcService, replicationService); + } + +} diff --git a/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java b/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java new file mode 100644 index 000000000..53345a6d3 --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java @@ -0,0 +1,395 @@ +package shopping.cart; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.ActorSystem; +import akka.actor.typed.Behavior; +import akka.actor.typed.SupervisorStrategy; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.pattern.StatusReply; +import akka.persistence.typed.ReplicaId; +import akka.persistence.typed.javadsl.*; +import akka.projection.grpc.replication.javadsl.ReplicatedBehaviors; +import akka.projection.grpc.replication.javadsl.Replication; +import akka.projection.grpc.replication.javadsl.ReplicationProjectionProvider; +import akka.projection.grpc.replication.javadsl.ReplicationSettings; +import akka.projection.r2dbc.javadsl.R2dbcProjection; +import com.fasterxml.jackson.annotation.JsonCreator; +import java.time.Duration; +import java.time.Instant; +import java.util.*; +import java.util.stream.Collectors; + +/** + * This is an event sourced actor (`EventSourcedBehavior`). An entity managed by Cluster Sharding. + * + *

It has a state, [[ShoppingCart.State]], which holds the current shopping cart items and + * whether it's checked out. + * + *

You interact with event sourced actors by sending commands to them, see classes implementing + * [[ShoppingCart.Command]]. + * + *

The command handler validates and translates commands to events, see classes implementing + * [[ShoppingCart.Event]]. It's the events that are persisted by the `EventSourcedBehavior`. The + * event handler updates the current state based on the event. This is done when the event is first + * created, and when the entity is loaded from the database - each event will be replayed to + * recreate the state of the entity. + * + *

This shopping cart is replicated using Replicated Event Sourcing. Multiple entity instances + * can be active at the same time, so the state must be convergent, and each cart item is modelled + * as a counter. When checking out the cart, only one of the replicas performs the actual checkout, + * once it's seen that all replicas have closed this cart which will be after all item updated + * events have been replicated. + */ +public final class ShoppingCart + extends EventSourcedBehaviorWithEnforcedReplies< + ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> { + + /** The current state held by the `EventSourcedBehavior`. */ + static final class State implements CborSerializable { + final Map items; + final Set closed; + private Optional checkedOut; + + public State() { + this(new HashMap<>(), new HashSet<>(), Optional.empty()); + } + + public State(Map items, Set closed, Optional checkedOut) { + this.items = items; + this.closed = closed; + this.checkedOut = checkedOut; + } + + public boolean isClosed() { + return !closed.isEmpty(); + } + + public State updateItem(String itemId, int quantity) { + items.put(itemId, items.getOrDefault(itemId, 0) + quantity); + return this; + } + + public State close(ReplicaId replica) { + closed.add(replica); + return this; + } + + public State checkout(Instant now) { + checkedOut = Optional.of(now); + return this; + } + + public Summary toSummary() { + return new Summary(items, isClosed()); + } + } + + /** This interface defines all the commands (messages) that the ShoppingCart actor supports. */ + interface Command extends CborSerializable {} + + /** + * A command to add an item to the cart. + * + *

It replies with `StatusReply<Summary>`, which is sent back to the caller when all the + * events emitted by this command are successfully persisted. + */ + public static final class AddItem implements Command { + final String itemId; + final int quantity; + final ActorRef> replyTo; + + public AddItem(String itemId, int quantity, ActorRef> replyTo) { + this.itemId = itemId; + this.quantity = quantity; + this.replyTo = replyTo; + } + } + + /** A command to remove an item from the cart. */ + public static final class RemoveItem implements Command { + final String itemId; + final int quantity; + final ActorRef> replyTo; + + public RemoveItem(String itemId, int quantity, ActorRef> replyTo) { + this.itemId = itemId; + this.quantity = quantity; + this.replyTo = replyTo; + } + } + + /** A command to check out the shopping cart. */ + public static final class Checkout implements Command { + final ActorRef> replyTo; + + @JsonCreator + public Checkout(ActorRef> replyTo) { + this.replyTo = replyTo; + } + } + + /** Internal command to close a shopping cart that's being checked out. */ + public enum CloseForCheckout implements Command { + INSTANCE + } + + /** Internal command to complete the checkout for a shopping cart. */ + public enum CompleteCheckout implements Command { + INSTANCE + } + + /** A command to get the current state of the shopping cart. */ + public static final class Get implements Command { + final ActorRef

replyTo; + + @JsonCreator + public Get(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + + /** Summary of the shopping cart state, used in reply messages. */ + public static final class Summary implements CborSerializable { + final Map items; + final boolean checkedOut; + + public Summary(Map items, boolean checkedOut) { + // defensive copy since items is a mutable object + this.items = new HashMap<>(items); + this.items.values().removeIf(count -> count <= 0); + this.checkedOut = checkedOut; + } + } + + abstract static class Event implements CborSerializable {} + + static final class ItemUpdated extends Event { + public final String itemId; + public final int quantity; + + public ItemUpdated(String itemId, int quantity) { + this.itemId = itemId; + this.quantity = quantity; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ItemUpdated other = (ItemUpdated) o; + + if (quantity != other.quantity) return false; + return itemId.equals(other.itemId); + } + + @Override + public int hashCode() { + int result = itemId.hashCode(); + result = 31 * result + quantity; + return result; + } + } + + static final class Closed extends Event { + final ReplicaId replica; + + @JsonCreator + public Closed(ReplicaId replica) { + this.replica = replica; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Closed that = (Closed) o; + return Objects.equals(replica, that.replica); + } + + @Override + public int hashCode() { + return Objects.hash(replica); + } + } + + static final class CheckedOut extends Event { + final Instant eventTime; + + @JsonCreator + public CheckedOut(Instant eventTime) { + this.eventTime = eventTime; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CheckedOut that = (CheckedOut) o; + return Objects.equals(eventTime, that.eventTime); + } + + @Override + public int hashCode() { + return Objects.hash(eventTime); + } + } + + // #init + public static Replication init(ActorSystem system) { + ReplicationProjectionProvider projectionProvider = + (projectionId, sourceProvider, replicationFlow, actorSystem) -> + R2dbcProjection.atLeastOnceFlow( + projectionId, Optional.empty(), sourceProvider, replicationFlow, actorSystem); + ReplicationSettings replicationSettings = + ReplicationSettings.create( + Command.class, "replicated-shopping-cart", projectionProvider, system); + return Replication.grpcReplication(replicationSettings, ShoppingCart::create, system); + } + + public static Behavior create( + ReplicatedBehaviors replicatedBehaviors) { + return Behaviors.setup( + context -> + replicatedBehaviors.setup( + replicationContext -> new ShoppingCart(context, replicationContext))); + } + // #init + + private final ActorContext context; + private final ReplicationContext replicationContext; + private final boolean isLeader; + + private ShoppingCart(ActorContext context, ReplicationContext replicationContext) { + super( + replicationContext.persistenceId(), + SupervisorStrategy.restartWithBackoff(Duration.ofMillis(200), Duration.ofSeconds(5), 0.1)); + this.context = context; + this.replicationContext = replicationContext; + this.isLeader = isShoppingCartLeader(replicationContext); + } + + // one replica is responsible for checking out the shopping cart, once all replicas have closed + private static boolean isShoppingCartLeader(ReplicationContext replicationContext) { + List orderedReplicas = + replicationContext.getAllReplicas().stream() + .sorted(Comparator.comparing(ReplicaId::id)) + .collect(Collectors.toList()); + int leaderIndex = Math.abs(replicationContext.entityId().hashCode() % orderedReplicas.size()); + return orderedReplicas.get(leaderIndex) == replicationContext.replicaId(); + } + + @Override + public RetentionCriteria retentionCriteria() { + return RetentionCriteria.snapshotEvery(100, 3); + } + + @Override + public State emptyState() { + return new State(); + } + + @Override + public CommandHandlerWithReply commandHandler() { + return openShoppingCart().orElse(closedShoppingCart()).orElse(getCommandHandler()).build(); + } + + private CommandHandlerWithReplyBuilderByState openShoppingCart() { + return newCommandHandlerWithReplyBuilder() + .forState(state -> !state.isClosed()) + .onCommand(AddItem.class, this::openOnAddItem) + .onCommand(RemoveItem.class, this::openOnRemoveItem) + .onCommand(Checkout.class, this::openOnCheckout); + } + + private ReplyEffect openOnAddItem(State state, AddItem cmd) { + return Effect() + .persist(new ItemUpdated(cmd.itemId, cmd.quantity)) + .thenReply(cmd.replyTo, updatedCart -> StatusReply.success(updatedCart.toSummary())); + } + + private ReplyEffect openOnRemoveItem(State state, RemoveItem cmd) { + return Effect() + .persist(new ItemUpdated(cmd.itemId, -cmd.quantity)) + .thenReply(cmd.replyTo, updatedCart -> StatusReply.success(updatedCart.toSummary())); + } + + private ReplyEffect openOnCheckout(State state, Checkout cmd) { + return Effect() + .persist(new Closed(replicationContext.replicaId())) + .thenReply(cmd.replyTo, updatedCart -> StatusReply.success(updatedCart.toSummary())); + } + + private CommandHandlerWithReplyBuilderByState closedShoppingCart() { + return newCommandHandlerWithReplyBuilder() + .forState(State::isClosed) + .onCommand(AddItem.class, this::closedOnAddItem) + .onCommand(RemoveItem.class, this::closedOnRemoveItem) + .onCommand(Checkout.class, this::closedOnCheckout) + .onCommand(CloseForCheckout.class, this::closedOnCloseForCheckout) + .onCommand(CompleteCheckout.class, this::closedOnCompleteCheckout); + } + + private ReplyEffect closedOnAddItem(State state, AddItem cmd) { + return Effect() + .reply( + cmd.replyTo, + StatusReply.error("Can't add an item to an already checked out shopping cart")); + } + + private ReplyEffect closedOnRemoveItem(State state, RemoveItem cmd) { + return Effect() + .reply( + cmd.replyTo, + StatusReply.error("Can't remove an item from an already checked out shopping cart")); + } + + private ReplyEffect closedOnCheckout(State state, Checkout cmd) { + return Effect() + .reply(cmd.replyTo, StatusReply.error("Can't checkout already checked out shopping cart")); + } + + private ReplyEffect closedOnCloseForCheckout(State state, CloseForCheckout cmd) { + return Effect().persist(new Closed(replicationContext.replicaId())).thenNoReply(); + } + + private ReplyEffect closedOnCompleteCheckout(State state, CompleteCheckout cmd) { + // TODO: trigger other effects from shopping cart checkout + return Effect().persist(new CheckedOut(Instant.now())).thenNoReply(); + } + + private CommandHandlerWithReplyBuilderByState getCommandHandler() { + return newCommandHandlerWithReplyBuilder() + .forAnyState() + .onCommand(Get.class, (state, cmd) -> Effect().reply(cmd.replyTo, state.toSummary())); + } + + @Override + public EventHandler eventHandler() { + return newEventHandlerBuilder() + .forAnyState() + .onEvent( + ItemUpdated.class, (state, event) -> state.updateItem(event.itemId, event.quantity)) + .onEvent( + Closed.class, + (state, event) -> { + State newState = state.close(event.replica); + eventTriggers(newState); + return newState; + }) + .onEvent(CheckedOut.class, (state, event) -> state.checkout(event.eventTime)) + .build(); + } + + private void eventTriggers(State state) { + if (!replicationContext.recoveryRunning()) { + if (!state.closed.contains(replicationContext.replicaId())) { + context.getSelf().tell(CloseForCheckout.INSTANCE); + } else if (isLeader) { + boolean allClosed = replicationContext.getAllReplicas().equals(state.closed); + if (allClosed) context.getSelf().tell(CompleteCheckout.INSTANCE); + } + } + } +} diff --git a/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCartServer.java b/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCartServer.java new file mode 100644 index 000000000..0f09ea7e0 --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCartServer.java @@ -0,0 +1,59 @@ +package shopping.cart; + +import akka.actor.typed.ActorSystem; +import akka.grpc.javadsl.ServerReflection; +import akka.grpc.javadsl.ServiceHandler; +import akka.http.javadsl.Http; +import akka.http.javadsl.ServerBinding; +import akka.http.javadsl.model.HttpRequest; +import akka.http.javadsl.model.HttpResponse; +import akka.japi.function.Function; +import java.net.InetSocketAddress; +import java.time.Duration; +import java.util.Collections; +import java.util.concurrent.CompletionStage; +import shopping.cart.proto.ShoppingCartService; +import shopping.cart.proto.ShoppingCartServiceHandlerFactory; + +public final class ShoppingCartServer { + + private ShoppingCartServer() {} + + static void start( + String host, + int port, + ActorSystem system, + ShoppingCartService grpcService, + Function> replicationService) { + @SuppressWarnings("unchecked") + // #bind + Function> service = + ServiceHandler.concatOrNotFound( + replicationService, + ShoppingCartServiceHandlerFactory.create(grpcService, system), + // ServerReflection enabled to support grpcurl without import-path and proto parameters + ServerReflection.create( + Collections.singletonList(ShoppingCartService.description), system)); + + CompletionStage bound = + Http.get(system).newServerAt(host, port).bind(service::apply); + // #bind + + bound.whenComplete( + (binding, ex) -> { + if (binding != null) { + binding.addToCoordinatedShutdown(Duration.ofSeconds(3), system); + InetSocketAddress address = binding.localAddress(); + system + .log() + .info( + "Shopping online at gRPC server {}:{}", + address.getHostString(), + address.getPort()); + } else { + system.log().error("Failed to bind gRPC endpoint, terminating system", ex); + system.terminate(); + } + }); + } +} diff --git a/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCartServiceImpl.java b/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCartServiceImpl.java new file mode 100644 index 000000000..d3dbf50c3 --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCartServiceImpl.java @@ -0,0 +1,120 @@ +package shopping.cart; + +import akka.actor.typed.ActorSystem; +import akka.cluster.sharding.typed.javadsl.ClusterSharding; +import akka.cluster.sharding.typed.javadsl.EntityRef; +import akka.cluster.sharding.typed.javadsl.EntityTypeKey; +import akka.grpc.GrpcServiceException; +import io.grpc.Status; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import shopping.cart.proto.AddItemRequest; +import shopping.cart.proto.Cart; +import shopping.cart.proto.CheckoutRequest; +import shopping.cart.proto.GetCartRequest; +import shopping.cart.proto.Item; +import shopping.cart.proto.RemoveItemRequest; +import shopping.cart.proto.ShoppingCartService; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +public final class ShoppingCartServiceImpl implements ShoppingCartService { + + private final Logger logger = LoggerFactory.getLogger(getClass()); + + private final EntityTypeKey entityKey; + private final Duration timeout; + private final ClusterSharding sharding; + + public ShoppingCartServiceImpl( + ActorSystem system, EntityTypeKey entityKey) { + + this.entityKey = entityKey; + timeout = system.settings().config().getDuration("shopping-cart-service.ask-timeout"); + sharding = ClusterSharding.get(system); + } + + @Override + public CompletionStage addItem(AddItemRequest in) { + logger.info( + "addItem {} quantity {} to cart {}", in.getItemId(), in.getQuantity(), in.getCartId()); + EntityRef entityRef = sharding.entityRefFor(entityKey, in.getCartId()); + CompletionStage reply = + entityRef.askWithStatus( + replyTo -> new ShoppingCart.AddItem(in.getItemId(), in.getQuantity(), replyTo), + timeout); + CompletionStage cart = reply.thenApply(ShoppingCartServiceImpl::toProtoCart); + return convertError(cart); + } + + @Override + public CompletionStage removeItem(RemoveItemRequest in) { + logger.info( + "removeItem {} quantity {} from cart {}", in.getItemId(), in.getQuantity(), in.getCartId()); + EntityRef entityRef = sharding.entityRefFor(entityKey, in.getCartId()); + CompletionStage reply = + entityRef.askWithStatus( + replyTo -> new ShoppingCart.RemoveItem(in.getItemId(), in.getQuantity(), replyTo), + timeout); + CompletionStage cart = reply.thenApply(ShoppingCartServiceImpl::toProtoCart); + return convertError(cart); + } + + @Override + public CompletionStage checkout(CheckoutRequest in) { + logger.info("checkout {}", in.getCartId()); + EntityRef entityRef = sharding.entityRefFor(entityKey, in.getCartId()); + CompletionStage reply = + entityRef.askWithStatus(replyTo -> new ShoppingCart.Checkout(replyTo), timeout); + CompletionStage cart = reply.thenApply(ShoppingCartServiceImpl::toProtoCart); + return convertError(cart); + } + + @Override + public CompletionStage getCart(GetCartRequest in) { + logger.info("getCart {}", in.getCartId()); + EntityRef entityRef = sharding.entityRefFor(entityKey, in.getCartId()); + CompletionStage reply = + entityRef.ask(replyTo -> new ShoppingCart.Get(replyTo), timeout); + CompletionStage protoCart = + reply.thenApply( + cart -> { + if (cart.items.isEmpty()) + throw new GrpcServiceException( + Status.NOT_FOUND.withDescription("Cart " + in.getCartId() + " not found")); + else return toProtoCart(cart); + }); + return convertError(protoCart); + } + + private static Cart toProtoCart(ShoppingCart.Summary cart) { + List protoItems = + cart.items.entrySet().stream() + .map( + entry -> + Item.newBuilder() + .setItemId(entry.getKey()) + .setQuantity(entry.getValue()) + .build()) + .collect(Collectors.toList()); + + return Cart.newBuilder().setCheckedOut(cart.checkedOut).addAllItems(protoItems).build(); + } + + private static CompletionStage convertError(CompletionStage response) { + return response.exceptionally( + ex -> { + if (ex instanceof TimeoutException) { + throw new GrpcServiceException( + Status.UNAVAILABLE.withDescription("Operation timed out")); + } else { + throw new GrpcServiceException( + Status.INVALID_ARGUMENT.withDescription(ex.getMessage())); + } + }); + } +} diff --git a/samples/replicated/shopping-cart-service-java/src/main/protobuf/ShoppingCartService.proto b/samples/replicated/shopping-cart-service-java/src/main/protobuf/ShoppingCartService.proto new file mode 100644 index 000000000..65eed87bd --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/src/main/protobuf/ShoppingCartService.proto @@ -0,0 +1,45 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "shopping.cart.proto"; + +package shoppingcart; + +// gRPC definition for ShoppingCartService + +service ShoppingCartService { + rpc AddItem (AddItemRequest) returns (Cart) {} + rpc RemoveItem (RemoveItemRequest) returns (Cart) {} + rpc Checkout (CheckoutRequest) returns (Cart) {} + rpc GetCart (GetCartRequest) returns (Cart) {} +} + +message AddItemRequest { + string cartId = 1; + string itemId = 2; + int32 quantity = 3; +} + +message RemoveItemRequest { + string cartId = 1; + string itemId = 2; + int32 quantity = 3; +} + +message CheckoutRequest { + string cartId = 1; +} + +message GetCartRequest { + string cartId = 1; +} + +message Cart { + repeated Item items = 1; + bool checkedOut = 2; +} + +message Item { + string itemId = 1; + int32 quantity = 2; +} diff --git a/samples/replicated/shopping-cart-service-java/src/main/resources/application.conf b/samples/replicated/shopping-cart-service-java/src/main/resources/application.conf new file mode 100644 index 000000000..3fa260404 --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/src/main/resources/application.conf @@ -0,0 +1,12 @@ +include "cluster" +include "grpc" +include "serialization" +include "persistence" + +akka { + loglevel = DEBUG +} + +shopping-cart-service { + ask-timeout = 5 s +} diff --git a/samples/replicated/shopping-cart-service-java/src/main/resources/cluster.conf b/samples/replicated/shopping-cart-service-java/src/main/resources/cluster.conf new file mode 100644 index 000000000..4cc1f8559 --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/src/main/resources/cluster.conf @@ -0,0 +1,27 @@ +akka { + actor.provider = cluster + + remote.artery { + canonical.port = 2551 + } + + cluster { + downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider" + + shutdown-after-unsuccessful-join-seed-nodes = 120s + + sharding { + least-shard-allocation-strategy.rebalance-absolute-limit = 20 + passivation.strategy = default-strategy + } + } +} + +akka.management { + http { + port = 8558 + port = ${?HTTP_MGMT_PORT} + } + # actual bootstrap/discovery config would go here + # for running locally see local-shared.conf +} diff --git a/samples/replicated/shopping-cart-service-java/src/main/resources/grpc.conf b/samples/replicated/shopping-cart-service-java/src/main/resources/grpc.conf new file mode 100644 index 000000000..15b34d629 --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/src/main/resources/grpc.conf @@ -0,0 +1,11 @@ +akka.http.server.preview.enable-http2 = on + +shopping-cart-service { + + grpc { + # consider setting this to a specific interface for your environment + interface = "0.0.0.0" + port = 8101 + port = ${?GRPC_PORT} + } +} diff --git a/samples/replicated/shopping-cart-service-java/src/main/resources/local-shared.conf b/samples/replicated/shopping-cart-service-java/src/main/resources/local-shared.conf new file mode 100644 index 000000000..307c8751d --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/src/main/resources/local-shared.conf @@ -0,0 +1,18 @@ +include "replication" + +shopping-cart-service.grpc.interface = "127.0.0.1" +akka.remote.artery.canonical.hostname = "127.0.0.1" +akka.management.http.hostname = "127.0.0.1" + +akka.management.cluster.bootstrap.contact-point-discovery { + service-name = "shopping-cart-service" + discovery-method = config + # boostrap filters ports with the same IP assuming they are previous instances running on the same node + # unless a port is specified + port-name = "management" + required-contact-point-nr = 1 + # config service discovery never changes + stable-margin = 1 ms + # bootstrap without all the nodes being up + contact-with-all-contact-points = false +} diff --git a/samples/replicated/shopping-cart-service-java/src/main/resources/logback.xml b/samples/replicated/shopping-cart-service-java/src/main/resources/logback.xml new file mode 100644 index 000000000..f4e889fcd --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/src/main/resources/logback.xml @@ -0,0 +1,19 @@ + + + + + [%date{ISO8601}] [%level] [%logger] [%X{akkaAddress}] [%marker] [%thread] - %msg%n + + + + + 8192 + true + + + + + + + + diff --git a/samples/replicated/shopping-cart-service-java/src/main/resources/persistence.conf b/samples/replicated/shopping-cart-service-java/src/main/resources/persistence.conf new file mode 100644 index 000000000..c8e4a6176 --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/src/main/resources/persistence.conf @@ -0,0 +1,40 @@ +akka { + persistence { + journal { + plugin = "akka.persistence.r2dbc.journal" + publish-events = on + } + snapshot-store { + plugin = "akka.persistence.r2dbc.snapshot" + } + r2dbc { + # yugabyte or postgres + dialect = "postgres" + connection-factory { + driver = "postgres" + + host = "localhost" + port = 5432 + database = "postgres" + user = "postgres" + password = "postgres" + } + + # We trust that system time will not move backward for two subsequent persists from the same entity. + # See also https://github.com/yugabyte/yugabyte-db/issues/10996 + db-timestamp-monotonic-increasing = on + + # Workaround of https://github.com/yugabyte/yugabyte-db/issues/10995 + # FIXME: This property should be removed when the Yugabyte issue has been resolved. + # Note that query.behind-current-time can probably be decreased if we can use db time. + use-app-timestamp = on + } + } + + projection.r2dbc { + offset-store { + # only timestamp based offsets + offset-table = "" + } + } +} diff --git a/samples/replicated/shopping-cart-service-java/src/main/resources/replica1-local-shared.conf b/samples/replicated/shopping-cart-service-java/src/main/resources/replica1-local-shared.conf new file mode 100644 index 000000000..e68b99476 --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/src/main/resources/replica1-local-shared.conf @@ -0,0 +1,15 @@ +include "local-shared" + +replicated-shopping-cart.self-replica-id = replica1 + +akka.discovery.config.services { + "shopping-cart-service" { + endpoints = [ + {host = "127.0.0.1", port = 9101} + {host = "127.0.0.1", port = 9102} + {host = "127.0.0.1", port = 9103} + ] + } +} + +akka.persistence.r2dbc.connection-factory.port = 5101 diff --git a/samples/replicated/shopping-cart-service-java/src/main/resources/replica1-local1.conf b/samples/replicated/shopping-cart-service-java/src/main/resources/replica1-local1.conf new file mode 100644 index 000000000..a9538ce33 --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/src/main/resources/replica1-local1.conf @@ -0,0 +1,7 @@ +include "application" +include "replica1-local-shared" + +shopping-cart-service.grpc.port = 8101 + +akka.remote.artery.canonical.port = 7101 +akka.management.http.port = 9101 diff --git a/samples/replicated/shopping-cart-service-java/src/main/resources/replica1-local2.conf b/samples/replicated/shopping-cart-service-java/src/main/resources/replica1-local2.conf new file mode 100644 index 000000000..632abe7ea --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/src/main/resources/replica1-local2.conf @@ -0,0 +1,7 @@ +include "application" +include "replica1-local-shared" + +shopping-cart-service.grpc.port = 8102 + +akka.remote.artery.canonical.port = 7102 +akka.management.http.port = 9102 diff --git a/samples/replicated/shopping-cart-service-java/src/main/resources/replica1-local3.conf b/samples/replicated/shopping-cart-service-java/src/main/resources/replica1-local3.conf new file mode 100644 index 000000000..0692ea90f --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/src/main/resources/replica1-local3.conf @@ -0,0 +1,7 @@ +include "application" +include "replica1-local-shared" + +shopping-cart-service.grpc.port = 8103 + +akka.remote.artery.canonical.port = 7103 +akka.management.http.port = 9103 diff --git a/samples/replicated/shopping-cart-service-java/src/main/resources/replica2-local-shared.conf b/samples/replicated/shopping-cart-service-java/src/main/resources/replica2-local-shared.conf new file mode 100644 index 000000000..aa088205d --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/src/main/resources/replica2-local-shared.conf @@ -0,0 +1,15 @@ +include "local-shared" + +replicated-shopping-cart.self-replica-id = replica2 + +akka.discovery.config.services { + "shopping-cart-service" { + endpoints = [ + {host = "127.0.0.1", port = 9201} + {host = "127.0.0.1", port = 9202} + {host = "127.0.0.1", port = 9203} + ] + } +} + +akka.persistence.r2dbc.connection-factory.port = 5201 diff --git a/samples/replicated/shopping-cart-service-java/src/main/resources/replica2-local1.conf b/samples/replicated/shopping-cart-service-java/src/main/resources/replica2-local1.conf new file mode 100644 index 000000000..7f30c8695 --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/src/main/resources/replica2-local1.conf @@ -0,0 +1,7 @@ +include "application" +include "replica2-local-shared" + +shopping-cart-service.grpc.port = 8201 + +akka.remote.artery.canonical.port = 7201 +akka.management.http.port = 9201 diff --git a/samples/replicated/shopping-cart-service-java/src/main/resources/replica2-local2.conf b/samples/replicated/shopping-cart-service-java/src/main/resources/replica2-local2.conf new file mode 100644 index 000000000..1245f9ef3 --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/src/main/resources/replica2-local2.conf @@ -0,0 +1,7 @@ +include "application" +include "replica2-local-shared" + +shopping-cart-service.grpc.port = 8202 + +akka.remote.artery.canonical.port = 7202 +akka.management.http.port = 9202 diff --git a/samples/replicated/shopping-cart-service-java/src/main/resources/replica2-local3.conf b/samples/replicated/shopping-cart-service-java/src/main/resources/replica2-local3.conf new file mode 100644 index 000000000..3d10a7d9a --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/src/main/resources/replica2-local3.conf @@ -0,0 +1,7 @@ +include "application" +include "replica2-local-shared" + +shopping-cart-service.grpc.port = 8203 + +akka.remote.artery.canonical.port = 7203 +akka.management.http.port = 9203 diff --git a/samples/replicated/shopping-cart-service-java/src/main/resources/replication.conf b/samples/replicated/shopping-cart-service-java/src/main/resources/replication.conf new file mode 100644 index 000000000..a184456ed --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/src/main/resources/replication.conf @@ -0,0 +1,30 @@ +akka.projection.grpc { + producer { + query-plugin-id = "akka.persistence.r2dbc.query" + } +} + +replicated-shopping-cart { + entity-event-replication-timeout = 10s + parallel-updates = 8 + replicas: [ + { + replica-id = "replica1" + number-of-consumers = 4 + grpc.client { + host = "localhost" + port = 8101 + use-tls = off + } + }, + { + replica-id = "replica2" + number-of-consumers = 4 + grpc.client { + host = "localhost" + port = 8201 + use-tls = off + } + } + ] +} diff --git a/samples/replicated/shopping-cart-service-java/src/main/resources/serialization.conf b/samples/replicated/shopping-cart-service-java/src/main/resources/serialization.conf new file mode 100644 index 000000000..7d153b9cd --- /dev/null +++ b/samples/replicated/shopping-cart-service-java/src/main/resources/serialization.conf @@ -0,0 +1,3 @@ +akka.actor.serialization-bindings { + "shopping.cart.CborSerializable" = jackson-cbor +} diff --git a/samples/replicated/shopping-cart-service-scala/.gitignore b/samples/replicated/shopping-cart-service-scala/.gitignore new file mode 100644 index 000000000..6751d0d28 --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/.gitignore @@ -0,0 +1,16 @@ +target/ +.bsp/ + +.settings +.project +.classpath + +.idea +*.iml + +.DS_Store + +.metals +.vscode +project/bloop +project/metals.sbt diff --git a/samples/replicated/shopping-cart-service-scala/.scalafmt.conf b/samples/replicated/shopping-cart-service-scala/.scalafmt.conf new file mode 100644 index 000000000..7cc46b226 --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/.scalafmt.conf @@ -0,0 +1,49 @@ +version = 3.0.3 + +style = defaultWithAlign + +docstrings.style = Asterisk +indentOperator.preset = spray +maxColumn = 120 +rewrite.rules = [RedundantParens, SortImports, AvoidInfix] +unindentTopLevelOperators = true +align.tokens = [{code = "=>", owner = "Case"}] +align.openParenDefnSite = false +align.openParenCallSite = false + +optIn.configStyleArguments = false +danglingParentheses.preset = false +spaces.inImportCurlyBraces = true +newlines.afterCurlyLambda = preserve + +rewrite.neverInfix.excludeFilters = [ + and + min + max + until + to + by + eq + ne + "should.*" + "contain.*" + "must.*" + in + ignore + be + taggedAs + thrownBy + synchronized + have + when + size + only + noneOf + oneElementOf + noElementsOf + atLeastOneElementOf + atMostOneElementOf + allElementsOf + inOrderElementsOf + theSameElementsAs +] diff --git a/samples/replicated/shopping-cart-service-scala/LICENSE b/samples/replicated/shopping-cart-service-scala/LICENSE new file mode 100644 index 000000000..4239f09e0 --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/LICENSE @@ -0,0 +1,10 @@ +Akka sample by Lightbend + +Licensed under Public Domain (CC0) + +To the extent possible under law, the person who associated CC0 with +this Template has waived all copyright and related or neighboring +rights to this Template. + +You should have received a copy of the CC0 legalcode along with this +work. If not, see . diff --git a/samples/replicated/shopping-cart-service-scala/README.md b/samples/replicated/shopping-cart-service-scala/README.md new file mode 100644 index 000000000..ac4ef7cf7 --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/README.md @@ -0,0 +1,73 @@ +## Running the sample code + +1. Start two local PostgresSQL servers, on ports 5101 and 5201. The included `docker-compose.yml` starts everything required for running locally. + + ```shell + docker-compose up -d + + # creates the tables needed for Akka Persistence + # as well as the offset store table for Akka Projection + docker exec -i postgres_db_1 psql -U postgres -t < ddl-scripts/create_tables.sql + docker exec -i postgres_db_2 psql -U postgres -t < ddl-scripts/create_tables.sql + ``` + +2. Start a first node for each replica: + + ```shell + sbt -Dconfig.resource=replica1-local1.conf run + ``` + + ```shell + sbt -Dconfig.resource=replica2-local1.conf run + ``` + +3. (Optional) Start another node with different ports: + + ```shell + sbt -Dconfig.resource=replica1-local2.conf run + ``` + + ```shell + sbt -Dconfig.resource=replica2-local2.conf run + ``` + +4. (Optional) More can be started: + + ```shell + sbt -Dconfig.resource=replica1-local3.conf run + ``` + + ```shell + sbt -Dconfig.resource=replica2-local3.conf run + ``` + +5. Check for service readiness + + ```shell + curl http://localhost:9101/ready + ``` + + ```shell + curl http://localhost:9201/ready + ``` + +6. Try it with [grpcurl](https://github.com/fullstorydev/grpcurl): + + ```shell + # add item to cart on the first replica + grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":7}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem + + # get cart from first replica + grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetCart + + # get cart from second replica + grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8201 shoppingcart.ShoppingCartService.GetCart + + # update quantity of item on the second replica + grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":2}' -plaintext 127.0.0.1:8201 shoppingcart.ShoppingCartService.RemoveItem + + # check out cart + grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.Checkout + ``` + + or same `grpcurl` commands to port 8102/8202 to reach node 2. diff --git a/samples/replicated/shopping-cart-service-scala/build.sbt b/samples/replicated/shopping-cart-service-scala/build.sbt new file mode 100644 index 000000000..8421f8f48 --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/build.sbt @@ -0,0 +1,75 @@ +name := "shopping-cart-service" + +organization := "com.lightbend.akka.samples" +organizationHomepage := Some(url("https://akka.io")) +licenses := Seq( + ("CC0", url("https://creativecommons.org/publicdomain/zero/1.0"))) + +scalaVersion := "2.13.10" + +Compile / scalacOptions ++= Seq( + "-release", "11", + "-deprecation", + "-feature", + "-unchecked", + "-Xlog-reflective-calls", + "-Xlint") +Compile / javacOptions ++= Seq("-Xlint:unchecked", "-Xlint:deprecation") + +Test / parallelExecution := false +Test / testOptions += Tests.Argument("-oDF") +Test / logBuffered := false + +run / fork := true +// pass along config selection to forked jvm +run / javaOptions ++= sys.props + .get("config.resource") + .fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res")) +Global / cancelable := false // ctrl-c + +val AkkaVersion = sys.props.getOrElse("akka.version", "2.8.0-M4") +val AkkaHttpVersion = "10.4.0" +val AkkaManagementVersion = "1.2.0" +val AkkaPersistenceR2dbcVersion = "1.0.1" +val AkkaProjectionVersion = + sys.props.getOrElse("akka-projection.version", "1.3.1") + +enablePlugins(AkkaGrpcPlugin) + +enablePlugins(JavaAppPackaging, DockerPlugin) +dockerBaseImage := "docker.io/library/adoptopenjdk:11-jre-hotspot" +dockerUsername := sys.props.get("docker.username") +dockerRepository := sys.props.get("docker.registry") +ThisBuild / dynverSeparator := "-" + +libraryDependencies ++= Seq( + // 1. Basic dependencies for a clustered application + "com.typesafe.akka" %% "akka-stream" % AkkaVersion, + "com.typesafe.akka" %% "akka-cluster-typed" % AkkaVersion, + "com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion, + "com.typesafe.akka" %% "akka-actor-testkit-typed" % AkkaVersion % Test, + "com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion % Test, + // Akka Management powers Health Checks and Akka Cluster Bootstrapping + "com.lightbend.akka.management" %% "akka-management" % AkkaManagementVersion, + "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion, + "com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion, + "com.lightbend.akka.management" %% "akka-management-cluster-http" % AkkaManagementVersion, + "com.lightbend.akka.management" %% "akka-management-cluster-bootstrap" % AkkaManagementVersion, + "com.typesafe.akka" %% "akka-discovery" % AkkaVersion, + // Common dependencies for logging and testing + "com.typesafe.akka" %% "akka-slf4j" % AkkaVersion, + "ch.qos.logback" % "logback-classic" % "1.2.11", + "org.scalatest" %% "scalatest" % "3.1.2" % Test, + // 2. Using gRPC and/or protobuf + "com.typesafe.akka" %% "akka-http2-support" % AkkaHttpVersion, + // 3. Using Akka Persistence + "com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion, + "com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion, + "com.lightbend.akka" %% "akka-persistence-r2dbc" % AkkaPersistenceR2dbcVersion, + "com.typesafe.akka" %% "akka-persistence-testkit" % AkkaVersion % Test, + // 4. Querying and publishing data from Akka Persistence + "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion, + "com.lightbend.akka" %% "akka-projection-r2dbc" % AkkaPersistenceR2dbcVersion, + "com.lightbend.akka" %% "akka-projection-grpc" % AkkaProjectionVersion, + "com.lightbend.akka" %% "akka-projection-eventsourced" % AkkaProjectionVersion, + "com.lightbend.akka" %% "akka-projection-testkit" % AkkaProjectionVersion % Test) diff --git a/samples/replicated/shopping-cart-service-scala/ddl-scripts/create_tables.sql b/samples/replicated/shopping-cart-service-scala/ddl-scripts/create_tables.sql new file mode 100644 index 000000000..18496075d --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/ddl-scripts/create_tables.sql @@ -0,0 +1,94 @@ +CREATE TABLE IF NOT EXISTS event_journal( + slice INT NOT NULL, + entity_type VARCHAR(255) NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + seq_nr BIGINT NOT NULL, + db_timestamp timestamp with time zone NOT NULL, + + event_ser_id INTEGER NOT NULL, + event_ser_manifest VARCHAR(255) NOT NULL, + event_payload BYTEA NOT NULL, + + deleted BOOLEAN DEFAULT FALSE NOT NULL, + writer VARCHAR(255) NOT NULL, + adapter_manifest VARCHAR(255), + tags TEXT ARRAY, + + meta_ser_id INTEGER, + meta_ser_manifest VARCHAR(255), + meta_payload BYTEA, + + PRIMARY KEY(persistence_id, seq_nr) +); + +CREATE INDEX IF NOT EXISTS event_journal_slice_idx ON event_journal(slice, entity_type, db_timestamp, seq_nr); + +CREATE TABLE IF NOT EXISTS snapshot( + slice INT NOT NULL, + entity_type VARCHAR(255) NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + seq_nr BIGINT NOT NULL, + write_timestamp BIGINT NOT NULL, + ser_id INTEGER NOT NULL, + ser_manifest VARCHAR(255) NOT NULL, + snapshot BYTEA NOT NULL, + meta_ser_id INTEGER, + meta_ser_manifest VARCHAR(255), + meta_payload BYTEA, + + PRIMARY KEY(persistence_id) +); + +CREATE TABLE IF NOT EXISTS durable_state ( + slice INT NOT NULL, + entity_type VARCHAR(255) NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + revision BIGINT NOT NULL, + db_timestamp timestamp with time zone NOT NULL, + + state_ser_id INTEGER NOT NULL, + state_ser_manifest VARCHAR(255), + state_payload BYTEA NOT NULL, + tags TEXT ARRAY, + + PRIMARY KEY(persistence_id, revision) +); + +-- `durable_state_slice_idx` is only needed if the slice based queries are used +CREATE INDEX IF NOT EXISTS durable_state_slice_idx ON durable_state(slice, entity_type, db_timestamp, revision); + +-- Primitive offset types are stored in this table. +-- If only timestamp based offsets are used this table is optional. +-- Configure akka.projection.r2dbc.offset-store.offset-table="" if the table is not created. +CREATE TABLE IF NOT EXISTS akka_projection_offset_store ( + projection_name VARCHAR(255) NOT NULL, + projection_key VARCHAR(255) NOT NULL, + current_offset VARCHAR(255) NOT NULL, + manifest VARCHAR(32) NOT NULL, + mergeable BOOLEAN NOT NULL, + last_updated BIGINT NOT NULL, + PRIMARY KEY(projection_name, projection_key) +); + +-- Timestamp based offsets are stored in this table. +CREATE TABLE IF NOT EXISTS akka_projection_timestamp_offset_store ( + projection_name VARCHAR(255) NOT NULL, + projection_key VARCHAR(255) NOT NULL, + slice INT NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + seq_nr BIGINT NOT NULL, + -- timestamp_offset is the db_timestamp of the original event + timestamp_offset timestamp with time zone NOT NULL, + -- timestamp_consumed is when the offset was stored + -- the consumer lag is timestamp_consumed - timestamp_offset + timestamp_consumed timestamp with time zone NOT NULL, + PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr) +); + +CREATE TABLE IF NOT EXISTS akka_projection_management ( + projection_name VARCHAR(255) NOT NULL, + projection_key VARCHAR(255) NOT NULL, + paused BOOLEAN NOT NULL, + last_updated BIGINT NOT NULL, + PRIMARY KEY(projection_name, projection_key) +); \ No newline at end of file diff --git a/samples/replicated/shopping-cart-service-scala/docker-compose.yml b/samples/replicated/shopping-cart-service-scala/docker-compose.yml new file mode 100644 index 000000000..bf9c444bc --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/docker-compose.yml @@ -0,0 +1,18 @@ +version: '2.2' +services: + postgres-db-1: + image: postgres:latest + container_name: postgres_db_1 + ports: + - 5101:5432 + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + postgres-db-2: + image: postgres:latest + container_name: postgres_db_2 + ports: + - 5201:5432 + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres diff --git a/samples/replicated/shopping-cart-service-scala/project/build.properties b/samples/replicated/shopping-cart-service-scala/project/build.properties new file mode 100644 index 000000000..563a014da --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.7.2 diff --git a/samples/replicated/shopping-cart-service-scala/project/plugins.sbt b/samples/replicated/shopping-cart-service-scala/project/plugins.sbt new file mode 100644 index 000000000..b80c63bee --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/project/plugins.sbt @@ -0,0 +1,4 @@ +addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "2.2.1") +addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2") +addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.8.1") +addSbtPlugin("com.dwijnand" % "sbt-dynver" % "4.1.1") diff --git a/samples/replicated/shopping-cart-service-scala/src/main/protobuf/ShoppingCartService.proto b/samples/replicated/shopping-cart-service-scala/src/main/protobuf/ShoppingCartService.proto new file mode 100644 index 000000000..65eed87bd --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/src/main/protobuf/ShoppingCartService.proto @@ -0,0 +1,45 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "shopping.cart.proto"; + +package shoppingcart; + +// gRPC definition for ShoppingCartService + +service ShoppingCartService { + rpc AddItem (AddItemRequest) returns (Cart) {} + rpc RemoveItem (RemoveItemRequest) returns (Cart) {} + rpc Checkout (CheckoutRequest) returns (Cart) {} + rpc GetCart (GetCartRequest) returns (Cart) {} +} + +message AddItemRequest { + string cartId = 1; + string itemId = 2; + int32 quantity = 3; +} + +message RemoveItemRequest { + string cartId = 1; + string itemId = 2; + int32 quantity = 3; +} + +message CheckoutRequest { + string cartId = 1; +} + +message GetCartRequest { + string cartId = 1; +} + +message Cart { + repeated Item items = 1; + bool checkedOut = 2; +} + +message Item { + string itemId = 1; + int32 quantity = 2; +} diff --git a/samples/replicated/shopping-cart-service-scala/src/main/resources/application.conf b/samples/replicated/shopping-cart-service-scala/src/main/resources/application.conf new file mode 100644 index 000000000..3fa260404 --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/src/main/resources/application.conf @@ -0,0 +1,12 @@ +include "cluster" +include "grpc" +include "serialization" +include "persistence" + +akka { + loglevel = DEBUG +} + +shopping-cart-service { + ask-timeout = 5 s +} diff --git a/samples/replicated/shopping-cart-service-scala/src/main/resources/cluster.conf b/samples/replicated/shopping-cart-service-scala/src/main/resources/cluster.conf new file mode 100644 index 000000000..4cc1f8559 --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/src/main/resources/cluster.conf @@ -0,0 +1,27 @@ +akka { + actor.provider = cluster + + remote.artery { + canonical.port = 2551 + } + + cluster { + downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider" + + shutdown-after-unsuccessful-join-seed-nodes = 120s + + sharding { + least-shard-allocation-strategy.rebalance-absolute-limit = 20 + passivation.strategy = default-strategy + } + } +} + +akka.management { + http { + port = 8558 + port = ${?HTTP_MGMT_PORT} + } + # actual bootstrap/discovery config would go here + # for running locally see local-shared.conf +} diff --git a/samples/replicated/shopping-cart-service-scala/src/main/resources/grpc.conf b/samples/replicated/shopping-cart-service-scala/src/main/resources/grpc.conf new file mode 100644 index 000000000..2f360bf20 --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/src/main/resources/grpc.conf @@ -0,0 +1,10 @@ +akka.http.server.preview.enable-http2 = on + +shopping-cart-service { + grpc { + # consider setting this to a specific interface for your environment + interface = "0.0.0.0" + port = 8101 + port = ${?GRPC_PORT} + } +} diff --git a/samples/replicated/shopping-cart-service-scala/src/main/resources/local-shared.conf b/samples/replicated/shopping-cart-service-scala/src/main/resources/local-shared.conf new file mode 100644 index 000000000..307c8751d --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/src/main/resources/local-shared.conf @@ -0,0 +1,18 @@ +include "replication" + +shopping-cart-service.grpc.interface = "127.0.0.1" +akka.remote.artery.canonical.hostname = "127.0.0.1" +akka.management.http.hostname = "127.0.0.1" + +akka.management.cluster.bootstrap.contact-point-discovery { + service-name = "shopping-cart-service" + discovery-method = config + # boostrap filters ports with the same IP assuming they are previous instances running on the same node + # unless a port is specified + port-name = "management" + required-contact-point-nr = 1 + # config service discovery never changes + stable-margin = 1 ms + # bootstrap without all the nodes being up + contact-with-all-contact-points = false +} diff --git a/samples/replicated/shopping-cart-service-scala/src/main/resources/logback.xml b/samples/replicated/shopping-cart-service-scala/src/main/resources/logback.xml new file mode 100644 index 000000000..f4e889fcd --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/src/main/resources/logback.xml @@ -0,0 +1,19 @@ + + + + + [%date{ISO8601}] [%level] [%logger] [%X{akkaAddress}] [%marker] [%thread] - %msg%n + + + + + 8192 + true + + + + + + + + diff --git a/samples/replicated/shopping-cart-service-scala/src/main/resources/persistence.conf b/samples/replicated/shopping-cart-service-scala/src/main/resources/persistence.conf new file mode 100644 index 000000000..299652f65 --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/src/main/resources/persistence.conf @@ -0,0 +1,39 @@ +akka { + persistence { + journal { + plugin = "akka.persistence.r2dbc.journal" + } + snapshot-store { + plugin = "akka.persistence.r2dbc.snapshot" + } + r2dbc { + # yugabyte or postgres + dialect = "postgres" + connection-factory { + driver = "postgres" + + host = "localhost" + port = 5432 + database = "postgres" + user = "postgres" + password = "postgres" + } + + # We trust that system time will not move backward for two subsequent persists from the same entity. + # See also https://github.com/yugabyte/yugabyte-db/issues/10996 + db-timestamp-monotonic-increasing = on + + # Workaround of https://github.com/yugabyte/yugabyte-db/issues/10995 + # FIXME: This property should be removed when the Yugabyte issue has been resolved. + # Note that query.behind-current-time can probably be decreased if we can use db time. + use-app-timestamp = on + } + } + + projection.r2dbc { + offset-store { + # only timestamp based offsets + offset-table = "" + } + } +} diff --git a/samples/replicated/shopping-cart-service-scala/src/main/resources/replica1-local-shared.conf b/samples/replicated/shopping-cart-service-scala/src/main/resources/replica1-local-shared.conf new file mode 100644 index 000000000..e68b99476 --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/src/main/resources/replica1-local-shared.conf @@ -0,0 +1,15 @@ +include "local-shared" + +replicated-shopping-cart.self-replica-id = replica1 + +akka.discovery.config.services { + "shopping-cart-service" { + endpoints = [ + {host = "127.0.0.1", port = 9101} + {host = "127.0.0.1", port = 9102} + {host = "127.0.0.1", port = 9103} + ] + } +} + +akka.persistence.r2dbc.connection-factory.port = 5101 diff --git a/samples/replicated/shopping-cart-service-scala/src/main/resources/replica1-local1.conf b/samples/replicated/shopping-cart-service-scala/src/main/resources/replica1-local1.conf new file mode 100644 index 000000000..a9538ce33 --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/src/main/resources/replica1-local1.conf @@ -0,0 +1,7 @@ +include "application" +include "replica1-local-shared" + +shopping-cart-service.grpc.port = 8101 + +akka.remote.artery.canonical.port = 7101 +akka.management.http.port = 9101 diff --git a/samples/replicated/shopping-cart-service-scala/src/main/resources/replica1-local2.conf b/samples/replicated/shopping-cart-service-scala/src/main/resources/replica1-local2.conf new file mode 100644 index 000000000..632abe7ea --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/src/main/resources/replica1-local2.conf @@ -0,0 +1,7 @@ +include "application" +include "replica1-local-shared" + +shopping-cart-service.grpc.port = 8102 + +akka.remote.artery.canonical.port = 7102 +akka.management.http.port = 9102 diff --git a/samples/replicated/shopping-cart-service-scala/src/main/resources/replica1-local3.conf b/samples/replicated/shopping-cart-service-scala/src/main/resources/replica1-local3.conf new file mode 100644 index 000000000..0692ea90f --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/src/main/resources/replica1-local3.conf @@ -0,0 +1,7 @@ +include "application" +include "replica1-local-shared" + +shopping-cart-service.grpc.port = 8103 + +akka.remote.artery.canonical.port = 7103 +akka.management.http.port = 9103 diff --git a/samples/replicated/shopping-cart-service-scala/src/main/resources/replica2-local-shared.conf b/samples/replicated/shopping-cart-service-scala/src/main/resources/replica2-local-shared.conf new file mode 100644 index 000000000..aa088205d --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/src/main/resources/replica2-local-shared.conf @@ -0,0 +1,15 @@ +include "local-shared" + +replicated-shopping-cart.self-replica-id = replica2 + +akka.discovery.config.services { + "shopping-cart-service" { + endpoints = [ + {host = "127.0.0.1", port = 9201} + {host = "127.0.0.1", port = 9202} + {host = "127.0.0.1", port = 9203} + ] + } +} + +akka.persistence.r2dbc.connection-factory.port = 5201 diff --git a/samples/replicated/shopping-cart-service-scala/src/main/resources/replica2-local1.conf b/samples/replicated/shopping-cart-service-scala/src/main/resources/replica2-local1.conf new file mode 100644 index 000000000..7f30c8695 --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/src/main/resources/replica2-local1.conf @@ -0,0 +1,7 @@ +include "application" +include "replica2-local-shared" + +shopping-cart-service.grpc.port = 8201 + +akka.remote.artery.canonical.port = 7201 +akka.management.http.port = 9201 diff --git a/samples/replicated/shopping-cart-service-scala/src/main/resources/replica2-local2.conf b/samples/replicated/shopping-cart-service-scala/src/main/resources/replica2-local2.conf new file mode 100644 index 000000000..1245f9ef3 --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/src/main/resources/replica2-local2.conf @@ -0,0 +1,7 @@ +include "application" +include "replica2-local-shared" + +shopping-cart-service.grpc.port = 8202 + +akka.remote.artery.canonical.port = 7202 +akka.management.http.port = 9202 diff --git a/samples/replicated/shopping-cart-service-scala/src/main/resources/replica2-local3.conf b/samples/replicated/shopping-cart-service-scala/src/main/resources/replica2-local3.conf new file mode 100644 index 000000000..3d10a7d9a --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/src/main/resources/replica2-local3.conf @@ -0,0 +1,7 @@ +include "application" +include "replica2-local-shared" + +shopping-cart-service.grpc.port = 8203 + +akka.remote.artery.canonical.port = 7203 +akka.management.http.port = 9203 diff --git a/samples/replicated/shopping-cart-service-scala/src/main/resources/replication.conf b/samples/replicated/shopping-cart-service-scala/src/main/resources/replication.conf new file mode 100644 index 000000000..a184456ed --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/src/main/resources/replication.conf @@ -0,0 +1,30 @@ +akka.projection.grpc { + producer { + query-plugin-id = "akka.persistence.r2dbc.query" + } +} + +replicated-shopping-cart { + entity-event-replication-timeout = 10s + parallel-updates = 8 + replicas: [ + { + replica-id = "replica1" + number-of-consumers = 4 + grpc.client { + host = "localhost" + port = 8101 + use-tls = off + } + }, + { + replica-id = "replica2" + number-of-consumers = 4 + grpc.client { + host = "localhost" + port = 8201 + use-tls = off + } + } + ] +} diff --git a/samples/replicated/shopping-cart-service-scala/src/main/resources/serialization.conf b/samples/replicated/shopping-cart-service-scala/src/main/resources/serialization.conf new file mode 100644 index 000000000..7d153b9cd --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/src/main/resources/serialization.conf @@ -0,0 +1,3 @@ +akka.actor.serialization-bindings { + "shopping.cart.CborSerializable" = jackson-cbor +} diff --git a/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/CborSerializable.scala b/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/CborSerializable.scala new file mode 100644 index 000000000..3c0d58e8b --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/CborSerializable.scala @@ -0,0 +1,8 @@ +package shopping.cart + +/** + * Marker trait for serialization with Jackson CBOR. + * + * Enabled in serialization.conf `akka.actor.serialization-bindings` (via application.conf). + */ +trait CborSerializable diff --git a/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/Main.scala b/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/Main.scala new file mode 100644 index 000000000..8f86915b1 --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/Main.scala @@ -0,0 +1,41 @@ +package shopping.cart + +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.ActorSystem +import akka.management.cluster.bootstrap.ClusterBootstrap +import akka.management.scaladsl.AkkaManagement +import org.slf4j.LoggerFactory +import scala.util.control.NonFatal + +object Main { + + val logger = LoggerFactory.getLogger("shopping.cart.Main") + + def main(args: Array[String]): Unit = { + val system = + ActorSystem[Nothing](Behaviors.empty, "ShoppingCartService") + try { + init(system) + } catch { + case NonFatal(e) => + logger.error("Terminating due to initialization failure.", e) + system.terminate() + } + } + + def init(system: ActorSystem[_]): Unit = { + AkkaManagement(system).start() + ClusterBootstrap(system).start() + + // #single-service-handler + val replicatedShoppingCart = ShoppingCart.init(system) + val replicationService = replicatedShoppingCart.createSingleServiceHandler() + // #single-service-handler + + val grpcInterface = system.settings.config.getString("shopping-cart-service.grpc.interface") + val grpcPort = system.settings.config.getInt("shopping-cart-service.grpc.port") + val grpcService = new ShoppingCartServiceImpl(system, replicatedShoppingCart.entityTypeKey) + ShoppingCartServer.start(grpcInterface, grpcPort, system, grpcService, replicationService) + } + +} diff --git a/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala b/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala new file mode 100644 index 000000000..1f1632de5 --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala @@ -0,0 +1,264 @@ +package shopping.cart + +import java.time.Instant + +import scala.concurrent.duration._ + +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior +import akka.actor.typed.SupervisorStrategy +import akka.actor.typed.scaladsl.ActorContext +import akka.actor.typed.scaladsl.Behaviors +import akka.pattern.StatusReply +import akka.persistence.typed.ReplicaId +import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.scaladsl.EventSourcedBehavior +import akka.persistence.typed.scaladsl.ReplicationContext +import akka.persistence.typed.scaladsl.ReplyEffect +import akka.persistence.typed.scaladsl.RetentionCriteria +import akka.projection.grpc.replication.scaladsl.ReplicatedBehaviors +import akka.projection.grpc.replication.scaladsl.Replication +import akka.projection.grpc.replication.scaladsl.ReplicationProjectionProvider +import akka.projection.grpc.replication.scaladsl.ReplicationSettings +import akka.projection.r2dbc.scaladsl.R2dbcProjection + +/** + * This is an event sourced actor (`EventSourcedBehavior`). An entity managed by Cluster Sharding. + * + * It has a state, [[ShoppingCart.State]], which holds the current shopping cart items and whether it's checked out. + * + * You interact with event sourced actors by sending commands to them, see classes implementing + * [[ShoppingCart.Command]]. + * + * The command handler validates and translates commands to events, see classes implementing [[ShoppingCart.Event]]. + * It's the events that are persisted by the `EventSourcedBehavior`. The event handler updates the current state based + * on the event. This is done when the event is first created, and when the entity is loaded from the database - each + * event will be replayed to recreate the state of the entity. + * + * This shopping cart is replicated using Replicated Event Sourcing. Multiple entity instances can be active at the same + * time, so the state must be convergent, and each cart item is modelled as a counter. When checking out the cart, only + * one of the replicas performs the actual checkout, once it's seen that all replicas have closed this cart which will + * be after all item updated events have been replicated. + */ +object ShoppingCart { + + /** + * The current state held by the `EventSourcedBehavior`. + */ + final case class State(items: Map[String, Int], closed: Set[ReplicaId], checkedOut: Option[Instant]) + extends CborSerializable { + + def isClosed: Boolean = + closed.nonEmpty + + def updateItem(itemId: String, quantity: Int): State = + copy(items = items + (itemId -> (items.getOrElse(itemId, 0) + quantity))) + + def close(replica: ReplicaId): State = + copy(closed = closed + replica) + + def checkout(now: Instant): State = + copy(checkedOut = Some(now)) + + def toSummary: Summary = { + val cartItems = items.collect { + case (id, quantity) if quantity > 0 => id -> quantity + } + Summary(cartItems, isClosed) + } + } + + object State { + val empty: State = State(items = Map.empty, closed = Set.empty, checkedOut = None) + } + + /** + * This interface defines all the commands (messages) that the ShoppingCart actor supports. + */ + sealed trait Command extends CborSerializable + + /** + * A command to add an item to the cart. + * + * It replies with `StatusReply[Summary]`, which is sent back to the caller when all the events emitted by this + * command are successfully persisted. + */ + final case class AddItem(itemId: String, quantity: Int, replyTo: ActorRef[StatusReply[Summary]]) extends Command + + /** + * A command to remove an item from the cart. + */ + final case class RemoveItem(itemId: String, quantity: Int, replyTo: ActorRef[StatusReply[Summary]]) extends Command + + /** + * A command to check out the shopping cart. + */ + final case class Checkout(replyTo: ActorRef[StatusReply[Summary]]) extends Command + + /** + * Internal command to close a shopping cart that's being checked out. + */ + case object CloseForCheckout extends Command + + /** + * Internal command to complete the checkout for a shopping cart. + */ + case object CompleteCheckout extends Command + + /** + * A command to get the current state of the shopping cart. + */ + final case class Get(replyTo: ActorRef[Summary]) extends Command + + /** + * Summary of the shopping cart state, used in reply messages. + */ + final case class Summary(items: Map[String, Int], checkedOut: Boolean) extends CborSerializable + + /** + * This interface defines all the events that the ShoppingCart supports. + */ + sealed trait Event extends CborSerializable + + final case class ItemUpdated(itemId: String, quantity: Int) extends Event + + final case class Closed(replica: ReplicaId) extends Event + + final case class CheckedOut(eventTime: Instant) extends Event + + // #init + def init(implicit system: ActorSystem[_]): Replication[Command] = { + val projectionProvider: ReplicationProjectionProvider = R2dbcProjection.atLeastOnceFlow(_, None, _, _)(_) + val replicationSettings = ReplicationSettings[Command]("replicated-shopping-cart", projectionProvider) + Replication.grpcReplication(replicationSettings)(ShoppingCart.apply) + } + + def apply(replicatedBehaviors: ReplicatedBehaviors[Command, Event, State]): Behavior[Command] = { + Behaviors.setup[Command] { context => + replicatedBehaviors.setup { replicationContext => + new ShoppingCart(context, replicationContext).behavior() + } + } + } + // #init +} + +class ShoppingCart(context: ActorContext[ShoppingCart.Command], replicationContext: ReplicationContext) { + import ShoppingCart._ + + // one of the replicas is responsible for checking out the shopping cart, once all replicas have closed + private val isLeader: Boolean = { + val orderedReplicas = replicationContext.allReplicas.toSeq.sortBy(_.id) + val leaderIndex = math.abs(replicationContext.entityId.hashCode % orderedReplicas.size) + orderedReplicas(leaderIndex) == replicationContext.replicaId + } + + def behavior(): EventSourcedBehavior[Command, Event, State] = { + EventSourcedBehavior + .withEnforcedReplies[Command, Event, State]( + persistenceId = replicationContext.persistenceId, + emptyState = State.empty, + commandHandler = handleCommand, + eventHandler = handleEvent) + .withRetention(RetentionCriteria + .snapshotEvery(numberOfEvents = 100, keepNSnapshots = 3)) + .onPersistFailure(SupervisorStrategy.restartWithBackoff(200.millis, 5.seconds, 0.1)) + } + + private def handleCommand(state: State, command: Command): ReplyEffect[Event, State] = { + // The shopping cart behavior changes if it's closed / checked out or not. + // The commands are handled differently for each case. + if (state.isClosed) + closedShoppingCart(state, command) + else + openShoppingCart(state, command) + } + + private def openShoppingCart(state: State, command: Command): ReplyEffect[Event, State] = { + command match { + case AddItem(itemId, quantity, replyTo) => + Effect + .persist(ItemUpdated(itemId, quantity)) + .thenReply(replyTo) { updatedCart => + StatusReply.Success(updatedCart.toSummary) + } + + case RemoveItem(itemId, quantity, replyTo) => + Effect + .persist(ItemUpdated(itemId, -quantity)) + .thenReply(replyTo) { updatedCart => + StatusReply.Success(updatedCart.toSummary) + } + + case Checkout(replyTo) => + Effect + .persist(Closed(replicationContext.replicaId)) + .thenReply(replyTo) { updatedCart => + StatusReply.Success(updatedCart.toSummary) + } + + case CloseForCheckout => + Effect + .persist(Closed(replicationContext.replicaId)) + .thenNoReply() + + case CompleteCheckout => + // only closed shopping carts should be completable + Effect.noReply + + case Get(replyTo) => + Effect.reply(replyTo)(state.toSummary) + } + } + + private def closedShoppingCart(state: State, command: Command): ReplyEffect[Event, State] = { + command match { + case Get(replyTo) => + Effect.reply(replyTo)(state.toSummary) + case cmd: AddItem => + Effect.reply(cmd.replyTo)(StatusReply.Error("Can't add an item to an already checked out shopping cart")) + case cmd: RemoveItem => + Effect.reply(cmd.replyTo)(StatusReply.Error("Can't remove an item from an already checked out shopping cart")) + case cmd: Checkout => + Effect.reply(cmd.replyTo)(StatusReply.Error("Can't checkout already checked out shopping cart")) + case CloseForCheckout => + Effect + .persist(Closed(replicationContext.replicaId)) + .thenNoReply() + case CompleteCheckout => + // TODO: trigger other effects from shopping cart checkout + Effect + .persist(CheckedOut(Instant.now())) + .thenNoReply() + } + } + + private def handleEvent(state: State, event: Event): State = { + val newState = event match { + case ItemUpdated(itemId, quantity) => + state.updateItem(itemId, quantity) + case Closed(replica) => + state.close(replica) + case CheckedOut(eventTime) => + state.checkout(eventTime) + } + eventTriggers(newState, event) + newState + } + + private def eventTriggers(state: State, event: Event): Unit = { + if (!replicationContext.recoveryRunning) { + event match { + case _: Closed => + if (!state.closed(replicationContext.replicaId)) { + context.self ! CloseForCheckout + } else if (isLeader) { + val allClosed = replicationContext.allReplicas.diff(state.closed).isEmpty + if (allClosed) context.self ! CompleteCheckout + } + case _ => + } + } + } +} diff --git a/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServer.scala b/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServer.scala new file mode 100644 index 000000000..94dea382c --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServer.scala @@ -0,0 +1,53 @@ +package shopping.cart + +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.Failure +import scala.util.Success + +import akka.actor.typed.ActorSystem +import akka.grpc.scaladsl.ServerReflection +import akka.grpc.scaladsl.ServiceHandler +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.HttpRequest +import akka.http.scaladsl.model.HttpResponse + +object ShoppingCartServer { + + def start( + interface: String, + port: Int, + system: ActorSystem[_], + grpcService: proto.ShoppingCartService, + replicationService: PartialFunction[HttpRequest, Future[HttpResponse]]): Unit = { + implicit val sys: ActorSystem[_] = system + implicit val ec: ExecutionContext = + system.executionContext + + // #bind + val service: HttpRequest => Future[HttpResponse] = + ServiceHandler.concatOrNotFound( + replicationService, + proto.ShoppingCartServiceHandler.partial(grpcService), + // ServerReflection enabled to support grpcurl without import-path and proto parameters + ServerReflection.partial(List(proto.ShoppingCartService))) + + val bound = + Http() + .newServerAt(interface, port) + .bind(service) + .map(_.addToCoordinatedShutdown(3.seconds)) + // #bind + + bound.onComplete { + case Success(binding) => + val address = binding.localAddress + system.log.info("Shopping online at gRPC server {}:{}", address.getHostString, address.getPort) + case Failure(ex) => + system.log.error("Failed to bind gRPC endpoint, terminating system", ex) + system.terminate() + } + } + +} diff --git a/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala b/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala new file mode 100644 index 000000000..9ad40ae39 --- /dev/null +++ b/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala @@ -0,0 +1,82 @@ +package shopping.cart + +import java.util.concurrent.TimeoutException +import scala.concurrent.Future +import akka.actor.typed.ActorSystem +import akka.cluster.sharding.typed.scaladsl.ClusterSharding +import akka.cluster.sharding.typed.scaladsl.EntityTypeKey +import akka.grpc.GrpcServiceException +import akka.util.Timeout +import io.grpc.Status +import org.slf4j.LoggerFactory + +class ShoppingCartServiceImpl(system: ActorSystem[_], entityKey: EntityTypeKey[ShoppingCart.Command]) + extends proto.ShoppingCartService { + + import system.executionContext + + private val logger = LoggerFactory.getLogger(getClass) + + implicit private val timeout: Timeout = + Timeout.create(system.settings.config.getDuration("shopping-cart-service.ask-timeout")) + + private val sharding = ClusterSharding(system) + + override def addItem(in: proto.AddItemRequest): Future[proto.Cart] = { + logger.info("addItem {} quantity {} to cart {}", in.itemId, in.quantity, in.cartId) + val entityRef = sharding.entityRefFor(entityKey, in.cartId) + val reply: Future[ShoppingCart.Summary] = + entityRef.askWithStatus(ShoppingCart.AddItem(in.itemId, in.quantity, _)) + val response = reply.map(cart => toProtoCart(cart)) + convertError(response) + } + + override def removeItem(in: proto.RemoveItemRequest): Future[proto.Cart] = { + logger.info("removeItem {} quantity {} from cart {}", in.itemId, in.quantity, in.cartId) + val entityRef = sharding.entityRefFor(entityKey, in.cartId) + val reply: Future[ShoppingCart.Summary] = + entityRef.askWithStatus(ShoppingCart.RemoveItem(in.itemId, in.quantity, _)) + val response = reply.map(cart => toProtoCart(cart)) + convertError(response) + } + + override def checkout(in: proto.CheckoutRequest): Future[proto.Cart] = { + logger.info("checkout {}", in.cartId) + val entityRef = sharding.entityRefFor(entityKey, in.cartId) + val reply: Future[ShoppingCart.Summary] = + entityRef.askWithStatus(ShoppingCart.Checkout) + val response = reply.map(cart => toProtoCart(cart)) + convertError(response) + } + + override def getCart(in: proto.GetCartRequest): Future[proto.Cart] = { + logger.info("getCart {}", in.cartId) + val entityRef = sharding.entityRefFor(entityKey, in.cartId) + val response = + entityRef.ask(ShoppingCart.Get).map { cart => + if (cart.items.isEmpty) + throw new GrpcServiceException(Status.NOT_FOUND.withDescription(s"Cart ${in.cartId} not found")) + else + toProtoCart(cart) + } + convertError(response) + } + + private def toProtoCart(cart: ShoppingCart.Summary): proto.Cart = { + proto.Cart( + cart.items.iterator.map { case (itemId, quantity) => + proto.Item(itemId, quantity) + }.toSeq, + cart.checkedOut) + } + + private def convertError[T](response: Future[T]): Future[T] = { + response.recoverWith { + case _: TimeoutException => + Future.failed(new GrpcServiceException(Status.UNAVAILABLE.withDescription("Operation timed out"))) + case exc => + Future.failed(new GrpcServiceException(Status.INVALID_ARGUMENT.withDescription(exc.getMessage))) + } + } + +}