diff --git a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala index c7b0fa06e..b17a882b1 100644 --- a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala +++ b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala @@ -4,143 +4,214 @@ package akka.projection.grpc.replication -import akka.actor.testkit.typed.scaladsl.ActorTestKit -import akka.actor.typed.ActorSystem -import akka.actor.typed.scaladsl.Behaviors +import scala.concurrent.duration._ + +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.projection.grpc.replication.javadsl.{ ReplicationProjectionProvider => JReplicationProjectionProvider } +import akka.projection.grpc.replication.javadsl.{ ReplicationSettings => JReplicationSettings } import akka.projection.grpc.replication.scaladsl.ReplicationProjectionProvider import akka.projection.grpc.replication.scaladsl.ReplicationSettings -import akka.projection.grpc.replication.javadsl.{ ReplicationSettings => JReplicationSettings } -import akka.projection.grpc.replication.javadsl.{ ReplicationProjectionProvider => JReplicationProjectionProvider } import com.typesafe.config.ConfigFactory import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpec +import org.scalatest.wordspec.AnyWordSpecLike + +object ReplicationSettingsSpec { + val config = ConfigFactory + .parseString(""" + my-replicated-entity { + # which of the replicas this node belongs to, should be the same + # across the nodes of each replica Akka cluster. + self-replica-id = dca + + # Pick it up from an environment variable to re-use the same config + # without changes across replicas + self-replica-id = ${?SELF_REPLICA} + + # max number of parallel in-flight (sent over sharding) entity updates + # per consumer/projection + parallel-updates = 8 + + # Fail the replication stream (and restart with backoff) if completing + # the write of a replicated event reaching the cluster takes more time + # than this. + entity-event-replication-timeout = 10s + + replicas: [ + { + # Unique identifier of the replica/datacenter, is stored in the events + # and cannot be changed after events have been persisted. + replica-id = "dca" + + # Number of replication streams/projections to start to consume events + # from this replica + number-of-consumers = 4 + + # Akka gRPC client config block for how to reach this replica + # from the other replicas, note that binding the server/publishing + # endpoint of each replica is done separately, in code. + grpc.client { + host = "dca.example.com" + port = 8443 + use-tls = true + } + }, + { + replica-id = "dcb" + number-of-consumers = 4 + # Optional - only run replication stream consumers for events from the + # remote replica on nodes with this role + consumers-on-cluster-role = dcb-consumer + grpc.client { + host = "dcb.example.com" + port = 8444 + } + }, + { + replica-id = "dcc" + number-of-consumers = 4 + grpc.client { + host = "dcc.example.com" + port = 8445 + } + } + ] + } + + // #config-replicated-shopping-cart + # Replication configuration for the ShoppingCart. Note that config `replicated-shopping-cart` + # is the same as the ShoppingCart entity type name. + replicated-shopping-cart { + # which of the replicas this node belongs to, should be the same + # across the nodes of each replica Akka cluster. + self-replica-id = us-east-1 + + # Pick it up from an environment variable to re-use the same config + # without changes across replicas + self-replica-id = ${?SELF_REPLICA} + + # max number of parallel in-flight (sent over sharding) entity updates + # per consumer/projection + parallel-updates = 8 + + # Fail the replication stream (and restart with backoff) if completing + # the write of a replicated event reaching the cluster takes more time + # than this. + entity-event-replication-timeout = 10s + + replicas: [ + { + # Unique identifier of the replica/datacenter, is stored in the events + # and cannot be changed after events have been persisted. + replica-id = "us-east-1" + + # Number of replication streams/projections to start to consume events + # from this replica + number-of-consumers = 4 + + # Akka gRPC client config block for how to reach this replica + # from the other replicas, note that binding the server/publishing + # endpoint of each replica is done separately, in code. + grpc.client { + host = "k8s-shopping-604179632a-148180922.us-east-2.elb.amazonaws.com" + host = ${?US_EAST_1_GRPC_HOST} + port = 443 + port = ${?US_EAST_1_GRPC_PORT} + use-tls = true + } + }, + { + replica-id = "eu-west-1" + number-of-consumers = 4 + # Optional - only run replication stream consumers for events from the + # remote replica on nodes with this role + consumers-on-cluster-role = replication-consumer + grpc.client { + host = "k8s-shopping-19708e1324-24617530ddc6d2cb.elb.eu-west-1.amazonaws.com" + host = ${?EU_WEST_1_GRPC_HOST} + port = 443 + port = ${?EU_WEST_1_GRPC_PORT} + } + } + ] + } + // #config-replicated-shopping-cart -import scala.concurrent.duration.DurationInt + """) + .resolve() +} -class ReplicationSettingsSpec extends AnyWordSpec with Matchers { +class ReplicationSettingsSpec + extends ScalaTestWithActorTestKit(ReplicationSettingsSpec.config) + with AnyWordSpecLike + with Matchers + with LogCapturing { trait MyCommand "The ReplicationSettings" should { - "Parse from config" in { - implicit val system: ActorSystem[Unit] = ActorSystem[Unit]( - Behaviors.empty[Unit], - "parse-test", - ConfigFactory.parseString(""" - // #config - my-replicated-entity { - # which of the replicas this node belongs to, should be the same - # across the nodes of each replica Akka cluster. - self-replica-id = dca - - # Pick it up from an environment variable to re-use the same config - # without changes across replicas - self-replica-id = ${?SELF_REPLICA} - - # max number of parallel in-flight (sent over sharding) entity updates - # per consumer/projection - parallel-updates = 8 - - # Fail the replication stream (and restart with backoff) if completing - # the write of a replicated event reaching the cluster takes more time - # than this. - entity-event-replication-timeout = 10s - - replicas: [ - { - # Unique identifier of the replica/datacenter, is stored in the events - # and cannot be changed after events have been persisted. - replica-id = "dca" - - # Number of replication streams/projections to start to consume events - # from this replica - number-of-consumers = 4 - - # Akka gRPC client config block for how to reach this replica - # from the other replicas, note that binding the server/publishing - # endpoint of each replica is done separately, in code. - grpc.client { - host = "dca.example.com" - port = 8443 - use-tls = true - } - }, - { - replica-id = "dcb" - number-of-consumers = 4 - # Optional - only run replication stream consumers for events from the - # remote replica on nodes with this role - consumers-on-cluster-role = dcb-consumer - grpc.client { - host = "dcb.example.com" - port = 8444 - } - }, - { - replica-id = "dcc" - number-of-consumers = 4 - grpc.client { - host = "dcc.example.com" - port = 8445 - } - } - ] - } - // #config - """).resolve()) - - try { - val settings = ReplicationSettings[MyCommand]( + "Parse from config with scaladsl" in { + val settings = ReplicationSettings[MyCommand]( + "my-replicated-entity", + // never actually used, just passed along + null: ReplicationProjectionProvider) + settings.streamId should ===("my-replicated-entity") + settings.entityEventReplicationTimeout should ===(10.seconds) + settings.selfReplicaId.id should ===("dca") + settings.otherReplicas.map(_.replicaId.id) should ===(Set("dcb", "dcc")) + settings.otherReplicas.forall(_.numberOfConsumers === 4) should ===(true) + settings.parallelUpdates should ===(8) + + val replicaB = settings.otherReplicas.find(_.replicaId.id == "dcb").get + replicaB.grpcClientSettings.defaultPort should ===(8444) + replicaB.grpcClientSettings.serviceName should ===("dcb.example.com") + replicaB.consumersOnClusterRole should ===(Some("dcb-consumer")) + } + + "Parse from config with javadsl" in { + val settings = ReplicationSettings[MyCommand]( + "my-replicated-entity", + // never actually used, just passed along + null: ReplicationProjectionProvider) + val javaSettings = JReplicationSettings + .create( + classOf[MyCommand], "my-replicated-entity", // never actually used, just passed along - null: ReplicationProjectionProvider) - settings.streamId should ===("my-replicated-entity") - settings.entityEventReplicationTimeout should ===(10.seconds) - settings.selfReplicaId.id should ===("dca") - settings.otherReplicas.map(_.replicaId.id) should ===(Set("dcb", "dcc")) - settings.otherReplicas.forall(_.numberOfConsumers === 4) should ===(true) - settings.parallelUpdates should ===(8) - - val replicaB = settings.otherReplicas.find(_.replicaId.id == "dcb").get - replicaB.grpcClientSettings.defaultPort should ===(8444) - replicaB.grpcClientSettings.serviceName should ===("dcb.example.com") - replicaB.consumersOnClusterRole should ===(Some("dcb-consumer")) - - // And Java DSL - val javaSettings = JReplicationSettings - .create( - classOf[MyCommand], - "my-replicated-entity", - // never actually used, just passed along - null: JReplicationProjectionProvider, - system) - .withEdgeReplication(true) - - val converted = javaSettings.toScala - converted.selfReplicaId should ===(settings.selfReplicaId) - converted.streamId should ===(settings.streamId) - converted.acceptEdgeReplication should ===(true) - - converted.otherReplicas.foreach { replica => - val scalaReplica = settings.otherReplicas.find(_.replicaId == replica.replicaId).get - replica.consumersOnClusterRole should ===(scalaReplica.consumersOnClusterRole) - replica.numberOfConsumers should ===(scalaReplica.numberOfConsumers) - // no equals on GrpcClientSettings - replica.grpcClientSettings.serviceName === (scalaReplica.grpcClientSettings.serviceName) - replica.grpcClientSettings.defaultPort === (scalaReplica.grpcClientSettings.defaultPort) - replica.grpcClientSettings.useTls === (scalaReplica.grpcClientSettings.useTls) - } + null: JReplicationProjectionProvider, + system) + .withEdgeReplication(true) + + val converted = javaSettings.toScala + converted.selfReplicaId should ===(settings.selfReplicaId) + converted.streamId should ===(settings.streamId) + converted.acceptEdgeReplication should ===(true) + + converted.otherReplicas.foreach { replica => + val scalaReplica = settings.otherReplicas.find(_.replicaId == replica.replicaId).get + replica.consumersOnClusterRole should ===(scalaReplica.consumersOnClusterRole) + replica.numberOfConsumers should ===(scalaReplica.numberOfConsumers) + // no equals on GrpcClientSettings + replica.grpcClientSettings.serviceName === (scalaReplica.grpcClientSettings.serviceName) + replica.grpcClientSettings.defaultPort === (scalaReplica.grpcClientSettings.defaultPort) + replica.grpcClientSettings.useTls === (scalaReplica.grpcClientSettings.useTls) + } - converted.entityEventReplicationTimeout should ===(settings.entityEventReplicationTimeout) - converted.entityTypeKey === (settings.entityTypeKey) - converted.eventProducerInterceptor === (settings.eventProducerInterceptor) - converted.projectionProvider === (settings.projectionProvider) - converted.parallelUpdates === (settings.parallelUpdates) + converted.entityEventReplicationTimeout should ===(settings.entityEventReplicationTimeout) + converted.entityTypeKey === (settings.entityTypeKey) + converted.eventProducerInterceptor === (settings.eventProducerInterceptor) + converted.projectionProvider === (settings.projectionProvider) + converted.parallelUpdates === (settings.parallelUpdates) + } - } finally { - ActorTestKit.shutdown(system) - } + "Parse from doc config" in { + val docSettings = ReplicationSettings[MyCommand]( + "replicated-shopping-cart", + // never actually used, just passed along + null: ReplicationProjectionProvider) + docSettings.streamId should ===("replicated-shopping-cart") } + } } diff --git a/docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md b/docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md index 911e391de..4833bdc4c 100644 --- a/docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md +++ b/docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md @@ -116,10 +116,10 @@ accept an entity name, a @apidoc[ReplicationProjectionProvider] and an actor sys is expected to have a top level entry with the entity name containing this structure: Scala -: @@snip [config](/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala) { #config } +: @@snip [config](/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala) { #config-replicated-shopping-cart } Java -: @@snip [config](/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala) { #config } +: @@snip [config](/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala) { #config-replicated-shopping-cart } The entries in the block refer to the local replica while `replicas` is a list of all replicas, including the node itself, with details about how to reach the replicas across the network. @@ -194,10 +194,23 @@ edge side, because there are no incoming connections. On the edge side you start with `Replication.grpcEdgeReplication`. +Scala +: @@snip [ShoppingCart.scala](/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala) { #init-edge } + +Java +: @@snip [ShoppingCart.java](/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java) { #init-edge } + On the cloud side you would start with `Replication.grpcReplication` as described above, but with the addition `withEdgeReplication(true)` in the @apidoc[ReplicationSettings] or enable `akka.projection.grpc.replication.accept-edge-replication` configuration. +Scala +: @@snip [ShoppingCart.scala](/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala) { #init-allow-edge } + +Java +: @@snip [ShoppingCart.java](/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java) { #init-allow-edge } + + ## Serialization of events The events are serialized for being passed over the wire using the same Akka serializer as configured for serializing diff --git a/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java b/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java index b8af8cd96..7ad153f34 100644 --- a/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java +++ b/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java @@ -10,6 +10,7 @@ import akka.persistence.query.typed.EventEnvelope; import akka.persistence.typed.ReplicaId; import akka.persistence.typed.javadsl.*; +import akka.projection.grpc.replication.javadsl.EdgeReplication; import akka.projection.grpc.replication.javadsl.ReplicatedBehaviors; import akka.projection.grpc.replication.javadsl.Replication; import akka.projection.grpc.replication.javadsl.ReplicationSettings; @@ -47,6 +48,8 @@ public final class ShoppingCart extends EventSourcedBehaviorWithEnforcedReplies< ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> { + public static final String ENTITY_TYPE = "replicated-shopping-cart"; + static final String SMALL_QUANTITY_TAG = "small"; static final String MEDIUM_QUANTITY_TAG = "medium"; static final String LARGE_QUANTITY_TAG = "large"; @@ -308,7 +311,7 @@ public static Replication init(ActorSystem system) { ReplicationSettings replicationSettings = ReplicationSettings.create( Command.class, - "replicated-shopping-cart", + ShoppingCart.ENTITY_TYPE, R2dbcReplication.create(system), system); return Replication.grpcReplication(replicationSettings, ShoppingCart::create, system); @@ -327,18 +330,17 @@ public static Behavior create( // Add at least a total quantity of 10 to the cart, smaller carts are excluded by the event filter. // #init-producerFilter public static Replication initWithProducerFilter(ActorSystem system) { + Predicate> producerFilter = + envelope -> envelope.getTags().contains(VIP_CUSTOMER_TAG); ReplicationSettings replicationSettings = ReplicationSettings.create( Command.class, - "replicated-shopping-cart", + ShoppingCart.ENTITY_TYPE, R2dbcReplication.create(system), - system); + system) + .withProducerFilter(producerFilter); - Predicate> producerFilter = envelope -> { - return envelope.getTags().contains(VIP_CUSTOMER_TAG); - }; - - return Replication.grpcReplication(replicationSettings, producerFilter, ShoppingCart::createWithProducerFilter, system); + return Replication.grpcReplication(replicationSettings, ShoppingCart::createWithProducerFilter, system); } public static Behavior createWithProducerFilter( @@ -354,6 +356,31 @@ public static Behavior createWithProducerFilter( } // #init-producerFilter + // #init-allow-edge + public static Replication initAllowEdge(ActorSystem system) { + ReplicationSettings replicationSettings = + ReplicationSettings.create( + Command.class, + ShoppingCart.ENTITY_TYPE, + R2dbcReplication.create(system), + system) + .withEdgeReplication(true); + return Replication.grpcReplication(replicationSettings, ShoppingCart::create, system); + } + // #init-allow-edge + + // #init-edge + public static EdgeReplication initEdge(ActorSystem system) { + ReplicationSettings replicationSettings = + ReplicationSettings.create( + Command.class, + ShoppingCart.ENTITY_TYPE, + R2dbcReplication.create(system), + system); + return Replication.grpcEdgeReplication(replicationSettings, ShoppingCart::create, system); + } + // #init-edge + private final ActorContext context; private final ReplicationContext replicationContext; private final boolean isLeader; diff --git a/samples/replicated/shopping-cart-service-java/src/main/resources/replication.conf b/samples/replicated/shopping-cart-service-java/src/main/resources/replication.conf index 125a34756..806175308 100644 --- a/samples/replicated/shopping-cart-service-java/src/main/resources/replication.conf +++ b/samples/replicated/shopping-cart-service-java/src/main/resources/replication.conf @@ -4,6 +4,8 @@ akka.projection.grpc { } } +# Replication configuration for the ShoppingCart. Note that config `replicated-shopping-cart` +# is the same as the ShoppingCart.ENTITY_TYPE. replicated-shopping-cart { self-replica-id = replica1 self-replica-id = ${?SELF_REPLICA_ID} diff --git a/samples/replicated/shopping-cart-service-scala/src/main/resources/replication.conf b/samples/replicated/shopping-cart-service-scala/src/main/resources/replication.conf index 125a34756..b9813995a 100644 --- a/samples/replicated/shopping-cart-service-scala/src/main/resources/replication.conf +++ b/samples/replicated/shopping-cart-service-scala/src/main/resources/replication.conf @@ -4,6 +4,8 @@ akka.projection.grpc { } } +# Replication configuration for the ShoppingCart. Note that config `replicated-shopping-cart` +# is the same as the ShoppingCart.EntityType. replicated-shopping-cart { self-replica-id = replica1 self-replica-id = ${?SELF_REPLICA_ID} diff --git a/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala b/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala index 6f5cccc26..42bc4cc9f 100644 --- a/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala +++ b/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala @@ -20,6 +20,7 @@ import akka.persistence.typed.scaladsl.ReplyEffect import akka.persistence.typed.scaladsl.RetentionCriteria import akka.projection.grpc.replication.scaladsl.ReplicatedBehaviors import akka.projection.grpc.replication.scaladsl.Replication +import akka.projection.grpc.replication.scaladsl.Replication.EdgeReplication import akka.projection.grpc.replication.scaladsl.ReplicationSettings import akka.projection.r2dbc.scaladsl.R2dbcReplication import akka.serialization.jackson.CborSerializable @@ -196,12 +197,13 @@ object ShoppingCart { // the replica they were created (but can be marked VIP at any point in time before being closed) // #init-producerFilter def initWithProducerFilter(implicit system: ActorSystem[_]): Replication[Command] = { - val replicationSettings = ReplicationSettings[Command](EntityType, R2dbcReplication()) val producerFilter: EventEnvelope[Event] => Boolean = { envelope => envelope.tags.contains(VipCustomerTag) } + val replicationSettings = ReplicationSettings[Command](EntityType, R2dbcReplication()) + .withProducerFilter(producerFilter) - Replication.grpcReplication(replicationSettings, producerFilter)(ShoppingCart.applyWithProducerFilter) + Replication.grpcReplication(replicationSettings)(ShoppingCart.applyWithProducerFilter) } def applyWithProducerFilter(replicatedBehaviors: ReplicatedBehaviors[Command, Event, State]): Behavior[Command] = { @@ -213,6 +215,21 @@ object ShoppingCart { } // #init-producerFilter + // #init-allow-edge + def initAllowEdge(implicit system: ActorSystem[_]): EdgeReplication[Command] = { + val replicationSettings = ReplicationSettings[Command](EntityType, R2dbcReplication()) + .withEdgeReplication(true) + Replication.grpcEdgeReplication(replicationSettings)(ShoppingCart.apply) + } + // #init-allow-edge + + // #init-edge + def initEdge(implicit system: ActorSystem[_]): EdgeReplication[Command] = { + val replicationSettings = ReplicationSettings[Command](EntityType, R2dbcReplication()) + Replication.grpcEdgeReplication(replicationSettings)(ShoppingCart.apply) + } + // #init-edge + } class ShoppingCart(