From a5fa5181395bce947108e4987dd9f73345ed189d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Tue, 12 Dec 2023 17:55:36 +0100 Subject: [PATCH] Guide first iteration done --- .../main/paradox/guide/5-charging-station.md | 252 +++++++++++++++++- .../main/java/charging/ChargingStation.java | 16 +- .../java/local/drones/DroneServiceImpl.java | 3 +- .../main/scala/charging/ChargingStation.scala | 18 ++ .../scala/local/drones/DroneServiceImpl.scala | 3 + .../src/main/java/central/Main.java | 3 + .../src/main/scala/central/Main.scala | 2 + 7 files changed, 285 insertions(+), 12 deletions(-) diff --git a/akka-edge-docs/src/main/paradox/guide/5-charging-station.md b/akka-edge-docs/src/main/paradox/guide/5-charging-station.md index af229dc9b..3e160329e 100644 --- a/akka-edge-docs/src/main/paradox/guide/5-charging-station.md +++ b/akka-edge-docs/src/main/paradox/guide/5-charging-station.md @@ -5,12 +5,13 @@ in the form of a charging station. The charging stations are created with a loca local-drone-control edge services where the entity is replicated. Drones in that location can request to charge in the charging station, and be charged if there is a free charging slot. -## Charging station entity +## Implementing the charging station entity -The API for creating a replicated event sourced behavior extends the regular event sourced behavior. It accepts three -different commands from the outside `Create` to initialize a charging station, `StartCharging` to start a charging session -for a drone if possible and `GetState` to query the station for its current state. There is also a private `ChargingCompleted` -command that the entity can send to itself. +### Commands and events + +The charging station accepts three different commands from the outside `Create` to initialize a charging station, +`StartCharging` to start a charging session for a drone if possible and `GetState` to query the station for its current +state. There is also a private `CompleteCharging` command that only the entity can create. The `Create` command leads to a `Created` event which is persisted and initialized the charging station. @@ -23,10 +24,241 @@ Scala Java : @@snip [ChargingStation.java](/samples/grpc/local-drone-control-java/src/main/java/charging/ChargingStation.java) { #commands #events } +### State + +The state of the charging station starts as @java[`null`]@scala[`None`] and requires a `Create` message for the station +to be initialized with a @java[`State`]@scala[`Some(State)`]. + +The `State` contains the number of charging slots that can concurrently charge drones and a set of currently charging drones. + +The state also contains a location id identifying where it is, matching the location id structure of +the local-drone-control service. This is needed so that the station can be replicated only to the edge location where +it is located. + +Scala +: @@snip [ChargingStation.scala](/samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala) { #state } + +Java +: @@snip [ChargingStation.java](/samples/grpc/local-drone-control-java/src/main/java/charging/ChargingStation.java) { #state } + +### Command handler + +The command handler is in fact two separate handlers, one for when the entity is not yet initialized, only accepting +the `Create` command, and one that is used to handle commands once the station has been initialized. + +The `StartCharging` command is the only one that requires more complicated logic: if a slot is free, persist a `ChargingStarted` +event and tell the drone when charging will be done. A timer is also set, to send a `CompleteCharging` to itself once the +charging time has passed. If all charging slots are busy the reply will instead be when the first slot will +be free again and the drone can come back and try charge again. + +Scala +: @@snip [ChargingStation.scala](/samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala) { #commandHandler } + +Java +: @@snip [ChargingStation.java](/samples/grpc/local-drone-control-java/src/main/java/charging/ChargingStation.java) { #commandHandler } + +### Event handler + +Just like the command handler the event handler is different depending on if there is state or not. If there is no state +only the `Created` event is accepted. + +Once initialized the charging station expects `ChargingStarted` and `ChargingCompleted` events, additional `Created` +events are ignored. + +Scala +: @@snip [ChargingStation.scala](/samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala) { #eventHandler } + +Java +: @@snip [ChargingStation.java](/samples/grpc/local-drone-control-java/src/main/java/charging/ChargingStation.java) { #eventHandler } + +The charging station is a very limited replicated entity example to keep the guide simple. It doesn't expect any possible +conflicts, stations are created, once, in the central cloud and replicated to the edge, updates related to drones currently +charging in the station happen at the edge and are replicated to the cloud. Akka replicated event sourcing provides APIs +for both CRDTs where conflicts are automatically handled by the data structure and business domain level conflict resolution. +For more details about see the @extref[Akka documentation](akka:replicated-eventsourcing.html). + + +### Tagging based on location + +To be able to control where the charging station is replicated we tag the events using the location id from the state as +a topic: + +Scala +: @@snip [ChargingStation.scala](/samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala) { #tagging } + +Java +: @@snip [ChargingStation.java](/samples/grpc/local-drone-control-java/src/main/java/charging/ChargingStation.java) { #tagging } + +### Additional logic on start + +In case the entity is stopped or entire local-drone-control system is shut down, we also need to schedule completion of +current charging sessions when it completes starting up, this is done with a signal handler for the `RecoveryCompleted` signal +which Akka will send to it every time it has completed starting up. Note how we are using the `ReplicationContext` to only +do this on the replica where the charging was started: + +Scala +: @@snip [ChargingStation.scala](/samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala) { #eventHandler } + +Java +: @@snip [ChargingStation.java](/samples/grpc/local-drone-control-java/src/main/java/charging/ChargingStation.java) { #eventHandler } + +### Serialization + +The state and events of the entity must be serializable because they are written to the datastore, if the local drone control needs to scale out across several nodes to handle traffic, the commands would also be sent between nodes within the Akka cluster. The sample project includes built-in CBOR serialization using the @extref[Akka Serialization Jackson module](akka:serialization-jackson.html). This section describes how serialization is implemented. You do not need to do anything specific to take advantage of CBOR, but this section explains how it is included. + +The state, commands and events are marked as `akka.serialization.jackson.CborSerializable` which is configured to use the built-in CBOR serialization. + +### Setting up replication for the charging station + +Setup for the cloud replica and the edge node differs slightly. + +For the restaurant-drone-deliveries service running in the cloud we set up a `ReplicationSettings` with edge replication +enabled: + +Scala +: @@snip [ChargingStation.scala](/samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala) { #replicaInit } + +Java +: @@snip [ChargingStation.java](/samples/grpc/local-drone-control-java/src/main/java/charging/ChargingStation.java) { #replicaInit } + +Since we have other events going over akka-projection-grpc producer push replication already in the restaurant-drone-deliveries service +we need to combine all such sources and destinations into single gRPC services: + +Scala +: @@snip [Main.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/Main.scala) { #replicationEndpoint } + +Java +: @@snip [Main.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/Main.java) { #replicationEndpoint } + +For the local-drone-control service we also create `ReplicationSettings` but pass them to a separate initialization method `Replication.grpcEdgeReplication`. +Since the edge node will be responsible for creating connections, no gRPC services needs to be bound: + +Scala +: @@snip [ChargingStation.scala](/samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala) { #edgeReplicaInit } + +Java +: @@snip [ChargingStation.java](/samples/grpc/local-drone-control-java/src/main/java/charging/ChargingStation.java) { #edgeReplicaInit } + +The returned `EdgeReplication` instance gives us access to a `entityRefFactory` for sending messages to the charging stations. + + +## Service for interacting with the charging station + +In the restaurant-drone-deliveries service we introduce a separate gRPC endpoint for creating and looking at charging +station state: + +Scala +: @@snip [charging_station_api.proto](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/charging/charging_station_api.proto) { } + +Java +: @@snip [charging_station_api.proto](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/protobuf/charging/charging_station_api.proto) { } + +The service implementation takes the `entityRefFactory` as a constructor parameter and uses that to create `EntityRef` instances +for specific charging stations to interact with them: + +Scala +: @@snip [ChargingStationServiceImpl.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/charging/ChargingStationServiceImpl.scala) { } + +Java +: @@snip [ChargingStationServiceImpl.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/charging/ChargingStationServiceImpl.java) { } + +## Interacting with the charging station at the edge + +The local-drone-control service does not contain the full gRPC API for interaction but instead has a single `goCharge` method +in the drone gRPC service: + +Scala +: @@snip [DroneServiceImpl.scala](/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneServiceImpl.scala) { #charging } + +Java +: @@snip [DroneServiceImpl.java](/samples/grpc/local-drone-control-java/src/main/java/local/drones/DroneServiceImpl.java) { #charging } + + +## Running + +The complete sample can be downloaded from GitHub, but note that it also includes the next steps of the guide: + +* Java: https://github.com/akka/akka-projection/tree/main/samples/grpc/restaurant-drone-deliveries-service-java +* Scala: https://github.com/akka/akka-projection/tree/main/samples/grpc/restaurant-drone-deliveries-service-scala + +As this service consumes events from the service built in the previous step, start the local-drone-control service first: + +@@@ div { .group-scala } + +To start the local-drone-control-service: + +```shell +sbt run +``` + +@@@ + +@@@ div { .group-java } + +```shell +mvn compile exec:exec +``` + +@@@ + +Then start the drone-restaurant-deliveries-service. + +As the service needs a PostgreSQL instance running, start that up in a docker container and create the database +schema if you did not do that in a previous step of the guide: + +```shell +docker compose up --wait +docker exec -i postgres_db psql -U postgres -t < ddl-scripts/create_tables.sql +``` + +Then start the service: + +@@@ div { .group-scala } + +```shell +sbt -Dconfig.resource=local1.conf run +``` + +And optionally one or two more Akka cluster nodes, but note that the local drone controls +are statically configured to the gRPC port of the first and will only publish events to that node. + +```shell +sbt -Dconfig.resource=local2.conf run +sbt -Dconfig.resource=local3.conf run +``` + +@@@ + +@@@ div { .group-java } + +```shell +mvn compile exec:exec -DAPP_CONFIG=local1.conf +``` + +And optionally one or two more Akka cluster nodes, but note that the local drone controls +are statically configured to the gRPC port of the first and will only publish events to that node. + +```shell +mvn compile exec:exec -DAPP_CONFIG=local2.conf +mvn compile exec:exec -DAPP_CONFIG=local3.conf +``` + +@@@ + +Set up a replicated charging station in the cloud service using [grpcurl](https://github.com/fullstorydev/grpcurl): + +```shell +grpcurl -d '{"charging_station_id":"station1","location_id": "sweden/stockholm/kungsholmen", "charging_slots": 4}' -plaintext localhost:8101 charging.ChargingStationService.CreateChargingStation +``` + +Ask to charge the drone with id `drone1` at the charging station in the edge service: + +```shell +grpcurl -d '{"drone_id":"drone1","charging_station_id":"station1"}' -plaintext 127.0.0.1:8080 local.drones.DroneService.GoCharge +``` +Inspect the state of the charging station in the cloud service to see the charging drone replicated there: -The charging station is a simplified replicated entity in that it doesn't really expect any possible conflicts, stations -are created in the central cloud and replicated to the edge, updates related to drones currently charging in the station -happen at the edge and are replicated to the cloud. Akka replicated event sourcing provides APIs for both CRDTs where -conflicts are automatically handled by the data structure and business domain level conflict resolution. For more details -about see the @extref[Akka documentation](akka:replicated-eventsourcing.html). \ No newline at end of file +```shell +grpcurl -d '{"charging_station_id":"station1"}' -plaintext localhost:8101 charging.ChargingStationService.GetChargingStationState +``` \ No newline at end of file diff --git a/samples/grpc/local-drone-control-java/src/main/java/charging/ChargingStation.java b/samples/grpc/local-drone-control-java/src/main/java/charging/ChargingStation.java index 7af6af44c..be5aa393e 100644 --- a/samples/grpc/local-drone-control-java/src/main/java/charging/ChargingStation.java +++ b/samples/grpc/local-drone-control-java/src/main/java/charging/ChargingStation.java @@ -127,6 +127,8 @@ public ChargingDrone(String droneId, Instant chargingDone, String replicaId) { } } // #events + + // #state public static final class State implements CborSerializable { public final int chargingSlots; public final Set dronesCharging; @@ -138,6 +140,7 @@ public State(int chargingSlots, Set dronesCharging, String statio this.stationLocationId = stationLocationId; } } + // #state public static final String ENTITY_TYPE = "charging-station"; @@ -147,6 +150,7 @@ public State(int chargingSlots, Set dronesCharging, String statio * Init for running in edge node, this is the only difference from the ChargingStation in * restaurant-deliveries-service */ + // #edgeReplicaInit public static EdgeReplication initEdge(ActorSystem system, String locationId) { var replicationSettings = ReplicationSettings.create( @@ -159,8 +163,10 @@ public static EdgeReplication initEdge(ActorSystem system, String lo new ConsumerFilter.IncludeTopics(Set.of(locationId)))); return Replication.grpcEdgeReplication(replicationSettings, ChargingStation::create, system); } + // #edgeReplicaInit /** Init for running in cloud replica */ + // #replicaInit public static Replication init(ActorSystem system) { var replicationSettings = ReplicationSettings.create( @@ -169,6 +175,7 @@ public static Replication init(ActorSystem system) { .withEdgeReplication(true); return Replication.grpcReplication(replicationSettings, ChargingStation::create, system); } + // #replicaInit public static Behavior create( ReplicatedBehaviors replicatedBehaviors) { @@ -207,9 +214,9 @@ public State emptyState() { return null; } + // #commandHandler @Override public CommandHandler commandHandler() { - var noStateHandler = newCommandHandlerBuilder() .forNullState() @@ -317,6 +324,7 @@ private Effect handleStartCharging(State state, StartCharging star }); } } + // #commandHandler @Override public EventHandler eventHandler() { @@ -367,6 +375,7 @@ public EventHandler eventHandler() { return noStateHandler.orElse(initializedStateHandler).build(); } + // #recoveryCompleted @Override public SignalHandler signalHandler() { return newSignalHandlerBuilder() @@ -374,13 +383,17 @@ public SignalHandler signalHandler() { RecoveryCompleted.class, (state, recoveryCompleted) -> handleRecoveryCompleted(state)) .build(); } + // #recoveryCompleted + // #tagging @Override public Set tagsFor(State state, Event event) { if (state == null) return Set.of(); else return Set.of("t:" + state.stationLocationId); } + // #tagging + // #recoveryCompleted private void handleRecoveryCompleted(State state) { // FIXME this scheme is not quite reliable, because station is not remembered/restarted // until next new event/command if edge system is shut down/restarted @@ -401,4 +414,5 @@ private void handleRecoveryCompleted(State state) { }); } } + // #recoveryCompleted } diff --git a/samples/grpc/local-drone-control-java/src/main/java/local/drones/DroneServiceImpl.java b/samples/grpc/local-drone-control-java/src/main/java/local/drones/DroneServiceImpl.java index 12ccfe5c0..0b531641d 100644 --- a/samples/grpc/local-drone-control-java/src/main/java/local/drones/DroneServiceImpl.java +++ b/samples/grpc/local-drone-control-java/src/main/java/local/drones/DroneServiceImpl.java @@ -122,6 +122,7 @@ public CompletionStage completeDelivery(CompleteDelive return convertError(reply); } + // #charging @Override public CompletionStage goCharge(GoChargeRequest in) { logger.info("Requesting charge of {} from {}", in.getDroneId(), in.getChargingStationId()); @@ -159,7 +160,7 @@ public CompletionStage goCharge(GoChargeRequest in) { return convertError(response); } - + // #charging private static Timestamp instantToProtoTimestamp(Instant instant) { return Timestamp.newBuilder() .setSeconds(instant.getEpochSecond()) diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala b/samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala index d80626eba..e7e89a759 100644 --- a/samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala +++ b/samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala @@ -63,6 +63,7 @@ object ChargingStation { case class ChargingCompleted(droneId: String) extends Event // #events + // #state case class ChargingDrone( droneId: String, chargingDone: Instant, @@ -72,6 +73,7 @@ object ChargingStation { dronesCharging: Set[ChargingDrone], stationLocationId: String) extends CborSerializable + // #state val EntityType = "charging-station" @@ -81,6 +83,7 @@ object ChargingStation { * Init for running in edge node, this is the only difference from the ChargingStation * in restaurant-deliveries-service */ + // #edgeReplicaInit def initEdge(locationId: String)( implicit system: ActorSystem[_]): EdgeReplication[Command] = { val replicationSettings = @@ -93,10 +96,12 @@ object ChargingStation { ConsumerFilter.IncludeTopics(Set(locationId)))) Replication.grpcEdgeReplication(replicationSettings)(ChargingStation.apply) } + // #edgeReplicaInit /** * Init for running in cloud replica */ + // #replicaInit def init(implicit system: ActorSystem[_]): Replication[Command] = { val replicationSettings = ReplicationSettings[Command](EntityType, R2dbcReplication()) @@ -104,6 +109,7 @@ object ChargingStation { .withEdgeReplication(true) Replication.grpcReplication(replicationSettings)(ChargingStation.apply) } + // #replicaInit def apply( replicatedBehaviors: ReplicatedBehaviors[Command, Event, Option[State]]) @@ -138,15 +144,20 @@ class ChargingStation( None, handleCommand, handleEvent) + // #recoveryCompleted .receiveSignal { case (Some(state: State), RecoveryCompleted) => handleRecoveryCompleted(state) } + // #recoveryCompleted // tag with location id so we can replicate only to the right edge node + // #tagging .withTaggerForState { case (None, _) => Set.empty case (Some(state), _) => Set("t:" + state.stationLocationId) } + // #tagging + // #commandHandler private def handleCommand( state: Option[State], command: Command): Effect[Event, Option[State]] = @@ -231,7 +242,9 @@ class ChargingStation( } } + // #commandHandler + // #eventHandler private def handleEvent(state: Option[State], event: Event): Option[State] = { state match { case None => @@ -262,6 +275,9 @@ class ChargingStation( } } + // #eventHandler + + // #recoveryCompleted private def handleRecoveryCompleted(state: State): Unit = { // FIXME this scheme is not quite reliable, because station is not remembered/restarted @@ -280,5 +296,7 @@ class ChargingStation( durationUntil(chargingDrone.chargingDone)) } } + // #recoveryCompleted + } diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneServiceImpl.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneServiceImpl.scala index 439c87485..3d0f4bb18 100644 --- a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneServiceImpl.scala +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneServiceImpl.scala @@ -87,6 +87,7 @@ class DroneServiceImpl( .map(_ => Empty.defaultInstance) } + // #charging override def goCharge( in: proto.GoChargeRequest): Future[proto.ChargingResponse] = { logger.info( @@ -109,6 +110,8 @@ class DroneServiceImpl( } convertError(response) } + // #charging + private def convertError[T](response: Future[T]): Future[T] = { response.recoverWith { diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/Main.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/Main.java index 063174788..f6997908b 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/Main.java +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/Main.java @@ -50,6 +50,8 @@ private static void init(ActorSystem system) { var deliveryEventsProducerSource = DeliveryEvents.eventProducerSource(system); var droneOverviewService = new DroneOverviewServiceImpl(system, settings); var restaurantDeliveriesService = new RestaurantDeliveriesServiceImpl(system, settings); + + // #replicationEndpoint var chargingStationService = new ChargingStationServiceImpl( settings, @@ -74,6 +76,7 @@ private static void init(ActorSystem system) { pushedEventsDestination, chargingStationReplication.eventProducerPushDestination().get()), system); + // #replicationEndpoint DroneDeliveriesServer.start( system, diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/Main.scala b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/Main.scala index f45a97d84..86301ba90 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/Main.scala +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/Main.scala @@ -55,6 +55,7 @@ object Main { val restaurantDeliveriesService = new RestaurantDeliveriesServiceImpl(system, settings) + // #replicationEndpoint val chargingStationService = new ChargingStationServiceImpl( chargingStationReplication.entityRefFactory) @@ -71,6 +72,7 @@ object Main { Set( pushedEventsDestination, chargingStationReplication.eventProducerPushDestination.get))(system) + // #replicationEndpoint DroneDeliveriesServer.start( interface,