Skip to content

Commit

Permalink
Guide first iteration done
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Dec 12, 2023
1 parent 9b9f340 commit a5fa518
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 12 deletions.
252 changes: 242 additions & 10 deletions akka-edge-docs/src/main/paradox/guide/5-charging-station.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ in the form of a charging station. The charging stations are created with a loca
local-drone-control edge services where the entity is replicated. Drones in that location can request to charge in the
charging station, and be charged if there is a free charging slot.

## Charging station entity
## Implementing the charging station entity

The API for creating a replicated event sourced behavior extends the regular event sourced behavior. It accepts three
different commands from the outside `Create` to initialize a charging station, `StartCharging` to start a charging session
for a drone if possible and `GetState` to query the station for its current state. There is also a private `ChargingCompleted`
command that the entity can send to itself.
### Commands and events

The charging station accepts three different commands from the outside `Create` to initialize a charging station,
`StartCharging` to start a charging session for a drone if possible and `GetState` to query the station for its current
state. There is also a private `CompleteCharging` command that only the entity can create.

The `Create` command leads to a `Created` event which is persisted and initialized the charging station.

Expand All @@ -23,10 +24,241 @@ Scala
Java
: @@snip [ChargingStation.java](/samples/grpc/local-drone-control-java/src/main/java/charging/ChargingStation.java) { #commands #events }

### State

The state of the charging station starts as @java[`null`]@scala[`None`] and requires a `Create` message for the station
to be initialized with a @java[`State`]@scala[`Some(State)`].

The `State` contains the number of charging slots that can concurrently charge drones and a set of currently charging drones.

The state also contains a location id identifying where it is, matching the location id structure of
the local-drone-control service. This is needed so that the station can be replicated only to the edge location where
it is located.

Scala
: @@snip [ChargingStation.scala](/samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala) { #state }

Java
: @@snip [ChargingStation.java](/samples/grpc/local-drone-control-java/src/main/java/charging/ChargingStation.java) { #state }

### Command handler

The command handler is in fact two separate handlers, one for when the entity is not yet initialized, only accepting
the `Create` command, and one that is used to handle commands once the station has been initialized.

The `StartCharging` command is the only one that requires more complicated logic: if a slot is free, persist a `ChargingStarted`
event and tell the drone when charging will be done. A timer is also set, to send a `CompleteCharging` to itself once the
charging time has passed. If all charging slots are busy the reply will instead be when the first slot will
be free again and the drone can come back and try charge again.

Scala
: @@snip [ChargingStation.scala](/samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala) { #commandHandler }

Java
: @@snip [ChargingStation.java](/samples/grpc/local-drone-control-java/src/main/java/charging/ChargingStation.java) { #commandHandler }

### Event handler

Just like the command handler the event handler is different depending on if there is state or not. If there is no state
only the `Created` event is accepted.

Once initialized the charging station expects `ChargingStarted` and `ChargingCompleted` events, additional `Created`
events are ignored.

Scala
: @@snip [ChargingStation.scala](/samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala) { #eventHandler }

Java
: @@snip [ChargingStation.java](/samples/grpc/local-drone-control-java/src/main/java/charging/ChargingStation.java) { #eventHandler }

The charging station is a very limited replicated entity example to keep the guide simple. It doesn't expect any possible
conflicts, stations are created, once, in the central cloud and replicated to the edge, updates related to drones currently
charging in the station happen at the edge and are replicated to the cloud. Akka replicated event sourcing provides APIs
for both CRDTs where conflicts are automatically handled by the data structure and business domain level conflict resolution.
For more details about see the @extref[Akka documentation](akka:replicated-eventsourcing.html).


### Tagging based on location

To be able to control where the charging station is replicated we tag the events using the location id from the state as
a topic:

Scala
: @@snip [ChargingStation.scala](/samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala) { #tagging }

Java
: @@snip [ChargingStation.java](/samples/grpc/local-drone-control-java/src/main/java/charging/ChargingStation.java) { #tagging }

### Additional logic on start

In case the entity is stopped or entire local-drone-control system is shut down, we also need to schedule completion of
current charging sessions when it completes starting up, this is done with a signal handler for the `RecoveryCompleted` signal
which Akka will send to it every time it has completed starting up. Note how we are using the `ReplicationContext` to only
do this on the replica where the charging was started:

Scala
: @@snip [ChargingStation.scala](/samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala) { #eventHandler }

Java
: @@snip [ChargingStation.java](/samples/grpc/local-drone-control-java/src/main/java/charging/ChargingStation.java) { #eventHandler }

### Serialization

The state and events of the entity must be serializable because they are written to the datastore, if the local drone control needs to scale out across several nodes to handle traffic, the commands would also be sent between nodes within the Akka cluster. The sample project includes built-in CBOR serialization using the @extref[Akka Serialization Jackson module](akka:serialization-jackson.html). This section describes how serialization is implemented. You do not need to do anything specific to take advantage of CBOR, but this section explains how it is included.

The state, commands and events are marked as `akka.serialization.jackson.CborSerializable` which is configured to use the built-in CBOR serialization.

### Setting up replication for the charging station

Setup for the cloud replica and the edge node differs slightly.

For the restaurant-drone-deliveries service running in the cloud we set up a `ReplicationSettings` with edge replication
enabled:

Scala
: @@snip [ChargingStation.scala](/samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala) { #replicaInit }

Java
: @@snip [ChargingStation.java](/samples/grpc/local-drone-control-java/src/main/java/charging/ChargingStation.java) { #replicaInit }

Since we have other events going over akka-projection-grpc producer push replication already in the restaurant-drone-deliveries service
we need to combine all such sources and destinations into single gRPC services:

Scala
: @@snip [Main.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/Main.scala) { #replicationEndpoint }

Java
: @@snip [Main.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/Main.java) { #replicationEndpoint }

For the local-drone-control service we also create `ReplicationSettings` but pass them to a separate initialization method `Replication.grpcEdgeReplication`.
Since the edge node will be responsible for creating connections, no gRPC services needs to be bound:

Scala
: @@snip [ChargingStation.scala](/samples/grpc/local-drone-control-scala/src/main/scala/charging/ChargingStation.scala) { #edgeReplicaInit }

Java
: @@snip [ChargingStation.java](/samples/grpc/local-drone-control-java/src/main/java/charging/ChargingStation.java) { #edgeReplicaInit }

The returned `EdgeReplication` instance gives us access to a `entityRefFactory` for sending messages to the charging stations.


## Service for interacting with the charging station

In the restaurant-drone-deliveries service we introduce a separate gRPC endpoint for creating and looking at charging
station state:

Scala
: @@snip [charging_station_api.proto](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/charging/charging_station_api.proto) { }

Java
: @@snip [charging_station_api.proto](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/protobuf/charging/charging_station_api.proto) { }

The service implementation takes the `entityRefFactory` as a constructor parameter and uses that to create `EntityRef` instances
for specific charging stations to interact with them:

Scala
: @@snip [ChargingStationServiceImpl.scala](/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/charging/ChargingStationServiceImpl.scala) { }

Java
: @@snip [ChargingStationServiceImpl.java](/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/charging/ChargingStationServiceImpl.java) { }

## Interacting with the charging station at the edge

The local-drone-control service does not contain the full gRPC API for interaction but instead has a single `goCharge` method
in the drone gRPC service:

Scala
: @@snip [DroneServiceImpl.scala](/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneServiceImpl.scala) { #charging }

Java
: @@snip [DroneServiceImpl.java](/samples/grpc/local-drone-control-java/src/main/java/local/drones/DroneServiceImpl.java) { #charging }


## Running

The complete sample can be downloaded from GitHub, but note that it also includes the next steps of the guide:

* Java: https://github.com/akka/akka-projection/tree/main/samples/grpc/restaurant-drone-deliveries-service-java
* Scala: https://github.com/akka/akka-projection/tree/main/samples/grpc/restaurant-drone-deliveries-service-scala

As this service consumes events from the service built in the previous step, start the local-drone-control service first:

@@@ div { .group-scala }

To start the local-drone-control-service:

```shell
sbt run
```

@@@

@@@ div { .group-java }

```shell
mvn compile exec:exec
```

@@@

Then start the drone-restaurant-deliveries-service.

As the service needs a PostgreSQL instance running, start that up in a docker container and create the database
schema if you did not do that in a previous step of the guide:

```shell
docker compose up --wait
docker exec -i postgres_db psql -U postgres -t < ddl-scripts/create_tables.sql
```

Then start the service:

@@@ div { .group-scala }

```shell
sbt -Dconfig.resource=local1.conf run
```

And optionally one or two more Akka cluster nodes, but note that the local drone controls
are statically configured to the gRPC port of the first and will only publish events to that node.

```shell
sbt -Dconfig.resource=local2.conf run
sbt -Dconfig.resource=local3.conf run
```

@@@

@@@ div { .group-java }

```shell
mvn compile exec:exec -DAPP_CONFIG=local1.conf
```

And optionally one or two more Akka cluster nodes, but note that the local drone controls
are statically configured to the gRPC port of the first and will only publish events to that node.

```shell
mvn compile exec:exec -DAPP_CONFIG=local2.conf
mvn compile exec:exec -DAPP_CONFIG=local3.conf
```

@@@

Set up a replicated charging station in the cloud service using [grpcurl](https://github.com/fullstorydev/grpcurl):

```shell
grpcurl -d '{"charging_station_id":"station1","location_id": "sweden/stockholm/kungsholmen", "charging_slots": 4}' -plaintext localhost:8101 charging.ChargingStationService.CreateChargingStation
```

Ask to charge the drone with id `drone1` at the charging station in the edge service:

```shell
grpcurl -d '{"drone_id":"drone1","charging_station_id":"station1"}' -plaintext 127.0.0.1:8080 local.drones.DroneService.GoCharge
```

Inspect the state of the charging station in the cloud service to see the charging drone replicated there:

The charging station is a simplified replicated entity in that it doesn't really expect any possible conflicts, stations
are created in the central cloud and replicated to the edge, updates related to drones currently charging in the station
happen at the edge and are replicated to the cloud. Akka replicated event sourcing provides APIs for both CRDTs where
conflicts are automatically handled by the data structure and business domain level conflict resolution. For more details
about see the @extref[Akka documentation](akka:replicated-eventsourcing.html).
```shell
grpcurl -d '{"charging_station_id":"station1"}' -plaintext localhost:8101 charging.ChargingStationService.GetChargingStationState
```
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ public ChargingDrone(String droneId, Instant chargingDone, String replicaId) {
}
}
// #events

// #state
public static final class State implements CborSerializable {
public final int chargingSlots;
public final Set<ChargingDrone> dronesCharging;
Expand All @@ -138,6 +140,7 @@ public State(int chargingSlots, Set<ChargingDrone> dronesCharging, String statio
this.stationLocationId = stationLocationId;
}
}
// #state

public static final String ENTITY_TYPE = "charging-station";

Expand All @@ -147,6 +150,7 @@ public State(int chargingSlots, Set<ChargingDrone> dronesCharging, String statio
* Init for running in edge node, this is the only difference from the ChargingStation in
* restaurant-deliveries-service
*/
// #edgeReplicaInit
public static EdgeReplication<Command> initEdge(ActorSystem<?> system, String locationId) {
var replicationSettings =
ReplicationSettings.create(
Expand All @@ -159,8 +163,10 @@ public static EdgeReplication<Command> initEdge(ActorSystem<?> system, String lo
new ConsumerFilter.IncludeTopics(Set.of(locationId))));
return Replication.grpcEdgeReplication(replicationSettings, ChargingStation::create, system);
}
// #edgeReplicaInit

/** Init for running in cloud replica */
// #replicaInit
public static Replication<Command> init(ActorSystem<?> system) {
var replicationSettings =
ReplicationSettings.create(
Expand All @@ -169,6 +175,7 @@ public static Replication<Command> init(ActorSystem<?> system) {
.withEdgeReplication(true);
return Replication.grpcReplication(replicationSettings, ChargingStation::create, system);
}
// #replicaInit

public static Behavior<Command> create(
ReplicatedBehaviors<Command, Event, State> replicatedBehaviors) {
Expand Down Expand Up @@ -207,9 +214,9 @@ public State emptyState() {
return null;
}

// #commandHandler
@Override
public CommandHandler<Command, Event, State> commandHandler() {

var noStateHandler =
newCommandHandlerBuilder()
.forNullState()
Expand Down Expand Up @@ -317,6 +324,7 @@ private Effect<Event, State> handleStartCharging(State state, StartCharging star
});
}
}
// #commandHandler

@Override
public EventHandler<State, Event> eventHandler() {
Expand Down Expand Up @@ -367,20 +375,25 @@ public EventHandler<State, Event> eventHandler() {
return noStateHandler.orElse(initializedStateHandler).build();
}

// #recoveryCompleted
@Override
public SignalHandler<State> signalHandler() {
return newSignalHandlerBuilder()
.onSignal(
RecoveryCompleted.class, (state, recoveryCompleted) -> handleRecoveryCompleted(state))
.build();
}
// #recoveryCompleted

// #tagging
@Override
public Set<String> tagsFor(State state, Event event) {
if (state == null) return Set.of();
else return Set.of("t:" + state.stationLocationId);
}
// #tagging

// #recoveryCompleted
private void handleRecoveryCompleted(State state) {
// FIXME this scheme is not quite reliable, because station is not remembered/restarted
// until next new event/command if edge system is shut down/restarted
Expand All @@ -401,4 +414,5 @@ private void handleRecoveryCompleted(State state) {
});
}
}
// #recoveryCompleted
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public CompletionStage<CompleteDeliveryResponse> completeDelivery(CompleteDelive
return convertError(reply);
}

// #charging
@Override
public CompletionStage<ChargingResponse> goCharge(GoChargeRequest in) {
logger.info("Requesting charge of {} from {}", in.getDroneId(), in.getChargingStationId());
Expand Down Expand Up @@ -159,7 +160,7 @@ public CompletionStage<ChargingResponse> goCharge(GoChargeRequest in) {

return convertError(response);
}

// #charging
private static Timestamp instantToProtoTimestamp(Instant instant) {
return Timestamp.newBuilder()
.setSeconds(instant.getEpochSecond())
Expand Down
Loading

0 comments on commit a5fa518

Please sign in to comment.