Skip to content

Commit

Permalink
doc: Samples for replicated event sourcing over gRPC (#769)
Browse files Browse the repository at this point in the history
Additionally:
* Api to allow replicated behaviors to compose with setup behaviors
  • Loading branch information
pvlugter authored Jan 25, 2023
1 parent ee1d5e4 commit b5b4b4c
Show file tree
Hide file tree
Showing 69 changed files with 2,620 additions and 59 deletions.
9 changes: 9 additions & 0 deletions .github/workflows/checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,12 @@ jobs:
cd samples/grpc/shopping-analytics-service-java
mvn compile -nsu -Dakka-projection.version=`cat ~/.version`
- name: Compile Scala Replicated Event Sourcing over gRPC sample Shopping Cart
run: |-
cd samples/replicated/shopping-cart-service-scala
sbt compile -Dakka-projection.version=`cat ~/.version`
- name: Compile Java Replicated Event Sourcing over gRPC sample Shopping Cart
run: |-
cd samples/replicated/shopping-cart-service-java
mvn compile -nsu -Dakka-projection.version=`cat ~/.version`
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ import akka.persistence.typed.ReplicaId
import akka.persistence.typed.crdt.LwwTime
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.scaladsl.ReplicationContext
import akka.projection.grpc.TestContainerConf
import akka.projection.grpc.TestDbLifecycle
import akka.projection.grpc.producer.EventProducerSettings
import akka.projection.grpc.replication
import akka.projection.grpc.replication.scaladsl.Replica
import akka.projection.grpc.replication.scaladsl.ReplicatedBehaviors
import akka.projection.grpc.replication.scaladsl.Replication
import akka.projection.grpc.replication.scaladsl.ReplicationProjectionProvider
import akka.projection.grpc.replication.scaladsl.ReplicationSettings
Expand Down Expand Up @@ -98,27 +98,28 @@ object ReplicationIntegrationSpec {

case class State(greeting: String, timestamp: LwwTime)

// FIXME needing to return/pass an EventSourcedBehavior means it is impossible to compose for logging/setup etc
def apply(replicationContext: ReplicationContext) =
EventSourcedBehavior[Command, Event, State](
replicationContext.persistenceId,
State.initial, {
case (State(greeting, _), Get(replyTo)) =>
replyTo ! greeting
Effect.none
case (state, SetGreeting(greeting, replyTo)) =>
Effect
.persist(
GreetingChanged(
greeting,
state.timestamp.increase(replicationContext.currentTimeMillis(), replicationContext.replicaId)))
.thenRun((_: State) => replyTo ! Done)
}, {
case (currentState, GreetingChanged(newGreeting, newTimestamp)) =>
if (newTimestamp.isAfter(currentState.timestamp))
State(newGreeting, newTimestamp)
else currentState
})
def apply(replicatedBehaviors: ReplicatedBehaviors[Command, Event, State]) =
replicatedBehaviors.setup { replicationContext =>
EventSourcedBehavior[Command, Event, State](
replicationContext.persistenceId,
State.initial, {
case (State(greeting, _), Get(replyTo)) =>
replyTo ! greeting
Effect.none
case (state, SetGreeting(greeting, replyTo)) =>
Effect
.persist(
GreetingChanged(
greeting,
state.timestamp.increase(replicationContext.currentTimeMillis(), replicationContext.replicaId)))
.thenRun((_: State) => replyTo ! Done)
}, {
case (currentState, GreetingChanged(newGreeting, newTimestamp)) =>
if (newTimestamp.isAfter(currentState.timestamp))
State(newGreeting, newTimestamp)
else currentState
})
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import akka.projection.grpc.TestDbLifecycle
import akka.projection.grpc.producer.EventProducerSettings
import akka.projection.grpc.replication
import akka.projection.grpc.replication.javadsl.Replica
import akka.projection.grpc.replication.javadsl.ReplicatedBehaviors
import akka.projection.grpc.replication.javadsl.Replication
import akka.projection.grpc.replication.javadsl.ReplicationProjectionProvider
import akka.projection.grpc.replication.javadsl.ReplicationSettings
Expand Down Expand Up @@ -104,7 +105,8 @@ object ReplicationJavaDSLIntegrationSpec {

case class State(greeting: String, timestamp: LwwTime)

def create(replicationContext: ReplicationContext) = new LWWHelloWorldBehavior(replicationContext)
def create(replicatedBehaviors: ReplicatedBehaviors[Command, Event, State]) =
replicatedBehaviors.setup { replicationContext => new LWWHelloWorldBehavior(replicationContext) }

class LWWHelloWorldBehavior(replicationContext: ReplicationContext)
extends EventSourcedBehavior[Command, Event, State](replicationContext.persistenceId) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.projection.grpc.replication.javadsl

import akka.actor.typed.Behavior
import akka.annotation.ApiMayChange
import akka.japi.function.{ Function => JFunction }
import akka.persistence.typed.javadsl.EventSourcedBehavior
import akka.persistence.typed.javadsl.ReplicationContext

/**
* Dynamically provides factory methods for creating replicated event sourced behaviors.
*
* Must be used to create an event sourced behavior to be replicated with [[Replication.grpcReplication]].
*
* Can optionally be composed with other Behavior factories, to get access to actor context or timers.
*/
@ApiMayChange
abstract class ReplicatedBehaviors[Command, Event, State] {
def setup(factory: JFunction[ReplicationContext, EventSourcedBehavior[Command, Event, State]]): Behavior[Command]
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package akka.projection.grpc.replication.javadsl

import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
import akka.cluster.sharding.typed.ReplicatedEntity
Expand All @@ -17,7 +18,6 @@ import akka.http.javadsl.model.HttpResponse
import akka.japi.function.{ Function => JFunction }
import akka.persistence.typed.ReplicationId
import akka.persistence.typed.internal.ReplicationContextImpl
import akka.persistence.typed.javadsl.EventSourcedBehavior
import akka.persistence.typed.javadsl.ReplicationContext
import akka.persistence.typed.scaladsl.ReplicatedEventSourcing
import akka.projection.grpc.producer.javadsl.EventProducer
Expand Down Expand Up @@ -74,7 +74,7 @@ object Replication {
*/
def grpcReplication[Command, Event, State](
settings: ReplicationSettings[Command],
replicatedBehaviorFactory: JFunction[ReplicationContext, EventSourcedBehavior[Command, Event, State]],
replicatedBehaviorFactory: JFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]],
system: ActorSystem[_]): Replication[Command] = {

val scalaReplicationSettings = settings.toScala
Expand All @@ -88,15 +88,17 @@ object Replication {
settings.entityTypeKey, { entityContext: EntityContext[Command] =>
val replicationId =
ReplicationId(entityContext.getEntityTypeKey.name, entityContext.getEntityId, settings.selfReplicaId)
ReplicatedEventSourcing.externalReplication(
replicationId,
scalaReplicationSettings.otherReplicas.map(_.replicaId) + settings.selfReplicaId)(
replicationContext =>
replicatedBehaviorFactory
.apply(replicationContext.asInstanceOf[ReplicationContext])
.createEventSourcedBehavior()
// MEH
.withReplication(replicationContext.asInstanceOf[ReplicationContextImpl]))
replicatedBehaviorFactory.apply(
factory =>
ReplicatedEventSourcing.externalReplication(
replicationId,
scalaReplicationSettings.otherReplicas.map(_.replicaId) + settings.selfReplicaId)(
replicationContext =>
factory
.apply(replicationContext.asInstanceOf[ReplicationContext])
.createEventSourcedBehavior()
// MEH
.withReplication(replicationContext.asInstanceOf[ReplicationContextImpl])))
}))
.toScala)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.projection.grpc.replication.scaladsl

import akka.actor.typed.Behavior
import akka.annotation.ApiMayChange
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.scaladsl.ReplicationContext

/**
* Dynamically provides factory methods for creating replicated event sourced behaviors.
*
* Must be used to create an event sourced behavior to be replicated with [[Replication.grpcReplication]].
*
* Can optionally be composed with other Behavior factories, to get access to actor context or timers.
*/
@ApiMayChange
abstract class ReplicatedBehaviors[Command, Event, State] {
def setup(factory: ReplicationContext => EventSourcedBehavior[Command, Event, State]): Behavior[Command]
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package akka.projection.grpc.replication.scaladsl

import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
import akka.cluster.sharding.typed.ReplicatedEntity
Expand All @@ -14,9 +15,7 @@ import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse
import akka.persistence.typed.ReplicationId
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.scaladsl.ReplicatedEventSourcing
import akka.persistence.typed.scaladsl.ReplicationContext
import akka.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource
import akka.projection.grpc.replication.internal.ReplicationImpl

Expand Down Expand Up @@ -69,7 +68,7 @@ object Replication {
* Important: Note that this does not publish the endpoint, additional steps are needed!
*/
def grpcReplication[Command, Event, State](settings: ReplicationSettings[Command])(
replicatedBehaviorFactory: ReplicationContext => EventSourcedBehavior[Command, Event, State])(
replicatedBehaviorFactory: ReplicatedBehaviors[Command, Event, State] => Behavior[Command])(
implicit system: ActorSystem[_]): Replication[Command] = {

val replicatedEntity =
Expand All @@ -78,9 +77,11 @@ object Replication {
settings.configureEntity.apply(Entity(settings.entityTypeKey) { entityContext =>
val replicationId =
ReplicationId(entityContext.entityTypeKey.name, entityContext.entityId, settings.selfReplicaId)
ReplicatedEventSourcing.externalReplication(
replicationId,
settings.otherReplicas.map(_.replicaId) + settings.selfReplicaId)(replicatedBehaviorFactory)
replicatedBehaviorFactory { factory =>
ReplicatedEventSourcing.externalReplication(
replicationId,
settings.otherReplicas.map(_.replicaId) + settings.selfReplicaId)(factory)
}
}))

ReplicationImpl.grpcReplication[Command, Event, State](settings, replicatedEntity)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.grpc.GrpcClientSettings;
import akka.grpc.javadsl.ServiceHandler;
import akka.http.javadsl.Http;
Expand All @@ -17,12 +18,11 @@
import akka.persistence.query.Offset;
import akka.persistence.query.typed.EventEnvelope;
import akka.persistence.typed.ReplicaId;
import akka.persistence.typed.javadsl.ReplicatedEventSourcedBehavior;
import akka.persistence.typed.javadsl.ReplicationContext;
import akka.projection.ProjectionContext;
import akka.projection.ProjectionId;
import akka.projection.grpc.producer.EventProducerSettings;
import akka.projection.grpc.replication.javadsl.Replica;
import akka.projection.grpc.replication.javadsl.ReplicatedBehaviors;
import akka.projection.grpc.replication.javadsl.Replication;
import akka.projection.grpc.replication.javadsl.ReplicationProjectionProvider;
import akka.projection.grpc.replication.javadsl.ReplicationSettings;
Expand All @@ -40,11 +40,15 @@

public class ReplicationCompileTest {

interface MyCommand {}
interface MyCommand {}

static ReplicatedEventSourcedBehavior<MyCommand, Void, Void> create(ReplicationContext context) {
throw new UnsupportedOperationException("just a dummy factory method");
}
static Behavior<MyCommand> create(
ReplicatedBehaviors<MyCommand, Void, Void> replicatedBehaviors) {
return replicatedBehaviors.setup(
replicationContext -> {
throw new UnsupportedOperationException("just a dummy factory method");
});
}

public static void start(ActorSystem<?> system) {
Set<Replica> otherReplicas = new HashSet<>();
Expand Down
59 changes: 48 additions & 11 deletions docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ For a basic overview of Replicated Event Sourcing see the @extref:[Akka Replicat

Akka Replicated Event Sourcing over gRPC consists of the following three parts:

The Replicated Event Sourced Behavior is run in each replica as a sharded entity using @extref:[Akka Cluster Sharding](akka:typed/cluster-sharding.html).
* The Replicated Event Sourced Behavior is run in each replica as a sharded entity using @extref:[Akka Cluster
Sharding](akka:typed/cluster-sharding.html).

The events of each replica are published to the other replicas using @ref:[Akka Projection gRPC](grpc.md) endpoints.

Each replica consumes a number of parallel slices of the events from each other replica by running Akka Projection gRPC
in @extref:[Akka Sharded Daemon Process](akka:typed/cluster-sharded-daemon-process.html).
* The events of each replica are published to the other replicas using @ref:[Akka Projection gRPC](grpc.md) endpoints.

* Each replica consumes a number of parallel slices of the events from each other replica by running Akka Projection
gRPC in @extref:[Akka Sharded Daemon Process](akka:typed/cluster-sharded-daemon-process.html).


## Dependencies
Expand All @@ -46,6 +46,12 @@ The functionality is provided through the `akka-projection-grpc` module.

To use the gRPC module of Akka Projections add the following dependency in your project:

@@dependency [sbt,Maven,Gradle] {
group=com.lightbend.akka
artifact=akka-projection-grpc_$scala.binary.version$
version=$project.version$
}

Akka Replicated Event Sourcing over gRPC requires Akka 2.8.0 or later and can only be run in an Akka cluster since it uses cluster components.

It is currently only possible to use @extref:[akka-persistence-r2dbc](akka-persistence-r2dbc:projection.html) as the
Expand Down Expand Up @@ -77,7 +83,23 @@ The table below shows `akka-projection-grpc`'s direct dependencies, and the seco

@@dependencies{ projectId="akka-projection-grpc" }

## API and setup

The same API as regular `EventSourcedBehavior`s is used to define the logic. See @extref:[Replicated Event Sourcing](akka:typed/replicated-eventsourcing.html) for more detail on designing an entity for replication.

To enable an entity for Replicated Event Sourcing over gRPC, use the @apidoc[Replication$] `grpcReplication` method,
which takes @apidoc[ReplicationSettings], a factory function for the behavior, and an actor system.

The factory function will be passed a @apidoc[ReplicatedBehaviors] factory that must be used to set up the replicated
event sourced behavior. Its `setup` method provides a @apidoc[ReplicationContext] to create an `EventSourcedBehavior`
which will then be configured for replication. The behavior factory can be composed with other behavior factories, if
access to the actor context or timers are needed.

Scala
: @@snip [ShoppingCart.scala](/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala) { #init }

Java
: @@snip [ShoppingCart.java](/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java) { #init }

### Settings

Expand Down Expand Up @@ -106,10 +128,22 @@ Binding the publisher is a manual step to allow arbitrary customization of the A
with other HTTP and gRPC routes.

When there is only a single replicated entity and no other usage of Akka gRPC Projections in an application a
convenience is provided through `createSingleServiceHandler` on @apidoc[akka.projection.grpc.replication.*.Replication] which
will create a single handler, this can then be bound:
convenience is provided through `createSingleServiceHandler` on @apidoc[akka.projection.grpc.replication.*.Replication]
which will create a single handler:

FIXME sample snippet
Scala
: @@snip [Main.scala](/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/Main.scala) { #single-service-handler }

Java
: @@snip [Main.java](/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/Main.java) { #single-service-handler }

This can then be bound:

Scala
: @@snip [ShoppingCartServer.scala](/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServer.scala) { #bind }

Java
: @@snip [ShoppingCartServer.java](/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCartServer.java) { #bind }

When multiple producers exist, all instances of @apidoc[akka.projection.grpc.producer.EventProducerSettings] need to
be passed at once to `EventProducer.grpcServiceHandler` to create a single producer service handling each of the event
Expand Down Expand Up @@ -137,9 +171,12 @@ new versions of the application to the replicas.
FIXME something more here - serialization can fail and stop the replication, but it could also silently lose data in new fields
before the consuming side has a new version.

### Sample projects
## Sample projects

Source code and build files for complete sample projects can be found in the `akka/akka-projection` GitHub repository.

FIXME should we have one? The shopping cart?
* [Replicated shopping cart service in Scala](https://github.com/akka/akka-projection/tree/main/samples/replicated/shopping-cart-service-scala)
* [Replicated shopping cart service in Java](https://github.com/akka/akka-projection/tree/main/samples/replicated/shopping-cart-service-java)

## Access control

Expand All @@ -151,4 +188,4 @@ The consumer can pass metadata, such as auth headers, in each request to the pro

Authentication and authorization for the producer can be done by implementing an @apidoc[EventProducerInterceptor] and passing
it to the `grpcServiceHandler` method during producer bootstrap. The interceptor is invoked with the stream id and
gRPC request metadata for each incoming request and can return a suitable error through @apidoc[GrpcServiceException]
gRPC request metadata for each incoming request and can return a suitable error through @apidoc[GrpcServiceException]
4 changes: 2 additions & 2 deletions docs/src/main/paradox/grpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,12 @@ but it illustrates how to combine the `EventProducer` service with other gRPC se

Source code and build files for complete sample projects can be found in `akka/akka-projection` GitHub repository.

Scala:
Java:

* [Producer service: shopping-cart-service-java](https://github.com/akka/akka-projection/tree/main/samples/grpc/shopping-cart-service-java)
* [Consumer service: shopping-analytics-service-java](https://github.com/akka/akka-projection/tree/main/samples/grpc/shopping-analytics-service-java)

Java:
Scala:

* [Producer service: shopping-cart-service-scala](https://github.com/akka/akka-projection/tree/main/samples/grpc/shopping-cart-service-scala)
* [Consumer service: shopping-analytics-service-scala](https://github.com/akka/akka-projection/tree/main/samples/grpc/shopping-analytics-service-scala)
Expand Down
Loading

0 comments on commit b5b4b4c

Please sign in to comment.