diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 006961459..18f73c0e1 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -149,7 +149,7 @@ jobs: - name: Compile Scala Projection gRPC sample Shopping Cart run: |- cd samples/grpc/shopping-cart-service-scala - sbt compile -Dakka-projection.version=`cat ~/.version` + sbt test -Dakka-projection.version=`cat ~/.version` - name: Compile Scala Projection gRPC sample Analytics run: |- @@ -159,7 +159,7 @@ jobs: - name: Compile Java Projection gRPC sample Shopping Cart run: |- cd samples/grpc/shopping-cart-service-java - mvn compile -nsu -Dakka-projection.version=`cat ~/.version` + mvn test -nsu -Dakka-projection.version=`cat ~/.version` - name: Compile Java Projection gRPC sample Analytics run: |- diff --git a/akka-distributed-cluster-docs/src/main/paradox/guide/1-event-sourced-shopping-cart.md b/akka-distributed-cluster-docs/src/main/paradox/guide/1-event-sourced-shopping-cart.md index 9114a8a1c..9f79d1ae6 100644 --- a/akka-distributed-cluster-docs/src/main/paradox/guide/1-event-sourced-shopping-cart.md +++ b/akka-distributed-cluster-docs/src/main/paradox/guide/1-event-sourced-shopping-cart.md @@ -12,7 +12,7 @@ The [Event Sourcing with Akka video](https://akka.io/blog/news/2020/01/07/akka-e ### Commands -Commands are the "external" API of an entity. Entity state can only be changed by commands. The results of commands are emitted as events. A command can request state changes, but different events might be generated depending on the current state of the entity. A command can also be rejected if it has invalid input or can’t be handled by current state of the entity. +Commands are the public API of an entity that other parts of the system use to interact with it. Entity state can only be changed by commands. The results of commands are emitted as events. A command can request state changes, and different events might be generated depending on the current state of the entity. A command can also be rejected if it has invalid input or can’t be handled by the current state of the entity. Scala : @@snip [ShoppingCart.scala](/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala) { #commands #events } @@ -22,8 +22,8 @@ Java ### State -Items added to the Cart are added to a `Map`. The contents of the `Map` comprise the Cart’s state along with a customer id and customer category for the customer -owning the cart, if set, and a checkout timestamp if the cart was checked out: +Items added to the Cart are added to a `Map`. The contents of the `Map` comprise the Cart’s state along with an optional checkout timestamp, which +is set when the cart is checked out: Scala : @@snip [ShoppingCart.scala](/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala) { #state } @@ -34,8 +34,11 @@ Java ### Command handler -The Cart entity will receive commands that request changes to Cart state. We will implement a command handler to process these commands and emit a reply. Our business logic allows only items to be added which are not in the cart yet and require a positive quantity. +The Cart entity will receive commands that request changes to Cart state. We will implement a command handler to process these commands and emit a reply, +the handler logic selected is different depending on if the cart is checked out already, replying with an error, or if the cart is still open for +adding and removing items. +The command handler for an open cart looks like this: Scala : @@snip [ShoppingCart.scala](/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala) { #commandHandler } @@ -43,6 +46,14 @@ Scala Java : @@snip [ShoppingCart.java](/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java) { #commandHandler } +@@@ div { .group-java } + +The actual logic for handling the commands is implemented in methods on the `ShoppingCart` class, for example the `onAddItem` method: + +Java +: @@snip [ShoppingCart.java](/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java) { #onAddItem } + +@@@ ### Event handler @@ -69,8 +80,8 @@ Java ## Serialization -The state, commands and events of the entity must be serializable because they are written to the datastore or sent between nodes within the Akka cluster. The template project includes built-in CBOR serialization using the @extref[Akka Serialization Jackson module](akka:serialization-jackson.html). This section describes how serialization is implemented. You do not need to do anything specific to take advantage of CBOR, but this section explains how it is included. -The state, commands and events are marked as CborSerializable which is configured to use the built-in CBOR serialization. The template project includes this marker interface CborSerializable: +The state, commands and events of the entity must be serializable because they are written to the datastore or sent between nodes within the Akka cluster. The sample project includes built-in CBOR serialization using the @extref[Akka Serialization Jackson module](akka:serialization-jackson.html). This section describes how serialization is implemented. You do not need to do anything specific to take advantage of CBOR, but this section explains how it is included. +The state, commands and events are marked as CborSerializable which is configured to use the built-in CBOR serialization. The sample project includes this marker interface CborSerializable: Scala : @@snip [CborSerializable.scala](/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/CborSerializable.scala) { } diff --git a/akka-distributed-cluster-docs/src/main/paradox/guide/2-service-to-service.md b/akka-distributed-cluster-docs/src/main/paradox/guide/2-service-to-service.md index 639dfa06f..d6a702d27 100644 --- a/akka-distributed-cluster-docs/src/main/paradox/guide/2-service-to-service.md +++ b/akka-distributed-cluster-docs/src/main/paradox/guide/2-service-to-service.md @@ -40,13 +40,14 @@ Java Events can be transformed by application specific code on the producer side. The purpose is to be able to have a different public representation from the internal representation (stored in journal). The transformation functions -are registered when creating the `EventProducer` service. Here is an example of one of those transformation functions: +are registered when creating the `EventProducer` service. Here is an example of one of those transformation functions +accessing the projection envelope to include the shopping cart id in the public message type passed to consumers: Scala -: @@snip [PublishEvents.scala](/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/PublishEvents.scala) { #transformItemAdded } +: @@snip [PublishEvents.scala](/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/PublishEvents.scala) { #transformItemUpdated } Java -: @@snip [PublishEvents.java](/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/PublishEvents.java) { #transformItemAdded } +: @@snip [PublishEvents.java](/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/PublishEvents.java) { #transformItemUpdated } To omit an event the transformation function can return @scala[`None`]@java[`Optional.empty()`]. @@ -61,11 +62,11 @@ Java ## Consume events -The consumer is defined in a separate service, the shopping analytics service. It is a separate project with its own lifecycle, -it is started, stopped, deployed separately, and has its own separate database from the shopping cart service. It may run in the -same data center or cloud region as the shopping cart, but it could also run in a completely different location. +The consumer is defined in a separate @java[maven]@scala[sbt] project in the shopping analytics service. -FIXME a bit messy this section +The analytics service runs in a separate Akka cluster which is deployed and scaled separately from the shopping cart service. +When running it will have its own separate database from the shopping cart service. It may run in the same region as +the shopping cart, but it could also run in a completely different location. On the consumer side the `Projection` is a @extref[SourceProvider for eventsBySlices](akka-projection:eventsourced.html#sourceprovider-for-eventsbyslices) that is using `eventsBySlices` from the GrpcReadJournal. We use @extref[ShardedDaemonProcess](akka:typed/cluster-sharded-daemon-process.html) to distribute the instances of the Projection across the cluster. @@ -86,9 +87,8 @@ The gRPC connection to the producer is defined in the @extref[consumer configura The @extref:[R2dbcProjection](akka-persistence-r2dbc:projection.html) has support for storing the offset in a relational database using R2DBC. -The event handler for this sample is just logging the events rather than for example actually building its own read side: - -FIXME should we make it more realistic? +The event handler for this sample is just logging the events rather than for example actually building its own read side +in a database for querying: Scala : @@snip [ShoppingCartEventConsumer.scala](/samples/grpc/shopping-analytics-service-scala/src/main/scala/shopping/analytics/ShoppingCartEventConsumer.scala) { #eventHandler } @@ -179,7 +179,184 @@ And the consuming analytics service: * Java: https://github.com/akka/akka-projection/tree/main/samples/grpc/shopping-analytics-service-java * Scala: https://github.com/akka/akka-projection/tree/main/samples/grpc/shopping-analytics-service-scala -FIXME running locally instructions here as well +## Running the sample code locally + +With a copy of each of the two sample projects for the language of your liking you can run the two services locally on +your own workstation. Docker, a JDK and @java[maven]@scala[sbt] is all that needs to be installed. + +### The Shopping Cart + +@@@ div { .group-scala } + +1. Start a local PostgresSQL server on default port 5432. The `docker-compose.yml` included in the shopping-cart project starts everything required for running locally. + + ```shell + docker-compose up -d + + # creates the tables needed for Akka Persistence + # as well as the offset store table for Akka Projection + docker exec -i postgres_db psql -U postgres -t < ddl-scripts/create_tables.sql + ``` + +2. Start a first node: + + ```shell + sbt -Dconfig.resource=local1.conf run + ``` + +3. (Optional) Start another node with different ports: + + ```shell + sbt -Dconfig.resource=local2.conf run + ``` + +4. (Optional) More can be started: + + ```shell + sbt -Dconfig.resource=local3.conf run + ``` + +5. Check for service readiness + + ```shell + curl http://localhost:9101/ready + ``` + +6. Try it with [grpcurl](https://github.com/fullstorydev/grpcurl). Add at least a total quantity of 10 to the cart, smaller carts are excluded by the event filter. + + ```shell + # add item to cart + grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":3}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem + + # get cart + grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetCart + + # update quantity of item + grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":5}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem + + # check out cart + grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.Checkout + + ``` + + or same `grpcurl` commands to port 8102 to reach node 2. + +@@@ + +@@@ div { .group-java } + +1. Start a local PostgresSQL server on default port 5432. The included `docker-compose.yml` starts everything required for running locally. + + ```shell + docker-compose up -d + + # creates the tables needed for Akka Persistence + # as well as the offset store table for Akka Projection + docker exec -i postgres_db psql -U postgres -t < ddl-scripts/create_tables.sql + ``` + +2. Make sure you have compiled the project + + ```shell + mvn compile + ``` + +3. Start a first node: + + ```shell + mvn compile exec:exec -DAPP_CONFIG=local1.conf + ``` + +4. (Optional) Start another node with different ports: + + ```shell + mvn compile exec:exec -DAPP_CONFIG=local2.conf + ``` + +5. (Optional) More can be started: + + ```shell + mvn compile exec:exec -DAPP_CONFIG=local3.conf + ``` + +6. Check for service readiness + + ```shell + curl http://localhost:9101/ready + ``` + +7. Try it with [grpcurl](https://github.com/fullstorydev/grpcurl): + + ```shell + # add item to cart + grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":3}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem + + # get cart + grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetCart + + # update quantity of item + grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":5}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem + + # check out cart + grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.Checkout + ``` + + or same `grpcurl` commands to port 8102 to reach node 2. + +@@@ + +### The analytics service + +@@@ div { .group-scala } + + +1. Start a local PostgresSQL server on default port 5432. The included `docker-compose.yml` starts everything required for running locally. Note that for convenience this service and the shopping cart service is sharing the same database, in an actual service consuming events the consuming services are expected to have their own separate databases. + + ```shell + docker-compose up -d + + # creates the tables needed for Akka Persistence + # as well as the offset store table for Akka Projection + docker exec -i postgres_db psql -U postgres -t < ddl-scripts/create_tables.sql + ``` + +2. Start a first node: + + ```shell + sbt -Dconfig.resource=local1.conf run + ``` + +3. Start `shopping-cart-service` and add item to cart + +4. Add at least a total quantity of 10 to the cart, smaller carts are excluded by the event filter. + +5. Notice the log output in the terminal of the `shopping-analytics-service` + +@@@ + +@@@ div { .group-java } + +1. Start a local PostgresSQL server on default port 5432. The included `docker-compose.yml` starts everything required for running locally. Note that for convenience this service and the shopping cart service is sharing the same database, in an actual service consuming events the consuming services are expected to have their own separate databases. + + ```shell + docker-compose up -d + + # creates the tables needed for Akka Persistence + # as well as the offset store table for Akka Projection + docker exec -i postgres_db psql -U postgres -t < ddl-scripts/create_tables.sql + ``` + +2. Start a first node: + + ```shell + mvn compile exec:exec -DAPP_CONFIG=local1.conf + ``` + +3. Start `shopping-cart-service` and add item to cart + +4. Notice the log output in the terminal of the `shopping-analytics-service` + +@@@ ## What's next? diff --git a/akka-distributed-cluster-docs/src/main/paradox/guide/3-active-active.md b/akka-distributed-cluster-docs/src/main/paradox/guide/3-active-active.md index 75785f46f..6f307820c 100644 --- a/akka-distributed-cluster-docs/src/main/paradox/guide/3-active-active.md +++ b/akka-distributed-cluster-docs/src/main/paradox/guide/3-active-active.md @@ -1,34 +1,311 @@ # Part 3: Active-active -For an Active-Active shopping cart, the same service will run in different data centers or cloud regions - each called a replica. +Active-active means that the same shopping cart is in memory in multiple locations, replicas, and can accept updates +in all of those locations, so that a cloud region outage does not block users from continuing to use the service. + +![projection-over-grpc.png](../images/res-over-grpc.png) + +@extref[Akka Replicated Event Sourcing](akka:typed/replicated-eventsourcing.html) stores persisted events in a local database, +without any need for replication capabilities in the database itself, the events are then replicated using the @extref[Akka Replicated Event Sourcing gRPC transport](akka-projection:grpc-replicated-event-sourcing-transport.html). + +The shopping cart will be eventually consistent, meaning that an update to the cart in one replica will not immediately +be visible in the other replicas, but eventually all replicas will reach the same state. ## Turning the shopping cart into a Replicated Entity -FIXME show off RES API +The API for writing a Replicated Event Sourced entity is mostly the exact same as the Event Sourced Behavior we already +implemented our entity with. + +The events from other replicas will be replicated and are passed to the event handler of the entity +just like events that was written by the shopping cart instance itself, however there is no global ordering of the events, +so events may be seen in a different order than the wall clock order they happened, and in a different order in each replica, +especially in circumstances where there are outages or connectivity problems between the replicas. + +### Updating the cart contents + +Because of the possibility of observing the events out of order when they have been written to different replicas, we must +make sure the state ends up the same even in the face of re-ordered events. + +This can be handled by keeping track of the quantity of an item both when it is positive and negative, so that seeing +a remove of 1 item before an add 1 of the same item ends up as the item removed with a zero quantity. We have already +represented add and remove as a negative or positive number in the `ItemUpdatedEvent` + +Scala +: @@snip [ShoppingCart.scala](/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala) { #itemUpdatedEvent } + +Java +: @@snip [ShoppingCart.java](/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java) { #itemUpdatedEvent } + +In the state, we keep a map from `itemId` to the current quantity for each product. For each update we see, we add the positive or negative +number to the quantity, getting the same number regardless of what order the changes arrived: + +Scala +: @@snip [ShoppingCart.scala](/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala) { #stateUpdateItem } + +Java +: @@snip [ShoppingCart.java](/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java) { #stateUpdateItem } + + +### Checking the cart out + +Another, more complex thing to deal with is checking out. When we had only one instance of the cart, it was safe to say that +after a checkout event, no more updates could happen to the shopping cart. With a replicated cart this is no longer the case +as an update could be sent to one replica, and the checkout to another, so that the update arrives to the checked out replica +after it was checked out, even though at the time of the update, the cart was not yet checked out. + +We can solve this by turning checkout into a two-step process, where the initial checkout triggers storing a checkout event +per replica, and letting one of the replicas get a designation as leader, completing the checkout once it has seen checkout +events from all replicas. + +When we receive a checkout command, we store a `Closed` event which also contains the id of the replica. We can get the "self" replica id +from the @apidoc[akka.persistence.typed.*.ReplicationContext]: + +Scala +: @@snip [ShoppingCart.scala](/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala) { #checkoutStep1 } + +Java +: @@snip [ShoppingCart.java](/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java) { #checkoutStep1 } + + +In the state we keep a set of all replicas where the cart was closed: + +Scala +: @@snip [ShoppingCart.scala](/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala) { #checkoutStep1Event } + +Java +: @@snip [ShoppingCart.java](/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java) { #checkoutStep1Event } + + +After applying the event to the state, we check if one of the other replicas was closed, and in that case we trigger a command to +the shopping cart itself to close also in this replica using the `CloseForCheckout` command. + +If all replicas are closed and this is the designated leader we trigger a `CompleteCheckout` command. Note that this logic +is only triggered if the entity got the event replicated, and not if it is "recovering" - starting after it was stopped and +is replaying all events stored in the journal: + +Scala +: @@snip [ShoppingCart.scala](/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala) { #checkoutEventTrigger } + +Java +: @@snip [ShoppingCart.java](/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java) { #checkoutEventTrigger } + +The logic for leader makes sure to not tie all carts to the same replica as leader, but instead spreading it across the replicas, by basing it of a hash of the id of the cart. +This again uses the @apidoc[akka.persistence.typed.*.ReplicationContext] to access the list of all replicas: + +Scala +: @@snip [ShoppingCart.scala](/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala) { #leader } + +Java +: @@snip [ShoppingCart.java](/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java) { #leader } + + +Note that this still means that while adding and removing can be done in the face of an outage, all replicas must be online +for any shopping cart to be able to close, so it does not give us complete high-availability for the shopping cart, +but it illustrates how we can coordinate when needed. + ## Filters By default, events from all Replicated Event Sourced entities are replicated. The same kind of filters as described for @ref:[Service to Service](2-service-to-service.md#filters) can be used for -Replicated Event Sourcing. +Replicated Event Sourcing. Replicated Event Sourcing is bidirectional replication, and therefore you would typically +have to define the same filters on both sides. That is not handled automatically. + +One way to make sure filtering is reflective is using a property of the state to tag events, an event applied to the +state triggering the tag will be replicated to other nodes and lead to the same property change there, eventually +causing the same filter on all replicas. + +To add such filtering to the shopping cart we have added a `vipCustomer` flag on the state, which when true +will lead to adding the tag `vip` to any event emitted for the cart: + +Scala +: @@snip [ShoppingCart.scala](/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala) { #stateVipCustomer } + +Java +: @@snip [ShoppingCart.java](/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java) { #stateVipCustomer } -The producer defined filter: + +An alternative initialization method adds a filter looking for the `vip` tag: + +Scala +: @@snip [ShoppingCart.scala](/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala) { #init-producerFilter } + +Java +: @@snip [ShoppingCart.java](/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java) { #init-producerFilter } + +For carts not replicated the replica commit round triggered by check out would never complete, as the other +replicas can not see events from the non-replicated carts. + +An additional check immediately completes checkout for such carts: Scala -: @@snip [ShoppingCart.scala](/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala) { #init } +: @@snip [ShoppingCart.scala](/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala) { #checkoutEventTrigger } Java -: @@snip [ShoppingCart.java](/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java) { #init } +: @@snip [ShoppingCart.java](/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java) { #checkoutEventTrigger } + + +## Running the sample code locally + +@@@ div { .group-scala } + +1. Start two local PostgresSQL servers, on ports 5101 and 5201. The included `docker-compose.yml` starts everything required for running locally. + + ```shell + docker-compose up -d + + # creates the tables needed for Akka Persistence + # as well as the offset store table for Akka Projection + docker exec -i postgres_db_1 psql -U postgres -t < ddl-scripts/create_tables.sql + docker exec -i postgres_db_2 psql -U postgres -t < ddl-scripts/create_tables.sql + ``` + +2. Start a first node for each replica: + + ```shell + sbt -Dconfig.resource=replica1-local1.conf run + ``` + + ```shell + sbt -Dconfig.resource=replica2-local1.conf run + ``` + +3. (Optional) Start another node with different ports: + + ```shell + sbt -Dconfig.resource=replica1-local2.conf run + ``` + + ```shell + sbt -Dconfig.resource=replica2-local2.conf run + ``` + +4. (Optional) More can be started: + + ```shell + sbt -Dconfig.resource=replica1-local3.conf run + ``` + + ```shell + sbt -Dconfig.resource=replica2-local3.conf run + ``` + +5. Check for service readiness + + ```shell + curl http://localhost:9101/ready + ``` + + ```shell + curl http://localhost:9201/ready + ``` + +6. Try it with [grpcurl](https://github.com/fullstorydev/grpcurl): + + ```shell + # add item to cart on the first replica + grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":7}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem + + # get cart from first replica + grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetCart + + # get cart from second replica + grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8201 shoppingcart.ShoppingCartService.GetCart + + # update quantity of item on the second replica + grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":2}' -plaintext 127.0.0.1:8201 shoppingcart.ShoppingCartService.RemoveItem + + # check out cart + grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.Checkout + + or same `grpcurl` commands to port 8102/8202 to reach node 2. + ``` + +@@@ + +@@@ div { .group-java } + + +1. Start two local PostgresSQL servers, on ports 5101 and 5201. The included `docker-compose.yml` starts everything required for running locally. + + ```shell + docker-compose up -d + + # creates the tables needed for Akka Persistence + # as well as the offset store table for Akka Projection + docker exec -i postgres_db_1 psql -U postgres -t < ddl-scripts/create_tables.sql + docker exec -i postgres_db_2 psql -U postgres -t < ddl-scripts/create_tables.sql + ``` + +2. Make sure you have compiled the project + + ```shell + mvn compile + ``` + +3. Start a first node for each replica: + + ```shell + mvn compile exec:exec -DAPP_CONFIG=replica1-local1.conf + ``` + + ```shell + mvn compile exec:exec -DAPP_CONFIG=replica2-local1.conf + ``` + +4. (Optional) Start another node with different ports: + + ```shell + mvn compile exec:exec -DAPP_CONFIG=replica1-local2.conf + ``` + + ```shell + mvn compile exec:exec -DAPP_CONFIG=replica2-local2.conf + ``` + +5. (Optional) More can be started: + + ```shell + mvn compile exec:exec -DAPP_CONFIG=replica1-local3.conf + ``` + + ```shell + mvn compile exec:exec -DAPP_CONFIG=replica2-local3.conf + ``` + +6. Check for service readiness + + ```shell + curl http://localhost:9101/ready + ``` + + ```shell + curl http://localhost:9201/ready + ``` + +7. Try it with [grpcurl](https://github.com/fullstorydev/grpcurl): + + ```shell + # add item to cart on the first replica + grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":7}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem + + # get cart from first replica + grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetCart + + # get cart from second replica + grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8201 shoppingcart.ShoppingCartService.GetCart -Consumer defined filters are updated as described in @extref:[Akka Projection gRPC Consumer defined filter](akka-projection:grpc.md#consumer-defined-filter) + # update quantity of item on the second replica + grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":2}' -plaintext 127.0.0.1:8201 shoppingcart.ShoppingCartService.RemoveItem -One thing to note is that `streamId` is always the same as the `entityType` when using Replicated Event Sourcing. + # check out cart + grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.Checkout + ``` -The entity id based filter criteria must include the replica id as suffix to the entity id, with `|` separator. + or same `grpcurl` commands to port 8102/8202 to reach node 2. -Replicated Event Sourcing is bidirectional replication, and therefore you would typically have to define the same -filters on both sides. That is not handled automatically. +@@@ ## What's next? diff --git a/akka-distributed-cluster-docs/src/main/paradox/guide/4-deploying.md b/akka-distributed-cluster-docs/src/main/paradox/guide/4-deploying.md index c6b149b08..870c79934 100644 --- a/akka-distributed-cluster-docs/src/main/paradox/guide/4-deploying.md +++ b/akka-distributed-cluster-docs/src/main/paradox/guide/4-deploying.md @@ -1,7 +1,20 @@ -# Part 4: Deploying the service +# Part 4: Deploying with Kubernetes -FIXME k8 deployment of the service +FIXME bla bla a namespace `shopping-cart-namespace` -FIXME ingress configuration +JSON +: @@snip [ShoppingCart.java](/samples/replicated/shopping-cart-service-java/kubernetes/namespace.json) { } -FIXME mTLS for the replication/s2s event streams \ No newline at end of file +FIXME bla bla pre-requisite for Akka management + +YAML +: @@snip [ShoppingCart.java](/samples/replicated/shopping-cart-service-java/kubernetes/akka-cluster-roles.yml) { } + +FIXME bla bla the service itself + +YAML +: @@snip [ShoppingCart.java](/samples/replicated/shopping-cart-service-java/kubernetes/akka-cluster.yml) { } + +FIXME bla bla database password in secrets, a PostgreSQL instance of some kind, schema not managed by service itself + +FIXME something about mTLS for the replication/s2s event streams \ No newline at end of file diff --git a/docs/src/main/paradox/grpc.md b/docs/src/main/paradox/grpc.md index 64fa571b5..495269a51 100644 --- a/docs/src/main/paradox/grpc.md +++ b/docs/src/main/paradox/grpc.md @@ -106,13 +106,14 @@ Java Events can be transformed by application specific code on the producer side. The purpose is to be able to have a different public representation from the internal representation (stored in journal). The transformation functions -are registered when creating the `EventProducer` service. Here is an example of one of those transformation functions: +are registered when creating the `EventProducer` service. Here is an example of one of those transformation functions +accessing the projection envelope to include the shopping cart id in the public message type passed to consumers: Scala -: @@snip [PublishEvents.scala](/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/PublishEvents.scala) { #transformItemAdded } +: @@snip [PublishEvents.scala](/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/PublishEvents.scala) { #transformItemUpdated } Java -: @@snip [PublishEvents.java](/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/PublishEvents.java) { #transformItemAdded } +: @@snip [PublishEvents.java](/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/PublishEvents.java) { #transformItemUpdated } To omit an event the transformation function can return @scala[`None`]@java[`Optional.empty()`]. diff --git a/samples/grpc/shopping-analytics-service-java/src/main/java/shopping/analytics/ShoppingCartEventConsumer.java b/samples/grpc/shopping-analytics-service-java/src/main/java/shopping/analytics/ShoppingCartEventConsumer.java index 774cdf0b9..66682ac13 100644 --- a/samples/grpc/shopping-analytics-service-java/src/main/java/shopping/analytics/ShoppingCartEventConsumer.java +++ b/samples/grpc/shopping-analytics-service-java/src/main/java/shopping/analytics/ShoppingCartEventConsumer.java @@ -27,9 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import shopping.cart.proto.CheckedOut; -import shopping.cart.proto.ItemAdded; import shopping.cart.proto.ItemQuantityAdjusted; -import shopping.cart.proto.ItemRemoved; import java.util.List; import java.util.Optional; @@ -73,18 +71,10 @@ public CompletionStage process(EventEnvelope envelope) { Object event = envelope.getEvent(); totalCount++; - if (event instanceof ItemAdded) { - ItemAdded itemAdded = (ItemAdded) event; - log.info("Projection [{}] consumed ItemAdded for cart {}, added {} {}. Total [{}] events.", - projectionId.id(), itemAdded.getCartId(), itemAdded.getQuantity(), itemAdded.getItemId(), totalCount); - } else if (event instanceof ItemQuantityAdjusted) { + if (event instanceof ItemQuantityAdjusted) { ItemQuantityAdjusted itemQuantityAdjusted = (ItemQuantityAdjusted) event; log.info("Projection [{}] consumed ItemQuantityAdjusted for cart {}, changed {} {}. Total [{}] events.", projectionId.id(), itemQuantityAdjusted.getCartId(), itemQuantityAdjusted.getQuantity(), itemQuantityAdjusted.getItemId(), totalCount); - } else if (event instanceof ItemRemoved) { - ItemRemoved itemRemoved = (ItemRemoved) event; - log.info("Projection [{}] consumed ItemRemoved for cart {}, removed {}. Total [{}] events.", - projectionId.id(), itemRemoved.getCartId(), itemRemoved.getItemId(), totalCount); } else if (event instanceof CheckedOut) { CheckedOut checkedOut = (CheckedOut) event; log.info("Projection [{}] consumed CheckedOut for cart {}. Total [{}] events.", projectionId.id(), checkedOut.getCartId(), totalCount); diff --git a/samples/grpc/shopping-analytics-service-java/src/main/protobuf/ShoppingCartEvents.proto b/samples/grpc/shopping-analytics-service-java/src/main/protobuf/ShoppingCartEvents.proto index fc721c46f..6d64e3aa7 100644 --- a/samples/grpc/shopping-analytics-service-java/src/main/protobuf/ShoppingCartEvents.proto +++ b/samples/grpc/shopping-analytics-service-java/src/main/protobuf/ShoppingCartEvents.proto @@ -6,24 +6,12 @@ option java_package = "shopping.cart.proto"; package shoppingcart; // Events published to external services - -message ItemAdded { - string cartId = 1; - string itemId = 2; - int32 quantity = 3; -} - message ItemQuantityAdjusted { string cartId = 1; string itemId = 2; int32 quantity = 3; } -message ItemRemoved { - string cartId = 1; - string itemId = 2; -} - message CheckedOut { string cartId = 1; } diff --git a/samples/grpc/shopping-analytics-service-java/src/main/resources/grpc.conf b/samples/grpc/shopping-analytics-service-java/src/main/resources/grpc.conf index 588778edb..59e59e7b2 100644 --- a/samples/grpc/shopping-analytics-service-java/src/main/resources/grpc.conf +++ b/samples/grpc/shopping-analytics-service-java/src/main/resources/grpc.conf @@ -1,4 +1,4 @@ -akka.http.server.preview.enable-http2 = on +akka.http.server.enable-http2 = on akka.projection.grpc.consumer { client { diff --git a/samples/grpc/shopping-analytics-service-scala/src/main/protobuf/ShoppingCartEvents.proto b/samples/grpc/shopping-analytics-service-scala/src/main/protobuf/ShoppingCartEvents.proto index 9d6912a80..054d14868 100644 --- a/samples/grpc/shopping-analytics-service-scala/src/main/protobuf/ShoppingCartEvents.proto +++ b/samples/grpc/shopping-analytics-service-scala/src/main/protobuf/ShoppingCartEvents.proto @@ -4,11 +4,6 @@ package shoppingcart; // Events published to external services -message ItemAdded { - string cartId = 1; - string itemId = 2; - int32 quantity = 3; -} message ItemQuantityAdjusted { string cartId = 1; @@ -16,11 +11,6 @@ message ItemQuantityAdjusted { int32 quantity = 3; } -message ItemRemoved { - string cartId = 1; - string itemId = 2; -} - message CheckedOut { string cartId = 1; } diff --git a/samples/grpc/shopping-analytics-service-scala/src/main/resources/grpc.conf b/samples/grpc/shopping-analytics-service-scala/src/main/resources/grpc.conf index 588778edb..59e59e7b2 100644 --- a/samples/grpc/shopping-analytics-service-scala/src/main/resources/grpc.conf +++ b/samples/grpc/shopping-analytics-service-scala/src/main/resources/grpc.conf @@ -1,4 +1,4 @@ -akka.http.server.preview.enable-http2 = on +akka.http.server.enable-http2 = on akka.projection.grpc.consumer { client { diff --git a/samples/grpc/shopping-analytics-service-scala/src/main/scala/shopping/analytics/ShoppingCartEventConsumer.scala b/samples/grpc/shopping-analytics-service-scala/src/main/scala/shopping/analytics/ShoppingCartEventConsumer.scala index 77a453ad7..ec5eea5c4 100644 --- a/samples/grpc/shopping-analytics-service-scala/src/main/scala/shopping/analytics/ShoppingCartEventConsumer.scala +++ b/samples/grpc/shopping-analytics-service-scala/src/main/scala/shopping/analytics/ShoppingCartEventConsumer.scala @@ -17,9 +17,7 @@ import akka.projection.r2dbc.scaladsl.R2dbcProjection import akka.projection.scaladsl.Handler import org.slf4j.LoggerFactory import shoppingcart.CheckedOut -import shoppingcart.ItemAdded import shoppingcart.ItemQuantityAdjusted -import shoppingcart.ItemRemoved import shoppingcart.ShoppingCartEventsProto object ShoppingCartEventConsumer { @@ -55,14 +53,6 @@ object ShoppingCartEventConsumer { totalCount += 1 event match { - case itemAdded: ItemAdded => - log.info( - "Projection [{}] consumed ItemAdded for cart {}, added {} {}. Total [{}] events.", - projectionId.id, - itemAdded.cartId, - itemAdded.quantity, - itemAdded.itemId, - totalCount) case quantityAdjusted: ItemQuantityAdjusted => log.info( "Projection [{}] consumed ItemQuantityAdjusted for cart {}, changed {} {}. Total [{}] events.", @@ -71,13 +61,6 @@ object ShoppingCartEventConsumer { quantityAdjusted.quantity, quantityAdjusted.itemId, totalCount) - case itemRemoved: ItemRemoved => - log.info( - "Projection [{}] consumed ItemRemoved for cart {}, removed {}. Total [{}] events.", - projectionId.id, - itemRemoved.cartId, - itemRemoved.itemId, - totalCount) case checkedOut: CheckedOut => log.info( "Projection [{}] consumed CheckedOut for cart {}. Total [{}] events.", diff --git a/samples/grpc/shopping-cart-service-java/README.md b/samples/grpc/shopping-cart-service-java/README.md index 51aeb7385..ac7a5cc48 100644 --- a/samples/grpc/shopping-cart-service-java/README.md +++ b/samples/grpc/shopping-cart-service-java/README.md @@ -50,7 +50,7 @@ grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetCart # update quantity of item - grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":5}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.UpdateItem + grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":5}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem # check out cart grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.Checkout diff --git a/samples/grpc/shopping-cart-service-java/pom.xml b/samples/grpc/shopping-cart-service-java/pom.xml index f8d4532f8..dcfebb456 100644 --- a/samples/grpc/shopping-cart-service-java/pom.xml +++ b/samples/grpc/shopping-cart-service-java/pom.xml @@ -65,6 +65,11 @@ akka-cluster-tools_${scala.binary.version} ${akka.version} + + com.typesafe.akka + akka-discovery_${scala.binary.version} + ${akka.version} + com.lightbend.akka akka-persistence-r2dbc_${scala.binary.version} diff --git a/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/PublishEvents.java b/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/PublishEvents.java index 4f697703f..d21ce0e74 100644 --- a/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/PublishEvents.java +++ b/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/PublishEvents.java @@ -5,6 +5,8 @@ import akka.http.javadsl.model.HttpRequest; import akka.http.javadsl.model.HttpResponse; import akka.japi.function.Function; +import akka.persistence.query.typed.EventEnvelope; +import akka.persistence.typed.PersistenceId; import akka.projection.grpc.producer.EventProducerSettings; import akka.projection.grpc.producer.javadsl.EventProducer; import akka.projection.grpc.producer.javadsl.EventProducerSource; @@ -12,6 +14,7 @@ import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; public class PublishEvents { @@ -19,10 +22,8 @@ public class PublishEvents { public static Function> eventProducerService(ActorSystem system) { Transformation transformation = Transformation.empty() - .registerMapper(ShoppingCart.ItemAdded.class, event -> Optional.of(transformItemAdded(event))) - .registerMapper(ShoppingCart.ItemQuantityAdjusted.class, event -> Optional.of(transformItemQuantityAdjusted(event))) - .registerMapper(ShoppingCart.ItemRemoved.class, event -> Optional.of(transformItemRemoved(event))) - .registerMapper(ShoppingCart.CheckedOut.class, event -> Optional.of(transformCheckedOut(event))); + .registerAsyncEnvelopeMapper(ShoppingCart.ItemUpdated.class, envelope -> CompletableFuture.completedFuture(Optional.of(transformItemQuantityAdjusted(envelope)))) + .registerAsyncEnvelopeMapper(ShoppingCart.CheckedOut.class, envelope -> CompletableFuture.completedFuture(Optional.of(transformCheckedOut(envelope)))); //#withProducerFilter EventProducerSource eventProducerSource = new EventProducerSource( @@ -50,34 +51,20 @@ public static Function> eventProducer } //#eventProducerService - //#transformItemAdded - private static shopping.cart.proto.ItemAdded transformItemAdded(ShoppingCart.ItemAdded itemAdded) { - return shopping.cart.proto.ItemAdded.newBuilder() - .setCartId(itemAdded.cartId) - .setItemId(itemAdded.itemId) - .setQuantity(itemAdded.quantity) - .build(); - } - //#transformItemAdded - - private static shopping.cart.proto.ItemQuantityAdjusted transformItemQuantityAdjusted(ShoppingCart.ItemQuantityAdjusted itemQuantityAdjusted) { + //#transformItemUpdated + private static shopping.cart.proto.ItemQuantityAdjusted transformItemQuantityAdjusted(EventEnvelope envelope) { + var itemUpdated = envelope.event(); return shopping.cart.proto.ItemQuantityAdjusted.newBuilder() - .setCartId(itemQuantityAdjusted.cartId) - .setItemId(itemQuantityAdjusted.itemId) - .setQuantity(itemQuantityAdjusted.newQuantity) - .build(); - } - - private static shopping.cart.proto.ItemRemoved transformItemRemoved(ShoppingCart.ItemRemoved itemRemoved) { - return shopping.cart.proto.ItemRemoved.newBuilder() - .setCartId(itemRemoved.cartId) - .setItemId(itemRemoved.itemId) + .setCartId(PersistenceId.extractEntityId(envelope.persistenceId())) + .setItemId(itemUpdated.itemId) + .setQuantity(itemUpdated.quantity) .build(); } + //#transformItemUpdated - private static shopping.cart.proto.CheckedOut transformCheckedOut(ShoppingCart.CheckedOut checkedOut) { + private static shopping.cart.proto.CheckedOut transformCheckedOut(EventEnvelope envelope) { return shopping.cart.proto.CheckedOut.newBuilder() - .setCartId(checkedOut.cartId) + .setCartId(PersistenceId.extractEntityId(envelope.persistenceId())) .build(); } //#eventProducerService diff --git a/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java b/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java index 6b0542931..72dcda4b7 100644 --- a/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java +++ b/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java @@ -73,16 +73,12 @@ public Summary toSummary() { return new Summary(items, isCheckedOut()); } - public boolean hasItem(String itemId) { - return items.containsKey(itemId); - } - public State updateItem(String itemId, int quantity) { - if (quantity == 0) { + int newQuantity = items.getOrDefault(itemId, 0) + quantity; + if (newQuantity > 0) + items.put(itemId, newQuantity); + else items.remove(itemId); - } else { - items.put(itemId, quantity); - } return this; } @@ -90,15 +86,6 @@ public boolean isEmpty() { return items.isEmpty(); } - public State removeItem(String itemId) { - items.remove(itemId); - return this; - } - - public int itemCount(String itemId) { - return items.get(itemId); - } - //#tags public int totalQuantity() { return items.values().stream().reduce(0, Integer::sum); @@ -144,22 +131,11 @@ public AddItem(String itemId, int quantity, ActorRef> reply /** A command to remove an item from the cart. */ public static final class RemoveItem implements Command { - final String itemId; - final ActorRef> replyTo; - - public RemoveItem(String itemId, ActorRef> replyTo) { - this.itemId = itemId; - this.replyTo = replyTo; - } - } - - /** A command to adjust the quantity of an item in the cart. */ - public static final class AdjustItemQuantity implements Command { final String itemId; final int quantity; final ActorRef> replyTo; - public AdjustItemQuantity(String itemId, int quantity, ActorRef> replyTo) { + public RemoveItem(String itemId, int quantity, ActorRef> replyTo) { this.itemId = itemId; this.quantity = quantity; this.replyTo = replyTo; @@ -201,115 +177,42 @@ public Summary(Map items, boolean checkedOut) { //#events abstract static class Event implements CborSerializable { - public final String cartId; - - public Event(String cartId) { - this.cartId = cartId; - } } - static final class ItemAdded extends Event { + static final class ItemUpdated extends Event { public final String itemId; public final int quantity; - public ItemAdded(String cartId, String itemId, int quantity) { - super(cartId); + public ItemUpdated(String itemId, int quantity) { this.itemId = itemId; this.quantity = quantity; } + // #itemUpdatedEvent @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - ItemAdded other = (ItemAdded) o; + ItemUpdated other = (ItemUpdated) o; if (quantity != other.quantity) return false; - if (!cartId.equals(other.cartId)) return false; return itemId.equals(other.itemId); } @Override public int hashCode() { - int result = cartId.hashCode(); - result = 31 * result + itemId.hashCode(); + int result = itemId.hashCode(); result = 31 * result + quantity; return result; } } - static final class ItemRemoved extends Event { - public final String itemId; - public final int oldQuantity; - - public ItemRemoved(String cartId, String itemId, int oldQuantity) { - super(cartId); - this.itemId = itemId; - this.oldQuantity = oldQuantity; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - ItemRemoved other = (ItemRemoved) o; - - if (oldQuantity != other.oldQuantity) return false; - if (!cartId.equals(other.cartId)) return false; - return itemId.equals(other.itemId); - } - - @Override - public int hashCode() { - int result = cartId.hashCode(); - result = 31 * result + itemId.hashCode(); - result = 31 * result + oldQuantity; - return result; - } - } - - static final class ItemQuantityAdjusted extends Event { - public final String itemId; - final int oldQuantity; - final int newQuantity; - - public ItemQuantityAdjusted(String cartId, String itemId, int oldQuantity, int newQuantity) { - super(cartId); - this.itemId = itemId; - this.oldQuantity = oldQuantity; - this.newQuantity = newQuantity; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - ItemQuantityAdjusted other = (ItemQuantityAdjusted) o; - - if (oldQuantity != other.oldQuantity) return false; - if (newQuantity != other.newQuantity) return false; - if (!cartId.equals(other.cartId)) return false; - return itemId.equals(other.itemId); - } - - @Override - public int hashCode() { - int result = cartId.hashCode(); - result = 31 * result + itemId.hashCode(); - result = 31 * result + oldQuantity; - result = 31 * result + newQuantity; - return result; - } - } - static final class CheckedOut extends Event { final Instant eventTime; - public CheckedOut(String cartId, Instant eventTime) { - super(cartId); + @JsonCreator + public CheckedOut(Instant eventTime) { this.eventTime = eventTime; } @@ -385,26 +288,21 @@ private CommandHandlerWithReplyBuilderByState open .forState(state -> !state.isCheckedOut()) .onCommand(AddItem.class, this::onAddItem) .onCommand(RemoveItem.class, this::onRemoveItem) - .onCommand(AdjustItemQuantity.class, this::onAdjustItemQuantity) .onCommand(Checkout.class, this::onCheckout); } //#commandHandler + //#onAddItem private ReplyEffect onAddItem(State state, AddItem cmd) { - if (state.hasItem(cmd.itemId)) { - return Effect() - .reply( - cmd.replyTo, - StatusReply.error( - "Item '" + cmd.itemId + "' was already added to this shopping cart")); - } else if (cmd.quantity <= 0) { + if (cmd.quantity <= 0) { return Effect().reply(cmd.replyTo, StatusReply.error("Quantity must be greater than zero")); } else { return Effect() - .persist(new ItemAdded(cartId, cmd.itemId, cmd.quantity)) + .persist(new ItemUpdated(cmd.itemId, cmd.quantity)) .thenReply(cmd.replyTo, updatedCart -> StatusReply.success(updatedCart.toSummary())); } } + //#onAddItem private ReplyEffect onCheckout(State state, Checkout cmd) { if (state.isEmpty()) { @@ -412,42 +310,15 @@ private ReplyEffect onCheckout(State state, Checkout cmd) { .reply(cmd.replyTo, StatusReply.error("Cannot checkout an empty shopping cart")); } else { return Effect() - .persist(new CheckedOut(cartId, Instant.now())) + .persist(new CheckedOut(Instant.now())) .thenReply(cmd.replyTo, updatedCart -> StatusReply.success(updatedCart.toSummary())); } } private ReplyEffect onRemoveItem(State state, RemoveItem cmd) { - if (state.hasItem(cmd.itemId)) { - return Effect() - .persist(new ItemRemoved(cartId, cmd.itemId, state.itemCount(cmd.itemId))) - .thenReply(cmd.replyTo, updatedCart -> StatusReply.success(updatedCart.toSummary())); - } else { - return Effect() - .reply( - cmd.replyTo, - StatusReply.success(state.toSummary())); // removing an item is idempotent - } - } - - private ReplyEffect onAdjustItemQuantity(State state, AdjustItemQuantity cmd) { - if (cmd.quantity <= 0) { - return Effect().reply(cmd.replyTo, StatusReply.error("Quantity must be greater than zero")); - } else if (state.hasItem(cmd.itemId)) { - return Effect() - .persist( - new ItemQuantityAdjusted( - cartId, cmd.itemId, state.itemCount(cmd.itemId), cmd.quantity)) - .thenReply(cmd.replyTo, updatedCart -> StatusReply.success(updatedCart.toSummary())); - } else { - return Effect() - .reply( - cmd.replyTo, - StatusReply.error( - "Cannot adjust quantity for item '" - + cmd.itemId - + "'. Item not present on cart")); - } + return Effect() + .persist(new ItemUpdated(cmd.itemId, -cmd.quantity)) + .thenReply(cmd.replyTo, updatedCart -> StatusReply.success(updatedCart.toSummary())); } private CommandHandlerWithReplyBuilderByState @@ -470,14 +341,6 @@ private ReplyEffect onAdjustItemQuantity(State state, AdjustItemQu cmd.replyTo, StatusReply.error( "Can't remove an item from an already checked out shopping cart"))) - .onCommand( - AdjustItemQuantity.class, - cmd -> - Effect() - .reply( - cmd.replyTo, - StatusReply.error( - "Can't adjust item on an already checked out shopping cart"))) .onCommand( Checkout.class, cmd -> @@ -498,11 +361,9 @@ private CommandHandlerWithReplyBuilderByState getC public EventHandler eventHandler() { return newEventHandlerBuilder() .forAnyState() - .onEvent(ItemAdded.class, (state, evt) -> state.updateItem(evt.itemId, evt.quantity)) - .onEvent(ItemRemoved.class, (state, evt) -> state.removeItem(evt.itemId)) .onEvent( - ItemQuantityAdjusted.class, - (state, evt) -> state.updateItem(evt.itemId, evt.newQuantity)) + ItemUpdated.class, + (state, evt) -> state.updateItem(evt.itemId, evt.quantity)) .onEvent(CheckedOut.class, (state, evt) -> state.checkout(evt.eventTime)) .build(); } diff --git a/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCartServiceImpl.java b/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCartServiceImpl.java index 068a6790a..b30a80991 100644 --- a/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCartServiceImpl.java +++ b/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCartServiceImpl.java @@ -8,12 +8,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import shopping.cart.proto.AddItemRequest; +import shopping.cart.proto.RemoveItemRequest; import shopping.cart.proto.Cart; import shopping.cart.proto.CheckoutRequest; import shopping.cart.proto.GetCartRequest; import shopping.cart.proto.Item; import shopping.cart.proto.ShoppingCartService; -import shopping.cart.proto.UpdateItemRequest; import java.time.Duration; import java.util.List; @@ -49,22 +49,14 @@ public CompletionStage addItem(AddItemRequest in) { } @Override - public CompletionStage updateItem(UpdateItemRequest in) { + public CompletionStage removeItem(RemoveItemRequest in) { logger.info("updateItem {}", in.getCartId()); EntityRef entityRef = sharding.entityRefFor(ShoppingCart.ENTITY_KEY, in.getCartId()); - final CompletionStage reply; - if (in.getQuantity() == 0) { - reply = - entityRef.askWithStatus( - replyTo -> new ShoppingCart.RemoveItem(in.getItemId(), replyTo), timeout); - } else { - reply = - entityRef.askWithStatus( - replyTo -> - new ShoppingCart.AdjustItemQuantity(in.getItemId(), in.getQuantity(), replyTo), - timeout); - } + CompletionStage reply = + entityRef.askWithStatus( + replyTo -> new ShoppingCart.RemoveItem(in.getItemId(), in.getQuantity(), replyTo), + timeout); CompletionStage cart = reply.thenApply(ShoppingCartServiceImpl::toProtoCart); return convertError(cart); } diff --git a/samples/grpc/shopping-cart-service-java/src/main/protobuf/ShoppingCartEvents.proto b/samples/grpc/shopping-cart-service-java/src/main/protobuf/ShoppingCartEvents.proto index fc721c46f..ce3f86873 100644 --- a/samples/grpc/shopping-cart-service-java/src/main/protobuf/ShoppingCartEvents.proto +++ b/samples/grpc/shopping-cart-service-java/src/main/protobuf/ShoppingCartEvents.proto @@ -7,23 +7,12 @@ package shoppingcart; // Events published to external services -message ItemAdded { - string cartId = 1; - string itemId = 2; - int32 quantity = 3; -} - message ItemQuantityAdjusted { string cartId = 1; string itemId = 2; int32 quantity = 3; } -message ItemRemoved { - string cartId = 1; - string itemId = 2; -} - message CheckedOut { string cartId = 1; } diff --git a/samples/grpc/shopping-cart-service-java/src/main/protobuf/ShoppingCartService.proto b/samples/grpc/shopping-cart-service-java/src/main/protobuf/ShoppingCartService.proto index e68d0f40c..611cc45b0 100644 --- a/samples/grpc/shopping-cart-service-java/src/main/protobuf/ShoppingCartService.proto +++ b/samples/grpc/shopping-cart-service-java/src/main/protobuf/ShoppingCartService.proto @@ -9,7 +9,7 @@ package shoppingcart; service ShoppingCartService { rpc AddItem (AddItemRequest) returns (Cart) {} - rpc UpdateItem (UpdateItemRequest) returns (Cart) {} + rpc RemoveItem (RemoveItemRequest) returns (Cart) {} rpc Checkout (CheckoutRequest) returns (Cart) {} rpc GetCart (GetCartRequest) returns (Cart) {} } @@ -20,7 +20,7 @@ message AddItemRequest { int32 quantity = 3; } -message UpdateItemRequest { +message RemoveItemRequest { string cartId = 1; string itemId = 2; int32 quantity = 3; diff --git a/samples/grpc/shopping-cart-service-java/src/main/resources/grpc.conf b/samples/grpc/shopping-cart-service-java/src/main/resources/grpc.conf index fc3f8b88c..62b45f386 100644 --- a/samples/grpc/shopping-cart-service-java/src/main/resources/grpc.conf +++ b/samples/grpc/shopping-cart-service-java/src/main/resources/grpc.conf @@ -1,5 +1,5 @@ // #http2 -akka.http.server.preview.enable-http2 = on +akka.http.server.enable-http2 = on // #http2 shopping-cart-service { diff --git a/samples/grpc/shopping-cart-service-java/src/test/java/shopping/cart/ShoppingCartTest.java b/samples/grpc/shopping-cart-service-java/src/test/java/shopping/cart/ShoppingCartTest.java index 83dd9c2a3..0bb826f05 100644 --- a/samples/grpc/shopping-cart-service-java/src/test/java/shopping/cart/ShoppingCartTest.java +++ b/samples/grpc/shopping-cart-service-java/src/test/java/shopping/cart/ShoppingCartTest.java @@ -48,28 +48,7 @@ public void addAnItemToCart() { assertFalse(summary.checkedOut); assertEquals(1, summary.items.size()); assertEquals(42, summary.items.get("foo").intValue()); - assertEquals(new ShoppingCart.ItemAdded(CART_ID, "foo", 42), result.event()); - } - - @Test - public void rejectAlreadyAddedItem() { - CommandResultWithReply< - ShoppingCart.Command, - ShoppingCart.Event, - ShoppingCart.State, - StatusReply> - result1 = - eventSourcedTestKit.runCommand(replyTo -> new ShoppingCart.AddItem("foo", 42, replyTo)); - assertTrue(result1.reply().isSuccess()); - CommandResultWithReply< - ShoppingCart.Command, - ShoppingCart.Event, - ShoppingCart.State, - StatusReply> - result2 = - eventSourcedTestKit.runCommand(replyTo -> new ShoppingCart.AddItem("foo", 42, replyTo)); - assertTrue(result2.reply().isError()); - assertTrue(result2.hasNoEvents()); + assertEquals(new ShoppingCart.ItemUpdated("foo", 42), result.event()); } @Test @@ -88,9 +67,9 @@ public void removeItem() { ShoppingCart.State, StatusReply> result2 = - eventSourcedTestKit.runCommand(replyTo -> new ShoppingCart.RemoveItem("foo", replyTo)); + eventSourcedTestKit.runCommand(replyTo -> new ShoppingCart.RemoveItem("foo", 42, replyTo)); assertTrue(result2.reply().isSuccess()); - assertEquals(new ShoppingCart.ItemRemoved(CART_ID, "foo", 42), result2.event()); + assertEquals(new ShoppingCart.ItemUpdated("foo", -42), result2.event()); } @Test @@ -110,10 +89,10 @@ public void adjustQuantity() { StatusReply> result2 = eventSourcedTestKit.runCommand( - replyTo -> new ShoppingCart.AdjustItemQuantity("foo", 43, replyTo)); + replyTo -> new ShoppingCart.AddItem("foo", 1, replyTo)); assertTrue(result2.reply().isSuccess()); assertEquals(43, result2.reply().getValue().items.get("foo").intValue()); - assertEquals(new ShoppingCart.ItemQuantityAdjusted(CART_ID, "foo", 42, 43), result2.event()); + assertEquals(new ShoppingCart.ItemUpdated("foo", 1), result2.event()); } @Test @@ -134,7 +113,6 @@ public void checkout() { result2 = eventSourcedTestKit.runCommand(replyTo -> new ShoppingCart.Checkout(replyTo)); assertTrue(result2.reply().isSuccess()); assertTrue(result2.event() instanceof ShoppingCart.CheckedOut); - assertEquals(CART_ID, result2.event().cartId); CommandResultWithReply< ShoppingCart.Command, diff --git a/samples/grpc/shopping-cart-service-scala/README.md b/samples/grpc/shopping-cart-service-scala/README.md index e749ef4b1..fb289747b 100644 --- a/samples/grpc/shopping-cart-service-scala/README.md +++ b/samples/grpc/shopping-cart-service-scala/README.md @@ -44,7 +44,7 @@ grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetCart # update quantity of item - grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":5}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.UpdateItem + grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":5}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem # check out cart grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.Checkout diff --git a/samples/grpc/shopping-cart-service-scala/src/main/protobuf/ShoppingCartEvents.proto b/samples/grpc/shopping-cart-service-scala/src/main/protobuf/ShoppingCartEvents.proto index fc721c46f..ce3f86873 100644 --- a/samples/grpc/shopping-cart-service-scala/src/main/protobuf/ShoppingCartEvents.proto +++ b/samples/grpc/shopping-cart-service-scala/src/main/protobuf/ShoppingCartEvents.proto @@ -7,23 +7,12 @@ package shoppingcart; // Events published to external services -message ItemAdded { - string cartId = 1; - string itemId = 2; - int32 quantity = 3; -} - message ItemQuantityAdjusted { string cartId = 1; string itemId = 2; int32 quantity = 3; } -message ItemRemoved { - string cartId = 1; - string itemId = 2; -} - message CheckedOut { string cartId = 1; } diff --git a/samples/grpc/shopping-cart-service-scala/src/main/protobuf/ShoppingCartService.proto b/samples/grpc/shopping-cart-service-scala/src/main/protobuf/ShoppingCartService.proto index fbf4858ec..ab784101b 100644 --- a/samples/grpc/shopping-cart-service-scala/src/main/protobuf/ShoppingCartService.proto +++ b/samples/grpc/shopping-cart-service-scala/src/main/protobuf/ShoppingCartService.proto @@ -9,7 +9,7 @@ package shoppingcart; service ShoppingCartService { rpc AddItem (AddItemRequest) returns (Cart) {} - rpc UpdateItem (UpdateItemRequest) returns (Cart) {} + rpc RemoveItem (RemoveItemRequest) returns (Cart) {} rpc Checkout (CheckoutRequest) returns (Cart) {} rpc GetCart (GetCartRequest) returns (Cart) {} } @@ -21,7 +21,7 @@ message AddItemRequest { int32 quantity = 3; } -message UpdateItemRequest { +message RemoveItemRequest { string cartId = 1; string itemId = 2; int32 quantity = 3; diff --git a/samples/grpc/shopping-cart-service-scala/src/main/resources/grpc.conf b/samples/grpc/shopping-cart-service-scala/src/main/resources/grpc.conf index fc3f8b88c..62b45f386 100644 --- a/samples/grpc/shopping-cart-service-scala/src/main/resources/grpc.conf +++ b/samples/grpc/shopping-cart-service-scala/src/main/resources/grpc.conf @@ -1,5 +1,5 @@ // #http2 -akka.http.server.preview.enable-http2 = on +akka.http.server.enable-http2 = on // #http2 shopping-cart-service { diff --git a/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/PublishEvents.scala b/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/PublishEvents.scala index b29ebf910..11204c969 100644 --- a/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/PublishEvents.scala +++ b/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/PublishEvents.scala @@ -4,6 +4,9 @@ package shopping.cart import akka.actor.typed.ActorSystem import akka.http.scaladsl.model.HttpRequest import akka.http.scaladsl.model.HttpResponse +import akka.persistence.query.typed +import akka.persistence.query.typed.EventEnvelope +import akka.persistence.typed.PersistenceId import akka.projection.grpc.producer.EventProducerSettings import akka.projection.grpc.producer.scaladsl.EventProducer import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation @@ -15,16 +18,10 @@ object PublishEvents { def eventProducerService(system: ActorSystem[_]) : PartialFunction[HttpRequest, Future[HttpResponse]] = { val transformation = Transformation.empty - .registerMapper[ShoppingCart.ItemAdded, proto.ItemAdded](event => - Some(transformItemAdded(event))) - .registerMapper[ - ShoppingCart.ItemQuantityAdjusted, - proto.ItemQuantityAdjusted](event => - Some(transformItemQuantityAdjusted(event))) - .registerMapper[ShoppingCart.ItemRemoved, proto.ItemRemoved](event => - Some(transformItemRemoved(event))) - .registerMapper[ShoppingCart.CheckedOut, proto.CheckedOut](event => - Some(transformCheckedOut(event))) + .registerAsyncEnvelopeMapper[ShoppingCart.ItemUpdated, proto.ItemQuantityAdjusted](envelope => + Future.successful(Some(transformItemUpdated(envelope)))) + .registerAsyncEnvelopeMapper[ShoppingCart.CheckedOut, proto.CheckedOut](envelope => + Future.successful(Some(transformCheckedOut(envelope)))) //#withProducerFilter val eventProducerSource = EventProducer @@ -46,27 +43,19 @@ object PublishEvents { } //#eventProducerService - //#transformItemAdded - private def transformItemAdded( - added: ShoppingCart.ItemAdded): proto.ItemAdded = - proto.ItemAdded( - cartId = added.cartId, - itemId = added.itemId, - quantity = added.quantity) - //#transformItemAdded - - def transformItemQuantityAdjusted( - event: ShoppingCart.ItemQuantityAdjusted): proto.ItemQuantityAdjusted = + //#transformItemUpdated + def transformItemUpdated( + envelope: EventEnvelope[ShoppingCart.ItemUpdated]): proto.ItemQuantityAdjusted = { + val event = envelope.event proto.ItemQuantityAdjusted( - cartId = event.cartId, + cartId = PersistenceId.extractEntityId(envelope.persistenceId), itemId = event.itemId, - quantity = event.newQuantity) - - def transformItemRemoved(event: ShoppingCart.ItemRemoved): proto.ItemRemoved = - proto.ItemRemoved(cartId = event.cartId, itemId = event.itemId) + quantity = event.quantity) + } + //#transformItemUpdated - def transformCheckedOut(event: ShoppingCart.CheckedOut): proto.CheckedOut = - proto.CheckedOut(event.cartId) + def transformCheckedOut(envelope: typed.EventEnvelope[ShoppingCart.CheckedOut]): proto.CheckedOut = + proto.CheckedOut(PersistenceId.extractEntityId(envelope.persistenceId)) //#eventProducerService } diff --git a/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala b/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala index 34627368e..cfc10da50 100644 --- a/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala +++ b/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala @@ -44,9 +44,7 @@ object ShoppingCart { //#tags final case class State( items: Map[String, Int], - checkoutDate: Option[Instant], - customerId: String, - customerCategory: String) + checkoutDate: Option[Instant]) extends CborSerializable { //#tags @@ -54,34 +52,28 @@ object ShoppingCart { def isCheckedOut: Boolean = checkoutDate.isDefined - def hasItem(itemId: String): Boolean = - items.contains(itemId) - def isEmpty: Boolean = items.isEmpty def updateItem(itemId: String, quantity: Int): State = { - quantity match { - case 0 => copy(items = items - itemId) - case _ => copy(items = items + (itemId -> quantity)) - } + val newQuantity = items.getOrElse(itemId, 0) + quantity + if (newQuantity > 0) + copy(items = items + (itemId -> newQuantity)) + else + copy(items = items.removed(itemId)) } - def removeItem(itemId: String): State = - copy(items = items - itemId) - def checkout(now: Instant): State = copy(checkoutDate = Some(now)) - def setCustomer(customerId: String, category: String): State = - copy(customerId = customerId, customerCategory = category) - - def toSummary: Summary = - Summary(items, isCheckedOut, customerId, customerCategory) + def toSummary: Summary = { + // filter out removed items + Summary(items, isCheckedOut) + } //#tags def totalQuantity: Int = - items.valuesIterator.sum + items.map { case (_, quantity) => quantity }.sum def tags: Set[String] = { val total = totalQuantity @@ -100,9 +92,7 @@ object ShoppingCart { val empty: State = State( items = Map.empty, - checkoutDate = None, - customerId = "", - customerCategory = "") + checkoutDate = None) } //#commands @@ -127,14 +117,6 @@ object ShoppingCart { * A command to remove an item from the cart. */ final case class RemoveItem( - itemId: String, - replyTo: ActorRef[StatusReply[Summary]]) - extends Command - - /** - * A command to adjust the quantity of an item in the cart. - */ - final case class AdjustItemQuantity( itemId: String, quantity: Int, replyTo: ActorRef[StatusReply[Summary]]) @@ -156,46 +138,19 @@ object ShoppingCart { */ final case class Summary( items: Map[String, Int], - checkedOut: Boolean, - customerId: String, - customerCategory: String) + checkedOut: Boolean) extends CborSerializable - - final case class SetCustomer( - customerId: String, - category: String, - replyTo: ActorRef[StatusReply[Summary]]) - extends Command //#commands //#events /** * This interface defines all the events that the ShoppingCart supports. */ - sealed trait Event extends CborSerializable { - def cartId: String - } + sealed trait Event extends CborSerializable - final case class ItemAdded(cartId: String, itemId: String, quantity: Int) - extends Event + final case class ItemUpdated(itemId: String, quantity: Int) extends Event - final case class ItemRemoved(cartId: String, itemId: String, oldQuantity: Int) - extends Event - - final case class ItemQuantityAdjusted( - cartId: String, - itemId: String, - newQuantity: Int, - oldQuantity: Int) - extends Event - - final case class CheckedOut(cartId: String, eventTime: Instant) extends Event - - final case class CustomerDefined( - cartId: String, - customerId: String, - category: String) - extends Event + final case class CheckedOut(eventTime: Instant) extends Event //#events val EntityKey: EntityTypeKey[Command] = @@ -223,7 +178,7 @@ object ShoppingCart { persistenceId = PersistenceId(EntityKey.name, cartId), emptyState = State.empty, commandHandler = - (state, command) => handleCommand(cartId, state, command), + (state, command) => handleCommand(state, command), eventHandler = (state, event) => handleEvent(state, event)) .withTaggerForState { case (state, _) => state.tags @@ -235,66 +190,42 @@ object ShoppingCart { //#tags //#init private def handleCommand( - cartId: String, state: State, command: Command): ReplyEffect[Event, State] = { // The shopping cart behavior changes if it's checked out or not. // The commands are handled differently for each case. if (state.isCheckedOut) - checkedOutShoppingCart(cartId, state, command) + checkedOutShoppingCart(state, command) else - openShoppingCart(cartId, state, command) + openShoppingCart(state, command) } //#commandHandler private def openShoppingCart( - cartId: String, state: State, command: Command): ReplyEffect[Event, State] = { command match { case AddItem(itemId, quantity, replyTo) => - if (state.hasItem(itemId)) - Effect.reply(replyTo)( - StatusReply.Error( - s"Item '$itemId' was already added to this shopping cart")) - else if (quantity <= 0) + if (quantity <= 0) Effect.reply(replyTo)( StatusReply.Error("Quantity must be greater than zero")) else Effect - .persist(ItemAdded(cartId, itemId, quantity)) + .persist(ItemUpdated(itemId, quantity)) .thenReply(replyTo) { updatedCart => StatusReply.Success(updatedCart.toSummary) } - case RemoveItem(itemId, replyTo) => - if (state.hasItem(itemId)) - Effect - .persist(ItemRemoved(cartId, itemId, state.items(itemId))) - .thenReply(replyTo)(updatedCart => - StatusReply.Success(updatedCart.toSummary)) - else - Effect.reply(replyTo)( - StatusReply.Success(state.toSummary) - ) // removing an item is idempotent - - case AdjustItemQuantity(itemId, quantity, replyTo) => + case RemoveItem(itemId, quantity, replyTo) => if (quantity <= 0) Effect.reply(replyTo)( StatusReply.Error("Quantity must be greater than zero")) - else if (state.hasItem(itemId)) + else Effect - .persist( - ItemQuantityAdjusted( - cartId, - itemId, - quantity, - state.items(itemId))) + .persist(ItemUpdated(itemId, -quantity)) .thenReply(replyTo)(updatedCart => StatusReply.Success(updatedCart.toSummary)) - else - Effect.reply(replyTo)(StatusReply.Error( - s"Cannot adjust quantity for item '$itemId'. Item not present on cart")) + case Checkout(replyTo) => if (state.isEmpty) @@ -302,34 +233,22 @@ object ShoppingCart { StatusReply.Error("Cannot checkout an empty shopping cart")) else Effect - .persist(CheckedOut(cartId, Instant.now())) + .persist(CheckedOut(Instant.now())) .thenReply(replyTo)(updatedCart => StatusReply.Success(updatedCart.toSummary)) case Get(replyTo) => Effect.reply(replyTo)(state.toSummary) - - case SetCustomer(customerId, category, replyTo) => - Effect - .persist(CustomerDefined(cartId, customerId, category)) - .thenReply(replyTo)(updatedCart => - StatusReply.Success(updatedCart.toSummary)) } } //#commandHandler private def checkedOutShoppingCart( - cartId: String, state: State, command: Command): ReplyEffect[Event, State] = { command match { case Get(replyTo) => Effect.reply(replyTo)(state.toSummary) - case SetCustomer(customerId, category, replyTo) => - Effect - .persist(CustomerDefined(cartId, customerId, category)) - .thenReply(replyTo)(updatedCart => - StatusReply.Success(updatedCart.toSummary)) case cmd: AddItem => Effect.reply(cmd.replyTo)( StatusReply.Error( @@ -338,10 +257,6 @@ object ShoppingCart { Effect.reply(cmd.replyTo)( StatusReply.Error( "Can't remove an item from an already checked out shopping cart")) - case cmd: AdjustItemQuantity => - Effect.reply(cmd.replyTo)( - StatusReply.Error( - "Can't adjust item on an already checked out shopping cart")) case cmd: Checkout => Effect.reply(cmd.replyTo)( StatusReply.Error("Can't checkout already checked out shopping cart")) @@ -351,16 +266,10 @@ object ShoppingCart { //#eventHandler private def handleEvent(state: State, event: Event): State = { event match { - case ItemAdded(_, itemId, quantity) => - state.updateItem(itemId, quantity) - case ItemRemoved(_, itemId, _) => - state.removeItem(itemId) - case ItemQuantityAdjusted(_, itemId, quantity, _) => + case ItemUpdated(itemId, quantity) => state.updateItem(itemId, quantity) - case CheckedOut(_, eventTime) => + case CheckedOut(eventTime) => state.checkout(eventTime) - case CustomerDefined(_, customerId, category) => - state.setCustomer(customerId, category) } } //#eventHandler diff --git a/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala b/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala index bc26b5362..e23e4e3a6 100644 --- a/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala +++ b/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala @@ -12,9 +12,6 @@ import io.grpc.Status import org.slf4j.LoggerFactory // tag::moreOperations[] -import akka.actor.typed.ActorRef -import akka.pattern.StatusReply - class ShoppingCartServiceImpl(system: ActorSystem[_]) extends proto.ShoppingCartService { @@ -37,18 +34,12 @@ class ShoppingCartServiceImpl(system: ActorSystem[_]) convertError(response) } - override def updateItem(in: proto.UpdateItemRequest): Future[proto.Cart] = { + override def removeItem(in: proto.RemoveItemRequest): Future[proto.Cart] = { logger.info("updateItem {} to cart {}", in.itemId, in.cartId) val entityRef = sharding.entityRefFor(ShoppingCart.EntityKey, in.cartId) - def command(replyTo: ActorRef[StatusReply[ShoppingCart.Summary]]) = - if (in.quantity == 0) - ShoppingCart.RemoveItem(in.itemId, replyTo) - else - ShoppingCart.AdjustItemQuantity(in.itemId, in.quantity, replyTo) - val reply: Future[ShoppingCart.Summary] = - entityRef.askWithStatus(command(_)) + entityRef.askWithStatus( ShoppingCart.RemoveItem(in.itemId, in.quantity, _)) val response = reply.map(cart => toProtoCart(cart)) convertError(response) } diff --git a/samples/grpc/shopping-cart-service-scala/src/test/scala/shopping/cart/CreateTableTestUtils.scala b/samples/grpc/shopping-cart-service-scala/src/test/scala/shopping/cart/CreateTableTestUtils.scala deleted file mode 100644 index f9a8463ae..000000000 --- a/samples/grpc/shopping-cart-service-scala/src/test/scala/shopping/cart/CreateTableTestUtils.scala +++ /dev/null @@ -1,61 +0,0 @@ -package shopping.cart - -import java.nio.file.Paths - -import scala.concurrent.Await -import scala.concurrent.ExecutionContext -import scala.concurrent.Future -import scala.concurrent.duration._ - -import akka.Done -import akka.actor.typed.ActorSystem -import akka.persistence.jdbc.testkit.scaladsl.SchemaUtils -import akka.projection.jdbc.scaladsl.JdbcProjection -import org.slf4j.LoggerFactory -import shopping.cart.repository.ScalikeJdbcSession - -object CreateTableTestUtils { - - private val createUserTablesFile = - Paths.get("ddl-scripts/create_user_tables.sql").toFile - - def dropAndRecreateTables(system: ActorSystem[_]): Unit = { - implicit val sys: ActorSystem[_] = system - implicit val ec: ExecutionContext = system.executionContext - - // ok to block here, main thread - Await.result( - for { - _ <- SchemaUtils.dropIfExists() - _ <- SchemaUtils.createIfNotExists() - _ <- JdbcProjection.dropOffsetTableIfExists(() => - new ScalikeJdbcSession()) - _ <- JdbcProjection.createOffsetTableIfNotExists(() => - new ScalikeJdbcSession()) - } yield Done, - 30.seconds) - if (createUserTablesFile.exists()) { - Await.result( - for { - _ <- dropUserTables() - _ <- SchemaUtils.applyScript(createUserTablesSql) - } yield Done, - 30.seconds) - } - LoggerFactory - .getLogger("shopping.cart.CreateTableTestUtils") - .info("Created tables") - } - - private def dropUserTables()( - implicit system: ActorSystem[_]): Future[Done] = { - SchemaUtils.applyScript("DROP TABLE IF EXISTS public.item_popularity;") - } - - private def createUserTablesSql: String = { - val source = scala.io.Source.fromFile(createUserTablesFile) - val contents = source.mkString - source.close() - contents - } -} diff --git a/samples/grpc/shopping-cart-service-scala/src/test/scala/shopping/cart/IntegrationSpec.scala b/samples/grpc/shopping-cart-service-scala/src/test/scala/shopping/cart/IntegrationSpec.scala deleted file mode 100644 index 9cd7dee25..000000000 --- a/samples/grpc/shopping-cart-service-scala/src/test/scala/shopping/cart/IntegrationSpec.scala +++ /dev/null @@ -1,285 +0,0 @@ -package shopping.cart - -import java.util.UUID - -import scala.concurrent.Await -import scala.concurrent.ExecutionContext -import scala.concurrent.Future -import scala.concurrent.duration._ - -import akka.actor.CoordinatedShutdown -import akka.actor.testkit.typed.scaladsl.ActorTestKit -import akka.actor.typed.ActorSystem -import akka.cluster.MemberStatus -import akka.cluster.typed.Cluster -import akka.grpc.GrpcClientSettings -import akka.kafka.ConsumerSettings -import akka.kafka.Subscriptions -import akka.kafka.scaladsl.Consumer -import akka.persistence.testkit.scaladsl.PersistenceInit -import akka.testkit.SocketUtil -import com.google.protobuf.any.{ Any => ScalaPBAny } -import com.typesafe.config.Config -import com.typesafe.config.ConfigFactory -import org.apache.kafka.common.serialization.ByteArrayDeserializer -import org.apache.kafka.common.serialization.StringDeserializer -import org.scalatest.BeforeAndAfterAll -import org.scalatest.concurrent.Eventually -import org.scalatest.concurrent.PatienceConfiguration -import org.scalatest.concurrent.ScalaFutures -import org.scalatest.matchers.should.Matchers -import org.scalatest.time.Span -import org.scalatest.wordspec.AnyWordSpec -import org.slf4j.LoggerFactory -import shopping.cart.repository.ScalikeJdbcSetup -import shopping.order.proto.OrderRequest -import shopping.order.proto.OrderResponse -import shopping.order.proto.ShoppingOrderService - -object IntegrationSpec { - val sharedConfig: Config = ConfigFactory.load("integration-test.conf") - - private def nodeConfig( - grpcPort: Int, - managementPorts: Seq[Int], - managementPortIndex: Int): Config = - ConfigFactory.parseString(s""" - shopping-cart-service.grpc { - interface = "localhost" - port = $grpcPort - } - akka.management.http.port = ${managementPorts(managementPortIndex)} - akka.discovery.config.services { - "shopping-cart-service" { - endpoints = [ - {host = "127.0.0.1", port = ${managementPorts(0)}}, - {host = "127.0.0.1", port = ${managementPorts(1)}}, - {host = "127.0.0.1", port = ${managementPorts(2)}} - ] - } - } - """) - - class TestNodeFixture( - grpcPort: Int, - managementPorts: Seq[Int], - managementPortIndex: Int) { - val testKit = - ActorTestKit( - "IntegrationSpec", - nodeConfig(grpcPort, managementPorts, managementPortIndex) - .withFallback(sharedConfig) - .resolve()) - - def system: ActorSystem[_] = testKit.system - - private val clientSettings = - GrpcClientSettings - .connectToServiceAt("127.0.0.1", grpcPort)(testKit.system) - .withTls(false) - lazy val client: proto.ShoppingCartServiceClient = - proto.ShoppingCartServiceClient(clientSettings)(testKit.system) - CoordinatedShutdown - .get(system) - .addTask( - CoordinatedShutdown.PhaseBeforeServiceUnbind, - "close-test-client-for-grpc")(() => client.close()); - - } -} - -class IntegrationSpec - extends AnyWordSpec - with Matchers - with BeforeAndAfterAll - with ScalaFutures - with Eventually { - import IntegrationSpec.TestNodeFixture - - private val logger = - LoggerFactory.getLogger(classOf[IntegrationSpec]) - - implicit private val patience: PatienceConfig = - PatienceConfig(10.seconds, Span(100, org.scalatest.time.Millis)) - - private val (grpcPorts, managementPorts) = - SocketUtil - .temporaryServerAddresses(6, "127.0.0.1") - .map(_.getPort) - .splitAt(3) - - // one TestKit (ActorSystem) per cluster node - private val testNode1 = - new TestNodeFixture(grpcPorts(0), managementPorts, 0) - private val testNode2 = - new TestNodeFixture(grpcPorts(1), managementPorts, 1) - private val testNode3 = - new TestNodeFixture(grpcPorts(2), managementPorts, 2) - - private val systems3 = - List(testNode1, testNode2, testNode3).map(_.testKit.system) - - private val kafkaTopicProbe = - testNode1.testKit.createTestProbe[Any]() - - // stub of the ShoppingOrderService - private val orderServiceProbe = - testNode1.testKit.createTestProbe[OrderRequest]() - private val testOrderService: ShoppingOrderService = - new ShoppingOrderService { - override def order(in: OrderRequest): Future[OrderResponse] = { - orderServiceProbe.ref ! in - Future.successful(OrderResponse(ok = true)) - } - } - - override protected def beforeAll(): Unit = { - super.beforeAll() - ScalikeJdbcSetup.init(testNode1.system) - CreateTableTestUtils.dropAndRecreateTables(testNode1.system) - // avoid concurrent creation of tables - val timeout = 10.seconds - Await.result( - PersistenceInit.initializeDefaultPlugins(testNode1.system, timeout), - timeout) - } - - private def initializeKafkaTopicProbe(): Unit = { - implicit val sys: ActorSystem[_] = testNode1.system - implicit val ec: ExecutionContext = sys.executionContext - val topic = - sys.settings.config.getString("shopping-cart-service.kafka.topic") - val groupId = UUID.randomUUID().toString - val consumerSettings = - ConsumerSettings(sys, new StringDeserializer, new ByteArrayDeserializer) - .withBootstrapServers("localhost:9092") // provided by Docker compose - .withGroupId(groupId) - Consumer - .plainSource(consumerSettings, Subscriptions.topics(topic)) - .map { record => - val bytes = record.value() - val x = ScalaPBAny.parseFrom(bytes) - val typeUrl = x.typeUrl - val inputBytes = x.value.newCodedInput() - val event: AnyRef = - typeUrl match { - case "shopping-cart-service/shoppingcart.ItemAdded" => - proto.ItemAdded.parseFrom(inputBytes) - case "shopping-cart-service/shoppingcart.ItemQuantityAdjusted" => - proto.ItemQuantityAdjusted.parseFrom(inputBytes) - case "shopping-cart-service/shoppingcart.ItemRemoved" => - proto.ItemRemoved.parseFrom(inputBytes) - case "shopping-cart-service/shoppingcart.CheckedOut" => - proto.CheckedOut.parseFrom(inputBytes) - case _ => - throw new IllegalArgumentException( - s"unknown record type [$typeUrl]") - } - event - } - .runForeach(kafkaTopicProbe.ref.tell) - .failed - .foreach { case e: Exception => - logger.error(s"Test consumer of $topic failed", e) - } - } - - override protected def afterAll(): Unit = { - super.afterAll() - testNode3.testKit.shutdownTestKit() - testNode2.testKit.shutdownTestKit() - // testNode1 must be the last to shutdown - // because responsible to close ScalikeJdbc connections - testNode1.testKit.shutdownTestKit() - } - - "Shopping Cart service" should { - "init and join Cluster" in { - Main.init(testNode1.testKit.system, testOrderService) - Main.init(testNode2.testKit.system, testOrderService) - Main.init(testNode3.testKit.system, testOrderService) - - // let the nodes join and become Up - eventually(PatienceConfiguration.Timeout(15.seconds)) { - systems3.foreach { sys => - Cluster(sys).selfMember.status should ===(MemberStatus.Up) - Cluster(sys).state.members.unsorted.map(_.status) should ===( - Set(MemberStatus.Up)) - } - } - - initializeKafkaTopicProbe() - } - - "update and project from different nodes via gRPC" in { - // add from client1 - val response1 = testNode1.client.addItem( - proto.AddItemRequest(cartId = "cart-1", itemId = "foo", quantity = 42)) - val updatedCart1 = response1.futureValue - updatedCart1.items.head.itemId should ===("foo") - updatedCart1.items.head.quantity should ===(42) - - // first may take longer time - val published1 = - kafkaTopicProbe.expectMessageType[proto.ItemAdded](20.seconds) - published1.cartId should ===("cart-1") - published1.itemId should ===("foo") - published1.quantity should ===(42) - - // add from client2 - val response2 = testNode2.client.addItem( - proto.AddItemRequest(cartId = "cart-2", itemId = "bar", quantity = 17)) - val updatedCart2 = response2.futureValue - updatedCart2.items.head.itemId should ===("bar") - updatedCart2.items.head.quantity should ===(17) - - // update from client2 - val response3 = - testNode2.client.updateItem(proto - .UpdateItemRequest(cartId = "cart-2", itemId = "bar", quantity = 18)) - val updatedCart3 = response3.futureValue - updatedCart3.items.head.itemId should ===("bar") - updatedCart3.items.head.quantity should ===(18) - - // ItemPopularityProjection has consumed the events and updated db - eventually { - testNode1.client - .getItemPopularity(proto.GetItemPopularityRequest(itemId = "foo")) - .futureValue - .popularityCount should ===(42) - - testNode1.client - .getItemPopularity(proto.GetItemPopularityRequest(itemId = "bar")) - .futureValue - .popularityCount should ===(18) - } - - val published2 = - kafkaTopicProbe.expectMessageType[proto.ItemAdded] - published2.cartId should ===("cart-2") - published2.itemId should ===("bar") - published2.quantity should ===(17) - - val published3 = - kafkaTopicProbe.expectMessageType[proto.ItemQuantityAdjusted] - published3.cartId should ===("cart-2") - published3.itemId should ===("bar") - published3.quantity should ===(18) - - val response4 = - testNode2.client.checkout(proto.CheckoutRequest(cartId = "cart-2")) - response4.futureValue.checkedOut should ===(true) - - val orderRequest = - orderServiceProbe.expectMessageType[OrderRequest] - orderRequest.cartId should ===("cart-2") - orderRequest.items.head.itemId should ===("bar") - orderRequest.items.head.quantity should ===(18) - - val published4 = - kafkaTopicProbe.expectMessageType[proto.CheckedOut] - published4.cartId should ===("cart-2") - } - - } -} diff --git a/samples/grpc/shopping-cart-service-scala/src/test/scala/shopping/cart/ItemPopularityIntegrationSpec.scala b/samples/grpc/shopping-cart-service-scala/src/test/scala/shopping/cart/ItemPopularityIntegrationSpec.scala deleted file mode 100644 index a11ce0294..000000000 --- a/samples/grpc/shopping-cart-service-scala/src/test/scala/shopping/cart/ItemPopularityIntegrationSpec.scala +++ /dev/null @@ -1,102 +0,0 @@ -package shopping.cart - -import scala.concurrent.Await -import scala.concurrent.Future -import scala.concurrent.duration._ - -import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit -import akka.cluster.MemberStatus -import akka.cluster.sharding.typed.scaladsl.ClusterSharding -import akka.cluster.typed.Cluster -import akka.cluster.typed.Join -import akka.persistence.testkit.scaladsl.PersistenceInit -import com.typesafe.config.Config -import com.typesafe.config.ConfigFactory -import org.scalatest.OptionValues -import org.scalatest.wordspec.AnyWordSpecLike -import shopping.cart.repository.ItemPopularityRepositoryImpl -import shopping.cart.repository.ScalikeJdbcSetup -import shopping.cart.repository.ScalikeJdbcSession - -object ItemPopularityIntegrationSpec { - val config: Config = - ConfigFactory.load("item-popularity-integration-test.conf") -} - -class ItemPopularityIntegrationSpec - extends ScalaTestWithActorTestKit(ItemPopularityIntegrationSpec.config) - with AnyWordSpecLike - with OptionValues { - - private lazy val itemPopularityRepository = - new ItemPopularityRepositoryImpl() - - override protected def beforeAll(): Unit = { - ScalikeJdbcSetup.init(system) - CreateTableTestUtils.dropAndRecreateTables(system) - // avoid concurrent creation of keyspace and tables - val timeout = 10.seconds - Await.result( - PersistenceInit.initializeDefaultPlugins(system, timeout), - timeout) - - ShoppingCart.init(system) - - ItemPopularityProjection.init(system, itemPopularityRepository) - - super.beforeAll() - } - - override protected def afterAll(): Unit = { - super.afterAll() - } - - "Item popularity projection" should { - "init and join Cluster" in { - Cluster(system).manager ! Join(Cluster(system).selfMember.address) - - // let the node join and become Up - eventually { - Cluster(system).selfMember.status should ===(MemberStatus.Up) - } - } - - "consume cart events and update popularity count" in { - val sharding = ClusterSharding(system) - val cartId1 = "cart1" - val cartId2 = "cart2" - val item1 = "item1" - val item2 = "item2" - - val cart1 = sharding.entityRefFor(ShoppingCart.EntityKey, cartId1) - val cart2 = sharding.entityRefFor(ShoppingCart.EntityKey, cartId2) - - val reply1: Future[ShoppingCart.Summary] = - cart1.askWithStatus(ShoppingCart.AddItem(item1, 3, _)) - reply1.futureValue.items.values.sum should ===(3) - - eventually { - ScalikeJdbcSession.withSession { session => - itemPopularityRepository.getItem(session, item1).value should ===(3) - } - } - - val reply2: Future[ShoppingCart.Summary] = - cart1.askWithStatus(ShoppingCart.AddItem(item2, 5, _)) - reply2.futureValue.items.values.sum should ===(3 + 5) - // another cart - val reply3: Future[ShoppingCart.Summary] = - cart2.askWithStatus(ShoppingCart.AddItem(item2, 4, _)) - reply3.futureValue.items.values.sum should ===(4) - - eventually { - ScalikeJdbcSession.withSession { session => - itemPopularityRepository.getItem(session, item2).value should ===( - 5 + 4) - itemPopularityRepository.getItem(session, item1).value should ===(3) - } - } - } - - } -} diff --git a/samples/grpc/shopping-cart-service-scala/src/test/scala/shopping/cart/ItemPopularityProjectionSpec.scala b/samples/grpc/shopping-cart-service-scala/src/test/scala/shopping/cart/ItemPopularityProjectionSpec.scala deleted file mode 100644 index cb052b342..000000000 --- a/samples/grpc/shopping-cart-service-scala/src/test/scala/shopping/cart/ItemPopularityProjectionSpec.scala +++ /dev/null @@ -1,118 +0,0 @@ -package shopping.cart - -import java.time.Instant -import scala.concurrent.{ ExecutionContext, Future } -import akka.Done -import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit -import akka.persistence.query.Offset -import akka.projection.ProjectionId -import akka.projection.eventsourced.EventEnvelope -import akka.projection.scaladsl.Handler -import akka.projection.testkit.scaladsl.TestProjection -import akka.projection.testkit.scaladsl.TestSourceProvider -import akka.projection.testkit.scaladsl.ProjectionTestKit -import akka.stream.scaladsl.Source -import org.scalatest.wordspec.AnyWordSpecLike -import shopping.cart.repository.{ ItemPopularityRepository, ScalikeJdbcSession } - -object ItemPopularityProjectionSpec { - // stub out the db layer and simulate recording item count updates - class TestItemPopularityRepository extends ItemPopularityRepository { - var counts: Map[String, Long] = Map.empty - - override def update( - session: ScalikeJdbcSession, - itemId: String, - delta: Int): Unit = { - counts = counts + (itemId -> (counts.getOrElse(itemId, 0L) + delta)) - } - - override def getItem( - session: ScalikeJdbcSession, - itemId: String): Option[Long] = - counts.get(itemId) - } -} - -class ItemPopularityProjectionSpec - extends ScalaTestWithActorTestKit - with AnyWordSpecLike { - import ItemPopularityProjectionSpec.TestItemPopularityRepository - - private val projectionTestKit = ProjectionTestKit(system) - - private def createEnvelope( - event: ShoppingCart.Event, - seqNo: Long, - timestamp: Long = 0L) = - EventEnvelope( - Offset.sequence(seqNo), - "persistenceId", - seqNo, - event, - timestamp) - - private def toAsyncHandler(itemHandler: ItemPopularityProjectionHandler)( - implicit - ec: ExecutionContext): Handler[EventEnvelope[ShoppingCart.Event]] = - eventEnvelope => - Future { - itemHandler.process(session = null, eventEnvelope) - Done - } - - "The events from the Shopping Cart" should { - - "update item popularity counts by the projection" in { - - val events = - Source( - List[EventEnvelope[ShoppingCart.Event]]( - createEnvelope( - ShoppingCart.ItemAdded("a7098", "bowling shoes", 1), - 0L), - createEnvelope( - ShoppingCart.ItemQuantityAdjusted("a7098", "bowling shoes", 2, 1), - 1L), - createEnvelope( - ShoppingCart - .CheckedOut("a7098", Instant.parse("2020-01-01T12:00:00.00Z")), - 2L), - createEnvelope( - ShoppingCart.ItemAdded("0d12d", "akka t-shirt", 1), - 3L), - createEnvelope(ShoppingCart.ItemAdded("0d12d", "skis", 1), 4L), - createEnvelope(ShoppingCart.ItemRemoved("0d12d", "skis", 1), 5L), - createEnvelope( - ShoppingCart - .CheckedOut("0d12d", Instant.parse("2020-01-01T12:05:00.00Z")), - 6L))) - - val repository = new TestItemPopularityRepository - val projectionId = - ProjectionId("item-popularity", "carts-0") - val sourceProvider = - TestSourceProvider[Offset, EventEnvelope[ShoppingCart.Event]]( - events, - extractOffset = env => env.offset) - val projection = - TestProjection[Offset, EventEnvelope[ShoppingCart.Event]]( - projectionId, - sourceProvider, - () => - toAsyncHandler( - new ItemPopularityProjectionHandler( - "carts-0", - system, - repository))(system.executionContext)) - - projectionTestKit.run(projection) { - repository.counts shouldBe Map( - "bowling shoes" -> 2, - "akka t-shirt" -> 1, - "skis" -> 0) - } - } - } - -} diff --git a/samples/grpc/shopping-cart-service-scala/src/test/scala/shopping/cart/ShoppingCartSpec.scala b/samples/grpc/shopping-cart-service-scala/src/test/scala/shopping/cart/ShoppingCartSpec.scala index 09246e57b..0c5a87d9e 100644 --- a/samples/grpc/shopping-cart-service-scala/src/test/scala/shopping/cart/ShoppingCartSpec.scala +++ b/samples/grpc/shopping-cart-service-scala/src/test/scala/shopping/cart/ShoppingCartSpec.scala @@ -15,6 +15,8 @@ object ShoppingCartSpec { } """) .withFallback(EventSourcedBehaviorTestKit.config) + + def summary(items: Map[String, Int], checkedOut: Boolean) = ShoppingCart.Summary(items, checkedOut) } class ShoppingCartSpec @@ -22,12 +24,13 @@ class ShoppingCartSpec with AnyWordSpecLike with BeforeAndAfterEach { - private val cartId = "testCart" + import ShoppingCartSpec._ + private val eventSourcedTestKit = EventSourcedBehaviorTestKit[ ShoppingCart.Command, ShoppingCart.Event, - ShoppingCart.State](system, ShoppingCart(cartId, "carts-0")) + ShoppingCart.State](system, ShoppingCart("carts-0")) override protected def beforeEach(): Unit = { super.beforeEach() @@ -42,19 +45,8 @@ class ShoppingCartSpec replyTo => ShoppingCart.AddItem("foo", 42, replyTo)) result1.reply should ===( StatusReply.Success( - ShoppingCart.Summary(Map("foo" -> 42), checkedOut = false))) - result1.event should ===(ShoppingCart.ItemAdded(cartId, "foo", 42)) - } - - "reject already added item" in { - val result1 = - eventSourcedTestKit.runCommand[StatusReply[ShoppingCart.Summary]]( - ShoppingCart.AddItem("foo", 42, _)) - result1.reply.isSuccess should ===(true) - val result2 = - eventSourcedTestKit.runCommand[StatusReply[ShoppingCart.Summary]]( - ShoppingCart.AddItem("foo", 13, _)) - result2.reply.isError should ===(true) + summary(Map("foo" -> 42), checkedOut = false))) + result1.event should ===(ShoppingCart.ItemUpdated("foo", 42)) } "remove item" in { @@ -64,11 +56,10 @@ class ShoppingCartSpec result1.reply.isSuccess should ===(true) val result2 = eventSourcedTestKit.runCommand[StatusReply[ShoppingCart.Summary]]( - ShoppingCart.RemoveItem("foo", _)) + ShoppingCart.RemoveItem("foo", 42, _)) result2.reply should ===( - StatusReply.Success( - ShoppingCart.Summary(Map.empty, checkedOut = false))) - result2.event should ===(ShoppingCart.ItemRemoved(cartId, "foo", 42)) + StatusReply.Success(summary(Map.empty, checkedOut = false))) + result2.event should ===(ShoppingCart.ItemUpdated("foo", -42)) } "adjust quantity" in { @@ -78,12 +69,12 @@ class ShoppingCartSpec result1.reply.isSuccess should ===(true) val result2 = eventSourcedTestKit.runCommand[StatusReply[ShoppingCart.Summary]]( - ShoppingCart.AdjustItemQuantity("foo", 43, _)) + ShoppingCart.AddItem("foo", 1, _)) result2.reply should ===( StatusReply.Success( ShoppingCart.Summary(Map("foo" -> 43), checkedOut = false))) result2.event should ===( - ShoppingCart.ItemQuantityAdjusted(cartId, "foo", 43, 42)) + ShoppingCart.ItemUpdated("foo", 1)) } // tag::checkout[] @@ -95,10 +86,7 @@ class ShoppingCartSpec val result2 = eventSourcedTestKit .runCommand[StatusReply[ShoppingCart.Summary]](ShoppingCart.Checkout(_)) result2.reply should ===( - StatusReply.Success( - ShoppingCart.Summary(Map("foo" -> 42), checkedOut = true))) - result2.event.asInstanceOf[ShoppingCart.CheckedOut].cartId should ===( - cartId) + StatusReply.Success(summary(Map("foo" -> 42), checkedOut = true))) val result3 = eventSourcedTestKit.runCommand[StatusReply[ShoppingCart.Summary]]( @@ -117,7 +105,7 @@ class ShoppingCartSpec val result2 = eventSourcedTestKit.runCommand[ShoppingCart.Summary]( ShoppingCart.Get(_)) result2.reply should ===( - ShoppingCart.Summary(Map("foo" -> 42), checkedOut = false)) + summary(Map("foo" -> 42), checkedOut = false)) } // end::get[] @@ -127,14 +115,14 @@ class ShoppingCartSpec ShoppingCart.AddItem("foo", 42, _)) result1.reply should ===( StatusReply.Success( - ShoppingCart.Summary(Map("foo" -> 42), checkedOut = false))) + summary(Map("foo" -> 42), checkedOut = false))) eventSourcedTestKit.restart() val result2 = eventSourcedTestKit.runCommand[ShoppingCart.Summary]( ShoppingCart.Get(_)) result2.reply should ===( - ShoppingCart.Summary(Map("foo" -> 42), checkedOut = false)) + summary(Map("foo" -> 42), checkedOut = false)) } } diff --git a/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/Main.java b/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/Main.java index 46ce58d3d..64b380109 100644 --- a/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/Main.java +++ b/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/Main.java @@ -35,6 +35,8 @@ public static void init(ActorSystem system) { // #single-service-handler Replication replicatedShoppingCart = ShoppingCart.init(system); + // alternatively + // Replication replicatedShoppingCart = ShoppingCart.initWithProducerFilter(system); Function> replicationService = replicatedShoppingCart.createSingleServiceHandler(); // #single-service-handler diff --git a/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java b/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java index 743f9d625..3a5f9ef68 100644 --- a/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java +++ b/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java @@ -50,16 +50,27 @@ public final class ShoppingCart static final String MEDIUM_QUANTITY_TAG = "medium"; static final String LARGE_QUANTITY_TAG = "large"; + static final String VIP_CUSTOMER_TAG = "vip"; + /** The current state held by the `EventSourcedBehavior`. */ + //#stateUpdateItem + //#checkoutStep1Event + //#stateVipCustomer static final class State implements CborSerializable { final Map items; final Set closed; private Optional checkedOut; + private boolean vipCustomer = false; + + //#stateUpdateItem + //#checkoutStep1Event + //#stateVipCustomer public State() { this(new HashMap<>(), new HashSet<>(), Optional.empty()); } + public State(Map items, Set closed, Optional checkedOut) { this.items = items; this.closed = closed; @@ -70,15 +81,24 @@ public boolean isClosed() { return !closed.isEmpty(); } + //#stateUpdateItem public State updateItem(String itemId, int quantity) { items.put(itemId, items.getOrDefault(itemId, 0) + quantity); return this; } + //#stateUpdateItem + + public State markCustomerVip() { + vipCustomer = true; + return this; + } + //#checkoutStep1Event public State close(ReplicaId replica) { closed.add(replica); return this; } + //#checkoutStep1Event public State checkout(Instant now) { checkedOut = Optional.of(now); @@ -93,17 +113,23 @@ public int totalQuantity() { return items.values().stream().reduce(0, Integer::sum); } + //#stateVipCustomer public Set tags() { int total = totalQuantity(); - if (total == 0) - return Collections.emptySet(); - else if (total >= 100) - return Collections.singleton(LARGE_QUANTITY_TAG); - else if (total >= 10) - return Collections.singleton(MEDIUM_QUANTITY_TAG); - else - return Collections.singleton(SMALL_QUANTITY_TAG); + Set tags = new HashSet<>(); + if (vipCustomer) { + tags.add(VIP_CUSTOMER_TAG); + } + if (total >= 100) { + tags.add(LARGE_QUANTITY_TAG); + } else if (total >= 10) { + tags.add(MEDIUM_QUANTITY_TAG); + } else { + tags.add(SMALL_QUANTITY_TAG); + } + return tags; } + //#stateVipCustomer } /** This interface defines all the commands (messages) that the ShoppingCart actor supports. */ @@ -170,6 +196,15 @@ public Get(ActorRef replyTo) { } } + public static final class MarkCustomerVip implements Command { + final ActorRef> replyTo; + + @JsonCreator + public MarkCustomerVip(ActorRef> replyTo) { + this.replyTo = replyTo; + } + } + /** Summary of the shopping cart state, used in reply messages. */ public static final class Summary implements CborSerializable { final Map items; @@ -185,6 +220,7 @@ public Summary(Map items, boolean checkedOut) { abstract static class Event implements CborSerializable {} + // #itemUpdatedEvent static final class ItemUpdated extends Event { public final String itemId; public final int quantity; @@ -193,6 +229,7 @@ public ItemUpdated(String itemId, int quantity) { this.itemId = itemId; this.quantity = quantity; } + // #itemUpdatedEvent @Override public boolean equals(Object o) { @@ -213,6 +250,14 @@ public int hashCode() { } } + static final class CustomerMarkedVip extends Event { + final Instant timestamp; + + public CustomerMarkedVip(Instant timestamp) { + this.timestamp = timestamp; + } + } + static final class Closed extends Event { final ReplicaId replica; @@ -289,29 +334,47 @@ public static Replication initWithProducerFilter(ActorSystem system) system); Predicate> producerFilter = envelope -> { - Set tags = envelope.getTags(); - return tags.contains(ShoppingCart.MEDIUM_QUANTITY_TAG) || - tags.contains(ShoppingCart.LARGE_QUANTITY_TAG); + return envelope.getTags().contains(VIP_CUSTOMER_TAG); }; return Replication.grpcReplication(replicationSettings, producerFilter, ShoppingCart::create, system); } + + public static Behavior createWithProducerFilter( + ReplicatedBehaviors replicatedBehaviors) { + return Behaviors.setup( + context -> + replicatedBehaviors.setup( + replicationContext -> new ShoppingCart( + context, + replicationContext, + true // onlyReplicateVip flag + ))); + } // #init-producerFilter private final ActorContext context; private final ReplicationContext replicationContext; private final boolean isLeader; - private ShoppingCart(ActorContext context, ReplicationContext replicationContext) { + private final boolean onlyReplicateVip; + + private ShoppingCart(ActorContext context, ReplicationContext replicationContext, boolean onlyReplicateVip) { super( replicationContext.persistenceId(), SupervisorStrategy.restartWithBackoff(Duration.ofMillis(200), Duration.ofSeconds(5), 0.1)); this.context = context; this.replicationContext = replicationContext; this.isLeader = isShoppingCartLeader(replicationContext); + this.onlyReplicateVip = onlyReplicateVip; + } + + private ShoppingCart(ActorContext context, ReplicationContext replicationContext) { + this(context, replicationContext, false); } // one replica is responsible for checking out the shopping cart, once all replicas have closed + // #leader private static boolean isShoppingCartLeader(ReplicationContext replicationContext) { List orderedReplicas = replicationContext.getAllReplicas().stream() @@ -320,6 +383,7 @@ private static boolean isShoppingCartLeader(ReplicationContext replicationContex int leaderIndex = Math.abs(replicationContext.entityId().hashCode() % orderedReplicas.size()); return orderedReplicas.get(leaderIndex) == replicationContext.replicaId(); } + // #leader @Override public RetentionCriteria retentionCriteria() { @@ -341,6 +405,7 @@ private CommandHandlerWithReplyBuilderByState open .forState(state -> !state.isClosed()) .onCommand(AddItem.class, this::openOnAddItem) .onCommand(RemoveItem.class, this::openOnRemoveItem) + .onCommand(MarkCustomerVip.class, this::openOnMarkCustomerVip) .onCommand(Checkout.class, this::openOnCheckout); } @@ -356,17 +421,29 @@ private ReplyEffect openOnRemoveItem(State state, RemoveItem cmd) .thenReply(cmd.replyTo, updatedCart -> StatusReply.success(updatedCart.toSummary())); } + private ReplyEffect openOnMarkCustomerVip(State state, MarkCustomerVip cmd) { + if (!state.vipCustomer) { + return Effect().persist(new CustomerMarkedVip(Instant.now())) + .thenReply(cmd.replyTo, updatedCart -> StatusReply.success(updatedCart.toSummary())); + } else { + return Effect().none().thenReply(cmd.replyTo, cart -> StatusReply.success(cart.toSummary())); + } + } + + //#checkoutStep1 private ReplyEffect openOnCheckout(State state, Checkout cmd) { return Effect() .persist(new Closed(replicationContext.replicaId())) .thenReply(cmd.replyTo, updatedCart -> StatusReply.success(updatedCart.toSummary())); } + //#checkoutStep1 private CommandHandlerWithReplyBuilderByState closedShoppingCart() { return newCommandHandlerWithReplyBuilder() .forState(State::isClosed) .onCommand(AddItem.class, this::closedOnAddItem) .onCommand(RemoveItem.class, this::closedOnRemoveItem) + .onCommand(MarkCustomerVip.class, this::closedOnMarkCustomerVip) .onCommand(Checkout.class, this::closedOnCheckout) .onCommand(CloseForCheckout.class, this::closedOnCloseForCheckout) .onCommand(CompleteCheckout.class, this::closedOnCompleteCheckout); @@ -386,6 +463,13 @@ private ReplyEffect closedOnRemoveItem(State state, RemoveItem cmd StatusReply.error("Can't remove an item from an already checked out shopping cart")); } + private ReplyEffect closedOnMarkCustomerVip(State state, MarkCustomerVip cmd) { + return Effect() + .reply( + cmd.replyTo, + StatusReply.error("Can't remove an already checked out shopping cart as vip customer")); + } + private ReplyEffect closedOnCheckout(State state, Checkout cmd) { return Effect() .reply(cmd.replyTo, StatusReply.error("Can't checkout already checked out shopping cart")); @@ -406,6 +490,7 @@ private CommandHandlerWithReplyBuilderByState getC .onCommand(Get.class, (state, cmd) -> Effect().reply(cmd.replyTo, state.toSummary())); } + //#checkoutEventTrigger @Override public EventHandler eventHandler() { return newEventHandlerBuilder() @@ -425,14 +510,20 @@ public EventHandler eventHandler() { private void eventTriggers(State state) { if (!replicationContext.recoveryRunning()) { - if (!state.closed.contains(replicationContext.replicaId())) { - context.getSelf().tell(CloseForCheckout.INSTANCE); - } else if (isLeader) { - boolean allClosed = replicationContext.getAllReplicas().equals(state.closed); - if (allClosed) context.getSelf().tell(CompleteCheckout.INSTANCE); + if (onlyReplicateVip && !state.vipCustomer) { + // not replicated, no need to coordinate, we can close it right away + context.getSelf().tell(CompleteCheckout.INSTANCE); + } else { + if (!state.closed.contains(replicationContext.replicaId())) { + context.getSelf().tell(CloseForCheckout.INSTANCE); + } else if (isLeader) { + boolean allClosed = replicationContext.getAllReplicas().equals(state.closed); + if (allClosed) context.getSelf().tell(CompleteCheckout.INSTANCE); + } } } } + //#checkoutEventTrigger @Override public Set tagsFor(State state, Event event) { diff --git a/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCartServiceImpl.java b/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCartServiceImpl.java index d3dbf50c3..b008a13c0 100644 --- a/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCartServiceImpl.java +++ b/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCartServiceImpl.java @@ -8,13 +8,7 @@ import io.grpc.Status; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import shopping.cart.proto.AddItemRequest; -import shopping.cart.proto.Cart; -import shopping.cart.proto.CheckoutRequest; -import shopping.cart.proto.GetCartRequest; -import shopping.cart.proto.Item; -import shopping.cart.proto.RemoveItemRequest; -import shopping.cart.proto.ShoppingCartService; +import shopping.cart.proto.*; import java.time.Duration; import java.util.List; @@ -91,6 +85,19 @@ public CompletionStage getCart(GetCartRequest in) { return convertError(protoCart); } + @Override + public CompletionStage markCustomerVip(MarkCustomerVipRequest in) { + logger.info( + "markCustomerVip for cart {}", in.getCartId()); + EntityRef entityRef = sharding.entityRefFor(entityKey, in.getCartId()); + CompletionStage reply = + entityRef.askWithStatus( + replyTo -> new ShoppingCart.MarkCustomerVip(replyTo), + timeout); + CompletionStage cart = reply.thenApply(ShoppingCartServiceImpl::toProtoCart); + return convertError(cart); + } + private static Cart toProtoCart(ShoppingCart.Summary cart) { List protoItems = cart.items.entrySet().stream() diff --git a/samples/replicated/shopping-cart-service-java/src/main/protobuf/ShoppingCartService.proto b/samples/replicated/shopping-cart-service-java/src/main/protobuf/ShoppingCartService.proto index 65eed87bd..74f587b92 100644 --- a/samples/replicated/shopping-cart-service-java/src/main/protobuf/ShoppingCartService.proto +++ b/samples/replicated/shopping-cart-service-java/src/main/protobuf/ShoppingCartService.proto @@ -12,6 +12,7 @@ service ShoppingCartService { rpc RemoveItem (RemoveItemRequest) returns (Cart) {} rpc Checkout (CheckoutRequest) returns (Cart) {} rpc GetCart (GetCartRequest) returns (Cart) {} + rpc MarkCustomerVip (MarkCustomerVipRequest) returns (Cart) {} } message AddItemRequest { @@ -34,6 +35,10 @@ message GetCartRequest { string cartId = 1; } +message MarkCustomerVipRequest { + string cartId = 1; +} + message Cart { repeated Item items = 1; bool checkedOut = 2; diff --git a/samples/replicated/shopping-cart-service-java/src/main/resources/grpc.conf b/samples/replicated/shopping-cart-service-java/src/main/resources/grpc.conf index 15b34d629..ad5e95d4a 100644 --- a/samples/replicated/shopping-cart-service-java/src/main/resources/grpc.conf +++ b/samples/replicated/shopping-cart-service-java/src/main/resources/grpc.conf @@ -1,4 +1,4 @@ -akka.http.server.preview.enable-http2 = on +akka.http.server.enable-http2 = on shopping-cart-service { diff --git a/samples/replicated/shopping-cart-service-scala/src/main/protobuf/ShoppingCartService.proto b/samples/replicated/shopping-cart-service-scala/src/main/protobuf/ShoppingCartService.proto index 93aefa3eb..64451596a 100644 --- a/samples/replicated/shopping-cart-service-scala/src/main/protobuf/ShoppingCartService.proto +++ b/samples/replicated/shopping-cart-service-scala/src/main/protobuf/ShoppingCartService.proto @@ -12,6 +12,7 @@ service ShoppingCartService { rpc RemoveItem (RemoveItemRequest) returns (Cart) {} rpc Checkout (CheckoutRequest) returns (Cart) {} rpc GetCart (GetCartRequest) returns (Cart) {} + rpc MarkCustomerVip (MarkCustomerVipRequest) returns (Cart) {} } message AddItemRequest { @@ -34,6 +35,10 @@ message GetCartRequest { string cartId = 1; } +message MarkCustomerVipRequest { + string cartId = 1; +} + message Cart { repeated Item items = 1; bool checkedOut = 2; diff --git a/samples/replicated/shopping-cart-service-scala/src/main/resources/grpc.conf b/samples/replicated/shopping-cart-service-scala/src/main/resources/grpc.conf index 2f360bf20..c2ae530a2 100644 --- a/samples/replicated/shopping-cart-service-scala/src/main/resources/grpc.conf +++ b/samples/replicated/shopping-cart-service-scala/src/main/resources/grpc.conf @@ -1,4 +1,4 @@ -akka.http.server.preview.enable-http2 = on +akka.http.server.enable-http2 = on shopping-cart-service { grpc { diff --git a/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/Main.scala b/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/Main.scala index af330a087..13607a1ef 100644 --- a/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/Main.scala +++ b/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/Main.scala @@ -29,6 +29,8 @@ object Main { // #single-service-handler val replicatedShoppingCart = ShoppingCart.init(system) + // alternatively + // val replicatedShoppingCart = ShoppingCart.initWithProducerFilter(system) val replicationService = replicatedShoppingCart.createSingleServiceHandler() // #single-service-handler diff --git a/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala b/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala index a34d6a918..90101c2a7 100644 --- a/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala +++ b/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala @@ -46,17 +46,35 @@ object ShoppingCart { /** * The current state held by the `EventSourcedBehavior`. */ - final case class State(items: Map[String, Int], closed: Set[ReplicaId], checkedOut: Option[Instant]) + //#checkoutStep1Event + //#stateUpdateItem + //#stateVipCustomer + final case class State( + items: Map[String, Int], + closed: Set[ReplicaId], + checkedOut: Option[Instant], + vipCustomer: Boolean) extends CborSerializable { + //#stateUpdateItem + //#checkoutStep1Event + //#stateVipCustomer def isClosed: Boolean = closed.nonEmpty + //#stateUpdateItem def updateItem(itemId: String, quantity: Int): State = copy(items = items + (itemId -> (items.getOrElse(itemId, 0) + quantity))) + //#stateUpdateItem + def markCustomerVip(): State = + if (vipCustomer) this + else copy(vipCustomer = true) + + //#checkoutStep1Event def close(replica: ReplicaId): State = copy(closed = closed + replica) + //#checkoutStep1Event def checkout(now: Instant): State = copy(checkedOut = Some(now)) @@ -71,17 +89,23 @@ object ShoppingCart { def totalQuantity: Int = items.valuesIterator.sum + //#stateVipCustomer + def tags: Set[String] = { val total = totalQuantity - if (total == 0) Set.empty - else if (total >= 100) Set(LargeQuantityTag) - else if (total >= 10) Set(MediumQuantityTag) - else Set(SmallQuantityTag) + val quantityTags = + if (total == 0) Set.empty + else if (total >= 100) Set(LargeQuantityTag) + else if (total >= 10) Set(MediumQuantityTag) + else Set(SmallQuantityTag) + + quantityTags ++ (if (vipCustomer) Set(VipCustomerTag) else Set.empty) } + //#stateVipCustomer } object State { - val empty: State = State(items = Map.empty, closed = Set.empty, checkedOut = None) + val empty: State = State(items = Map.empty, closed = Set.empty, checkedOut = None, vipCustomer = false) } /** @@ -102,6 +126,8 @@ object ShoppingCart { */ final case class RemoveItem(itemId: String, quantity: Int, replyTo: ActorRef[StatusReply[Summary]]) extends Command + final case class MarkCustomerVip(replyTo: ActorRef[StatusReply[Summary]]) extends Command + /** * A command to check out the shopping cart. */ @@ -132,7 +158,11 @@ object ShoppingCart { */ sealed trait Event extends CborSerializable + // #itemUpdatedEvent final case class ItemUpdated(itemId: String, quantity: Int) extends Event + // #itemUpdatedEvent + + final case class CustomerMarkedVip(timestamp: Instant) extends Event final case class Closed(replica: ReplicaId) extends Event @@ -141,6 +171,7 @@ object ShoppingCart { val SmallQuantityTag = "small" val MediumQuantityTag = "medium" val LargeQuantityTag = "large" + val VipCustomerTag = "vip" val EntityType = "replicated-shopping-cart" @@ -160,30 +191,43 @@ object ShoppingCart { // #init // Use `initWithProducerFilter` instead of `init` to enable filters based on tags. - // Add at least a total quantity of 10 to the cart, smaller carts are excluded by the event filter. + // The filter is defined to only replicate carts from VIP customers, other customer carts will stay in + // the replica they were created (but can be marked VIP at any point in time before being closed) // #init-producerFilter def initWithProducerFilter(implicit system: ActorSystem[_]): Replication[Command] = { val replicationSettings = ReplicationSettings[Command](EntityType, R2dbcReplication()) val producerFilter: EventEnvelope[Event] => Boolean = { envelope => - val tags = envelope.tags - tags.contains(ShoppingCart.MediumQuantityTag) || tags.contains(ShoppingCart.LargeQuantityTag) + envelope.tags.contains(VipCustomerTag) } Replication.grpcReplication(replicationSettings, producerFilter)(ShoppingCart.apply) } + + def applyWithProducerFilter(replicatedBehaviors: ReplicatedBehaviors[Command, Event, State]): Behavior[Command] = { + Behaviors.setup[Command] { context => + replicatedBehaviors.setup { replicationContext => + new ShoppingCart(context, replicationContext, onlyReplicateVip = true).behavior() + } + } + } // #init-producerFilter } -class ShoppingCart(context: ActorContext[ShoppingCart.Command], replicationContext: ReplicationContext) { +class ShoppingCart( + context: ActorContext[ShoppingCart.Command], + replicationContext: ReplicationContext, + onlyReplicateVip: Boolean = false) { import ShoppingCart._ // one of the replicas is responsible for checking out the shopping cart, once all replicas have closed + // #leader private val isLeader: Boolean = { val orderedReplicas = replicationContext.allReplicas.toSeq.sortBy(_.id) val leaderIndex = math.abs(replicationContext.entityId.hashCode % orderedReplicas.size) orderedReplicas(leaderIndex) == replicationContext.replicaId } + // #leader def behavior(): EventSourcedBehavior[Command, Event, State] = { EventSourcedBehavior @@ -224,12 +268,23 @@ class ShoppingCart(context: ActorContext[ShoppingCart.Command], replicationConte StatusReply.Success(updatedCart.toSummary) } + // #checkoutStep1 case Checkout(replyTo) => Effect .persist(Closed(replicationContext.replicaId)) .thenReply(replyTo) { updatedCart => StatusReply.Success(updatedCart.toSummary) } + // #checkoutStep1 + + case MarkCustomerVip(replyTo) => + if (!state.vipCustomer) + Effect + .persist(CustomerMarkedVip(Instant.now())) + .thenReply(replyTo)(updatedCart => StatusReply.Success(updatedCart.toSummary)) + else + // already marked vip + Effect.none.thenReply(replyTo)(updatedCart => StatusReply.Success(updatedCart.toSummary)) case CloseForCheckout => Effect @@ -253,6 +308,8 @@ class ShoppingCart(context: ActorContext[ShoppingCart.Command], replicationConte Effect.reply(cmd.replyTo)(StatusReply.Error("Can't add an item to an already checked out shopping cart")) case cmd: RemoveItem => Effect.reply(cmd.replyTo)(StatusReply.Error("Can't remove an item from an already checked out shopping cart")) + case cmd: MarkCustomerVip => + Effect.reply(cmd.replyTo)(StatusReply.Error("Can't mark customer vip on an already checked out shopping cart")) case cmd: Checkout => Effect.reply(cmd.replyTo)(StatusReply.Error("Can't checkout already checked out shopping cart")) case CloseForCheckout => @@ -267,10 +324,13 @@ class ShoppingCart(context: ActorContext[ShoppingCart.Command], replicationConte } } + //#checkoutEventTrigger private def handleEvent(state: State, event: Event): State = { val newState = event match { case ItemUpdated(itemId, quantity) => state.updateItem(itemId, quantity) + case CustomerMarkedVip(_) => + state.markCustomerVip() case Closed(replica) => state.close(replica) case CheckedOut(eventTime) => @@ -282,16 +342,22 @@ class ShoppingCart(context: ActorContext[ShoppingCart.Command], replicationConte private def eventTriggers(state: State, event: Event): Unit = { if (!replicationContext.recoveryRunning) { - event match { - case _: Closed => - if (!state.closed(replicationContext.replicaId)) { - context.self ! CloseForCheckout - } else if (isLeader) { - val allClosed = replicationContext.allReplicas.diff(state.closed).isEmpty - if (allClosed) context.self ! CompleteCheckout - } - case _ => + if (onlyReplicateVip && !state.vipCustomer) { + // not replicated, no need to coordinate, we can close it right away + context.self ! CompleteCheckout + } else { + event match { + case _: Closed => + if (!state.closed(replicationContext.replicaId)) { + context.self ! CloseForCheckout + } else if (isLeader) { + val allClosed = replicationContext.allReplicas.diff(state.closed).isEmpty + if (allClosed) context.self ! CompleteCheckout + } + case _ => + } } } } + //#checkoutEventTrigger } diff --git a/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala b/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala index cc10cf8e4..056733574 100644 --- a/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala +++ b/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala @@ -1,19 +1,16 @@ package shopping.cart import java.util.concurrent.TimeoutException - import scala.concurrent.Future - import akka.actor.typed.ActorSystem import akka.cluster.sharding.typed.scaladsl.ClusterSharding import akka.cluster.sharding.typed.scaladsl.EntityTypeKey import akka.grpc.GrpcServiceException -import akka.persistence.typed.ReplicaId -import akka.projection.grpc.consumer.ConsumerFilter import akka.util.Timeout import io.grpc.Status import org.slf4j.LoggerFactory -import shopping.cart.proto.Empty +import shopping.cart.proto.Cart +import shopping.cart.proto.MarkCustomerVipRequest class ShoppingCartServiceImpl(system: ActorSystem[_], entityKey: EntityTypeKey[ShoppingCart.Command]) extends proto.ShoppingCartService { @@ -67,6 +64,15 @@ class ShoppingCartServiceImpl(system: ActorSystem[_], entityKey: EntityTypeKey[S convertError(response) } + override def markCustomerVip(in: MarkCustomerVipRequest): Future[Cart] = { + logger.info("markCustomerVip {}", in.cartId) + val entityRef = sharding.entityRefFor(entityKey, in.cartId) + val reply: Future[ShoppingCart.Summary] = + entityRef.askWithStatus(ShoppingCart.MarkCustomerVip) + val response = reply.map(cart => toProtoCart(cart)) + convertError(response) + } + private def toProtoCart(cart: ShoppingCart.Summary): proto.Cart = { proto.Cart( cart.items.iterator.map { case (itemId, quantity) =>