From f732ad5b06b54383dc5774f085a63d00ffb77cbb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Tue, 5 Dec 2023 17:25:20 +0100 Subject: [PATCH] RES and binding added to local drone control scala --- .../main/resources/application-cluster.conf | 1 + .../src/main/resources/application.conf | 1 + .../src/main/resources/grpc.conf | 6 + .../src/main/resources/local-shared.conf | 1 + .../src/main/resources/replication.conf | 20 ++ .../main/scala/charging/ChargingStation.scala | 243 ++++++++++++++++++ .../src/main/scala/local/drones/Main.scala | 4 + 7 files changed, 276 insertions(+) create mode 100644 samples/grpc/local-drone-control-scala/src/main/resources/replication.conf create mode 100644 samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala diff --git a/samples/grpc/local-drone-control-scala/src/main/resources/application-cluster.conf b/samples/grpc/local-drone-control-scala/src/main/resources/application-cluster.conf index 1f28a6ad5..bda9d238d 100644 --- a/samples/grpc/local-drone-control-scala/src/main/resources/application-cluster.conf +++ b/samples/grpc/local-drone-control-scala/src/main/resources/application-cluster.conf @@ -3,6 +3,7 @@ include "cluster" include "grpc" include "persistence-postgres" +include "replication" local-drone-control { diff --git a/samples/grpc/local-drone-control-scala/src/main/resources/application.conf b/samples/grpc/local-drone-control-scala/src/main/resources/application.conf index dee889d47..7034edda8 100644 --- a/samples/grpc/local-drone-control-scala/src/main/resources/application.conf +++ b/samples/grpc/local-drone-control-scala/src/main/resources/application.conf @@ -4,6 +4,7 @@ include "h2-default-projection-schema.conf" include "grpc" include "persistence-h2" +include "replication" local-drone-control { # unique identifier for the instance of local control, must be known up front by the cloud service diff --git a/samples/grpc/local-drone-control-scala/src/main/resources/grpc.conf b/samples/grpc/local-drone-control-scala/src/main/resources/grpc.conf index d399bb5c0..b85d17372 100644 --- a/samples/grpc/local-drone-control-scala/src/main/resources/grpc.conf +++ b/samples/grpc/local-drone-control-scala/src/main/resources/grpc.conf @@ -31,3 +31,9 @@ akka.projection.grpc.consumer { } stream-id = "delivery-events" } + +akka.projection.grpc { + producer { + query-plugin-id = "akka.persistence.r2dbc.query" + } +} diff --git a/samples/grpc/local-drone-control-scala/src/main/resources/local-shared.conf b/samples/grpc/local-drone-control-scala/src/main/resources/local-shared.conf index c505cd51a..0c0fa8979 100644 --- a/samples/grpc/local-drone-control-scala/src/main/resources/local-shared.conf +++ b/samples/grpc/local-drone-control-scala/src/main/resources/local-shared.conf @@ -3,6 +3,7 @@ include "cluster" include "grpc" include "persistence-postgres" +include "replication" local-drone-control.grpc.interface = "127.0.0.1" akka.remote.artery.canonical.hostname = "127.0.0.1" diff --git a/samples/grpc/local-drone-control-scala/src/main/resources/replication.conf b/samples/grpc/local-drone-control-scala/src/main/resources/replication.conf new file mode 100644 index 000000000..6cefe04f9 --- /dev/null +++ b/samples/grpc/local-drone-control-scala/src/main/resources/replication.conf @@ -0,0 +1,20 @@ +# Replication configuration for the ChargingStation. Note that config `charging-station` +# is the same as the ChargingStation.EntityType. + +charging-station { + # Set with code to use the location id as replica id, as it is unique for each pop + self-replica-id = "PLACEHOLDER_REPLACED_IN_CODE" + entity-event-replication-timeout = 10s + parallel-updates = 1 + # only list the cloud replica(s) we want to connect to + replicas: [{ + replica-id = "cloud1" + number-of-consumers = 1 + grpc.client { + # same as for producer in grpc.conf, so re-use config from there + host = ${akka.grpc.client.central-drone-control.host} + port = ${akka.grpc.client.central-drone-control.port} + use-tls = off + } + }] +} diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala b/samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala new file mode 100644 index 000000000..220486925 --- /dev/null +++ b/samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala @@ -0,0 +1,243 @@ +/* + * Copyright (C) 2009-2023 Lightbend Inc. + */ +package charging + +import akka.Done +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.ActorContext +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.TimerScheduler +import akka.persistence.typed.RecoveryCompleted +import akka.persistence.typed.ReplicaId +import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.scaladsl.EventSourcedBehavior +import akka.persistence.typed.scaladsl.ReplicationContext +import akka.projection.grpc.replication.scaladsl.ReplicatedBehaviors +import akka.projection.grpc.replication.scaladsl.Replication +import akka.projection.grpc.replication.scaladsl.Replication.EdgeReplication +import akka.projection.grpc.replication.scaladsl.ReplicationSettings +import akka.projection.r2dbc.scaladsl.R2dbcReplication +import akka.serialization.jackson.CborSerializable + +import java.time.Instant +import scala.concurrent.duration.DurationInt +import scala.concurrent.duration.DurationLong +import scala.concurrent.duration.FiniteDuration +import scala.math.Ordered.orderingToOrdered + +object ChargingStation { + + // commands and replies + sealed trait Command extends CborSerializable {} + case class Create( + locationId: String, + chargingSlots: Int, + replyTo: ActorRef[Done]) + extends Command + case class StartCharging( + droneId: String, + replyTo: ActorRef[StartChargingResponse]) + extends Command + sealed trait StartChargingResponse + case class AllSlotsBusy(firstSlotFreeAt: Instant) + extends StartChargingResponse + + case class GetState(replyTo: ActorRef[ChargingStation.State]) extends Command + + private case class CompleteCharging(droneId: String) extends Command + + // events + sealed trait Event extends CborSerializable {} + case class Created(locationId: String, chargingSlots: Int) extends Event + case class ChargingStarted(droneId: String, chargeComplete: Instant) + extends Event + with StartChargingResponse + + case class ChargingCompleted(droneId: String) extends Event + + case class ChargingDrone( + droneId: String, + chargingDone: Instant, + replicaId: String) + case class State( + chargingSlots: Int, + dronesCharging: Set[ChargingDrone], + stationLocationId: String) + extends CborSerializable + + val EntityType = "charging-station" + + private val FullChargeTime = 5.minutes + + // FIXME This is the only difference from the cloud one, maybe include in both and keep the RES identical? + def initEdge(locationId: String)( + implicit system: ActorSystem[_]): EdgeReplication[Command] = { + val replicationSettings = + ReplicationSettings[Command](EntityType, R2dbcReplication()) + .withSelfReplicaId(ReplicaId(locationId.replace("/", "_"))) + Replication.grpcEdgeReplication(replicationSettings)(ChargingStation.apply) + } + + def apply( + replicatedBehaviors: ReplicatedBehaviors[Command, Event, Option[State]]) + : Behavior[Command] = { + Behaviors.setup[Command] { context => + Behaviors.withTimers { timers => + replicatedBehaviors.setup { replicationContext => + new ChargingStation(context, replicationContext, timers).behavior() + } + } + } + } + + private def durationUntil(instant: Instant): FiniteDuration = + (instant.getEpochSecond - Instant.now().getEpochSecond).seconds + +} + +class ChargingStation( + context: ActorContext[ChargingStation.Command], + replicationContext: ReplicationContext, + timers: TimerScheduler[ChargingStation.Command]) { + + import ChargingStation._ + + def behavior(): EventSourcedBehavior[Command, Event, Option[State]] = + EventSourcedBehavior( + replicationContext.persistenceId, + None, + handleCommand, + handleEvent) + .receiveSignal { case (Some(state: State), RecoveryCompleted) => + handleRecoveryCompleted(state) + } + // tag with location id so we can replicate only to the right edge node + .withTaggerForState { + case (None, _) => Set.empty + case (Some(state), _) => Set(state.stationLocationId) + } + + private def handleCommand( + state: Option[State], + command: Command): Effect[Event, Option[State]] = + state match { + case None => handleCommandNoState(command) + case Some(state) => handleCommandInitialized(state, command) + } + + private def handleCommandNoState( + command: Command): Effect[Event, Option[State]] = + command match { + case Create(locationId, chargingSlots, replyTo) => + Effect + .persist(Created(locationId, chargingSlots)) + .thenReply(replyTo)(_ => Done) + case unexpected => + context.log.warn( + "Got an unexpected command {} but charging station with id {} not initialized", + unexpected.getClass, + replicationContext.entityId) + Effect.none + } + + private def handleCommandInitialized( + state: ChargingStation.State, + command: ChargingStation.Command): Effect[Event, Option[State]] = { + command match { + case Create(_, _, _) => + context.log.warn( + "Got a create command, but station id {} was already created, ignoring", + replicationContext.entityId) + Effect.none + + case StartCharging(droneId, replyTo) => + if (state.dronesCharging.exists(_.droneId == droneId)) { + context.log.warn( + "Drone {} requested charging but is already charging. Ignoring.", + droneId) + Effect.none + } else if (state.dronesCharging.size >= state.chargingSlots) { + val earliestFreeSlot = state.dronesCharging.map(_.chargingDone).min + context.log.info( + "Drone {} requested charging but all stations busy, earliest free slot {}", + droneId, + earliestFreeSlot) + Effect.reply(replyTo)(AllSlotsBusy(earliestFreeSlot)) + } else { + // charge + val chargeCompletedBy = + Instant.now().plusSeconds(FullChargeTime.toSeconds) + context.log.info( + "Drone {} requested charging, will complete charging at {}", + droneId, + chargeCompletedBy) + val event = ChargingStarted(droneId, chargeCompletedBy) + Effect.persist(event).thenRun { (_: Option[State]) => + timers.startSingleTimer( + CompleteCharging(droneId), + durationUntil(chargeCompletedBy)) + // Note: The event is also the reply + replyTo ! event + } + } + + case CompleteCharging(droneId) => + Effect.persist(ChargingCompleted(droneId)) + + case GetState(replyTo) => + Effect.reply(replyTo)(state) + + } + } + + private def handleEvent(state: Option[State], event: Event): Option[State] = { + state match { + case None => + event match { + case Created(locationId, chargingSlots) => + Some(State(chargingSlots, Set.empty, locationId)) + case unexpected => + throw new IllegalArgumentException( + s"Got unexpected event ${unexpected} for uninitialized state") + } + + case Some(state) => + event match { + case Created(_, _) => + context.log.warn("Saw a second created event, ignoring") + Some(state) + case ChargingStarted(droneId, chargeComplete) => + Some( + state.copy(dronesCharging = state.dronesCharging + ChargingDrone( + droneId, + chargeComplete, + replicationContext.origin.id))) + case ChargingCompleted(droneId) => + Some( + state.copy(dronesCharging = + state.dronesCharging.filterNot(_.droneId == droneId))) + } + + } + } + + private def handleRecoveryCompleted(state: State): Unit = { + // Complete or set up timers for completion for drones charging, + // but only if the charging was initiated in this replica + val now = Instant.now() + state.dronesCharging + .filter(_.replicaId == replicationContext.replicaId.id) + .foreach { chargingDrone => + if (chargingDrone.chargingDone < now) + context.self ! CompleteCharging(chargingDrone.droneId) + else + timers.startSingleTimer( + CompleteCharging(chargingDrone.droneId), + durationUntil(chargingDrone.chargingDone)) + } + } + +} diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Main.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Main.scala index 956fb3dd6..6766e702c 100644 --- a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Main.scala +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Main.scala @@ -4,6 +4,7 @@ import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.{ ActorSystem, Behavior } import akka.cluster.typed.Cluster import akka.cluster.typed.Join +import charging.ChargingStation object Main { @@ -39,6 +40,9 @@ object Main { new DeliveriesQueueServiceImpl(settings, deliveriesQueue)( context.system) + // replicated charging station entity + ChargingStation.initEdge(settings.locationId)(context.system) + val grpcInterface = context.system.settings.config .getString("local-drone-control.grpc.interface")