From 41147c8fb530616ccd1368fff36b7f710a1e3b9f Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 15 Nov 2023 15:01:21 +0100 Subject: [PATCH] feat: Allow indirect replication --- .../src/it/resources/logback-test.xml | 4 +- .../IndirectReplicationIntegrationSpec.scala | 297 ++++++++++++++++++ .../ReplicationIntegrationSpec.scala | 25 +- .../replication/ReplicationSettingsSpec.scala | 13 + .../ReplicationSettings.excludes | 2 + .../internal/ReplicationImpl.scala | 25 +- .../scaladsl/ReplicationSettings.scala | 25 +- 7 files changed, 362 insertions(+), 29 deletions(-) create mode 100644 akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/IndirectReplicationIntegrationSpec.scala create mode 100644 akka-projection-grpc/src/main/mima-filters/1.5.0.backwards.excludes/ReplicationSettings.excludes diff --git a/akka-projection-grpc-tests/src/it/resources/logback-test.xml b/akka-projection-grpc-tests/src/it/resources/logback-test.xml index 1365d007f..8773b3a97 100644 --- a/akka-projection-grpc-tests/src/it/resources/logback-test.xml +++ b/akka-projection-grpc-tests/src/it/resources/logback-test.xml @@ -13,10 +13,10 @@ - + - + diff --git a/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/IndirectReplicationIntegrationSpec.scala b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/IndirectReplicationIntegrationSpec.scala new file mode 100644 index 000000000..45fee9fab --- /dev/null +++ b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/IndirectReplicationIntegrationSpec.scala @@ -0,0 +1,297 @@ +/* + * Copyright (C) 2009-2023 Lightbend Inc. + */ + +package akka.projection.grpc.replication + +import scala.concurrent.Await +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.duration.DurationInt + +import akka.actor.testkit.typed.scaladsl.ActorTestKit +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorSystem +import akka.actor.typed.scaladsl.LoggerOps +import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps +import akka.cluster.MemberStatus +import akka.cluster.sharding.typed.scaladsl.ClusterSharding +import akka.cluster.typed.Cluster +import akka.cluster.typed.Join +import akka.grpc.GrpcClientSettings +import akka.http.scaladsl.Http +import akka.persistence.typed.ReplicaId +import akka.projection.grpc.TestContainerConf +import akka.projection.grpc.TestDbLifecycle +import akka.projection.grpc.producer.EventProducerSettings +import akka.projection.grpc.replication.scaladsl.Replica +import akka.projection.grpc.replication.scaladsl.Replication +import akka.projection.grpc.replication.scaladsl.ReplicationSettings +import akka.projection.r2dbc.R2dbcProjectionSettings +import akka.projection.r2dbc.scaladsl.R2dbcReplication +import akka.testkit.SocketUtil +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfterAll +import org.scalatest.wordspec.AnyWordSpecLike +import org.slf4j.LoggerFactory + +object IndirectReplicationIntegrationSpec { + + private def config(dc: ReplicaId): Config = + ConfigFactory.parseString(s""" + akka.actor.provider = cluster + akka.actor { + serialization-bindings { + "${classOf[ReplicationIntegrationSpec].getName}$$LWWHelloWorld$$Event" = jackson-json + } + } + akka.http.server.preview.enable-http2 = on + akka.persistence.r2dbc { + query { + refresh-interval = 500 millis + # reducing this to have quicker test, triggers backtracking earlier + backtracking.behind-current-time = 3 seconds + } + } + akka.projection.grpc { + producer { + query-plugin-id = "akka.persistence.r2dbc.query" + } + } + akka.projection.r2dbc.offset-store { + timestamp-offset-table = "akka_projection_timestamp_offset_store_${dc.id}" + } + akka.remote.artery.canonical.host = "127.0.0.1" + akka.remote.artery.canonical.port = 0 + akka.actor.testkit.typed { + filter-leeway = 10s + system-shutdown-default = 30s + } + """) + + private val DCA = ReplicaId("DCA") + private val DCB = ReplicaId("DCB") + private val DCC = ReplicaId("DCC") + +} + +class IndirectReplicationIntegrationSpec(testContainerConf: TestContainerConf) + extends ScalaTestWithActorTestKit( + akka.actor + .ActorSystem( + "IndirectReplicationIntegrationSpecA", + IndirectReplicationIntegrationSpec + .config(IndirectReplicationIntegrationSpec.DCA) + .withFallback(testContainerConf.config)) + .toTyped) + with AnyWordSpecLike + with TestDbLifecycle + with BeforeAndAfterAll + with LogCapturing { + import IndirectReplicationIntegrationSpec._ + import ReplicationIntegrationSpec.LWWHelloWorld + implicit val ec: ExecutionContext = system.executionContext + + def this() = this(new TestContainerConf) + + private val logger = LoggerFactory.getLogger(classOf[IndirectReplicationIntegrationSpec]) + override def typedSystem: ActorSystem[_] = testKit.system + + private val systems = Seq[ActorSystem[_]]( + typedSystem, + akka.actor + .ActorSystem( + "IndirectReplicationIntegrationSpecB", + IndirectReplicationIntegrationSpec.config(DCB).withFallback(testContainerConf.config)) + .toTyped, + akka.actor + .ActorSystem( + "IndirectReplicationIntegrationSpecC", + IndirectReplicationIntegrationSpec.config(DCC).withFallback(testContainerConf.config)) + .toTyped) + + private val grpcPorts = SocketUtil.temporaryServerAddresses(systems.size, "127.0.0.1").map(_.getPort) + private val allDcsAndPorts = Seq(DCA, DCB, DCC).zip(grpcPorts) + private val allReplicas = allDcsAndPorts.map { + case (id, port) => + Replica(id, 2, GrpcClientSettings.connectToServiceAt("127.0.0.1", port).withTls(false)) + }.toSet + + private val testKitsPerDc = Map(DCA -> testKit, DCB -> ActorTestKit(systems(1)), DCC -> ActorTestKit(systems(2))) + private val systemPerDc = Map(DCA -> system, DCB -> systems(1), DCC -> systems(2)) + private val entityIds = Set("one", "two", "three") + + override protected def beforeAll(): Unit = { + super.beforeAll() + // We can share the journal to save a bit of work, because the persistence id contains + // the dc so is unique (this is ofc completely synthetic, the whole point of replication + // over grpc is to replicate between different dcs/regions with completely separate databases). + // The offset tables need to be separate though to not get conflicts on projection names + systemPerDc.values.foreach { system => + val r2dbcProjectionSettings = R2dbcProjectionSettings(system) + Await.result( + r2dbcExecutor.updateOne("beforeAll delete")( + _.createStatement(s"delete from ${r2dbcProjectionSettings.timestampOffsetTableWithSchema}")), + 10.seconds) + + } + } + + def startReplica(replicaSystem: ActorSystem[_], selfReplicaId: ReplicaId): Replication[LWWHelloWorld.Command] = { + val otherReplicas = selfReplicaId match { + case DCA => allReplicas.filter(r => r.replicaId == DCB || r.replicaId == DCC) + case DCB => allReplicas.filter(_.replicaId == DCA) + case DCC => allReplicas.filter(_.replicaId == DCA) + case other => throw new IllegalArgumentException(other.id) + } + + val settings = ReplicationSettings[LWWHelloWorld.Command]( + LWWHelloWorld.EntityType.name, + selfReplicaId, + EventProducerSettings(replicaSystem), + otherReplicas, + 10.seconds, + 8, + R2dbcReplication()) + .withIndirectReplication(true) + Replication.grpcReplication(settings)(LWWHelloWorld.apply)(replicaSystem) + } + + def assertGreeting(entityId: String, expected: String): Unit = { + testKitsPerDc.values.foreach { testKit => + withClue(s"on ${testKit.system.name}") { + val probe = testKit.createTestProbe() + withClue(s"for entity id $entityId") { + val entityRef = ClusterSharding(testKit.system) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + + probe.awaitAssert({ + entityRef + .ask(LWWHelloWorld.Get.apply) + .futureValue should ===(expected) + }, 10.seconds) + } + } + } + } + + "Replication over gRPC" should { + "form three one node clusters" in { + testKitsPerDc.values.foreach { testKit => + val cluster = Cluster(testKit.system) + cluster.manager ! Join(cluster.selfMember.address) + testKit.createTestProbe().awaitAssert { + cluster.selfMember.status should ===(MemberStatus.Up) + } + } + } + + "start three replicas" in { + val replicasStarted = Future.sequence(allReplicas.zipWithIndex.map { + case (replica, index) => + val system = systems(index) + logger + .infoN( + "Starting replica [{}], system [{}] on port [{}]", + replica.replicaId, + system.name, + replica.grpcClientSettings.defaultPort) + val started = startReplica(system, replica.replicaId) + val grpcPort = grpcPorts(index) + + // start producer server + Http(system) + .newServerAt("127.0.0.1", grpcPort) + .bind(started.createSingleServiceHandler()) + .map(_.addToCoordinatedShutdown(3.seconds)(system))(system.executionContext) + .map(_ => replica.replicaId -> started) + }) + + replicasStarted.futureValue + logger.info("All three replication/producer services bound") + } + + "replicate writes from one dc to the other two" in { + systemPerDc.keys.foreach { dc => + withClue(s"from ${dc.id}") { + Future + .sequence(entityIds.map { entityId => + logger.infoN("Updating greeting for [{}] from dc [{}]", entityId, dc.id) + ClusterSharding(systemPerDc(dc)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetGreeting(s"hello 1 from ${dc.id}", _)) + }) + .futureValue + + testKitsPerDc.values.foreach { testKit => + withClue(s"on ${testKit.system.name}") { + val probe = testKit.createTestProbe() + + entityIds.foreach { entityId => + withClue(s"for entity id $entityId") { + val entityRef = ClusterSharding(testKit.system) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + + probe.awaitAssert({ + entityRef + .ask(LWWHelloWorld.Get.apply) + .futureValue should ===(s"hello 1 from ${dc.id}") + }, 10.seconds) + } + } + } + } + } + } + } + + "replicate concurrent writes to the other DCs" in (2 to 4).foreach { greetingNo => + withClue(s"Greeting $greetingNo") { + Future + .sequence(systemPerDc.keys.map { dc => + withClue(s"from ${dc.id}") { + Future.sequence(entityIds.map { entityId => + logger.infoN("Updating greeting for [{}] from dc [{}]", entityId, dc.id) + ClusterSharding(systemPerDc(dc)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetGreeting(s"hello $greetingNo from ${dc.id}", _)) + }) + } + }) + .futureValue // all three updated in roughly parallel + + // All 3 should eventually arrive at the same value + testKit + .createTestProbe() + .awaitAssert( + { + entityIds.foreach { entityId => + withClue(s"for entity id $entityId") { + testKitsPerDc.values.map { testKit => + val entityRef = ClusterSharding(testKit.system) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + + entityRef + .ask(LWWHelloWorld.Get.apply) + .futureValue + }.toSet should have size (1) + } + } + }, + 20.seconds) + } + } + } + + protected override def afterAll(): Unit = { + logger.info("Shutting down all three DCs") + systems.foreach(_.terminate()) // speed up termination by terminating all at the once + // and then make sure they are completely shutdown + systems.foreach { system => + ActorTestKit.shutdown(system) + } + super.afterAll() + } +} diff --git a/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/ReplicationIntegrationSpec.scala b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/ReplicationIntegrationSpec.scala index b546d30b5..e1da530f2 100644 --- a/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/ReplicationIntegrationSpec.scala +++ b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/ReplicationIntegrationSpec.scala @@ -25,7 +25,6 @@ import akka.persistence.typed.scaladsl.EventSourcedBehavior import akka.projection.grpc.TestContainerConf import akka.projection.grpc.TestDbLifecycle import akka.projection.grpc.producer.EventProducerSettings -import akka.projection.grpc.replication import akka.projection.grpc.replication.scaladsl.Replica import akka.projection.grpc.replication.scaladsl.ReplicatedBehaviors import akka.projection.grpc.replication.scaladsl.Replication @@ -38,12 +37,13 @@ import com.typesafe.config.ConfigFactory import org.scalatest.BeforeAndAfterAll import org.scalatest.wordspec.AnyWordSpecLike import org.slf4j.LoggerFactory - import scala.concurrent.Await import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration.DurationInt +import akka.cluster.sharding.typed.scaladsl.EntityTypeKey + object ReplicationIntegrationSpec { private def config(dc: ReplicaId): Config = @@ -51,7 +51,7 @@ object ReplicationIntegrationSpec { akka.actor.provider = cluster akka.actor { serialization-bindings { - "${classOf[replication.ReplicationIntegrationSpec].getName}$$LWWHelloWorld$$Event" = jackson-json + "${classOf[ReplicationIntegrationSpec].getName}$$LWWHelloWorld$$Event" = jackson-json } } akka.http.server.preview.enable-http2 = on @@ -84,6 +84,8 @@ object ReplicationIntegrationSpec { object LWWHelloWorld { + val EntityType: EntityTypeKey[Command] = EntityTypeKey[Command]("hello-world") + sealed trait Command case class Get(replyTo: ActorRef[String]) extends Command @@ -168,7 +170,6 @@ class ReplicationIntegrationSpec(testContainerConf: TestContainerConf) private val testKitsPerDc = Map(DCA -> testKit, DCB -> ActorTestKit(systems(1)), DCC -> ActorTestKit(systems(2))) private val systemPerDc = Map(DCA -> system, DCB -> systems(1), DCC -> systems(2)) - private var replicatedEventSourcingOverGrpcPerDc: Map[ReplicaId, Replication[LWWHelloWorld.Command]] = Map.empty private val entityIds = Set("one", "two", "three") override protected def beforeAll(): Unit = { @@ -189,7 +190,7 @@ class ReplicationIntegrationSpec(testContainerConf: TestContainerConf) def startReplica(replicaSystem: ActorSystem[_], selfReplicaId: ReplicaId): Replication[LWWHelloWorld.Command] = { val settings = ReplicationSettings[LWWHelloWorld.Command]( - "hello-world", + LWWHelloWorld.EntityType.name, selfReplicaId, EventProducerSettings(replicaSystem), allReplicas, @@ -231,31 +232,30 @@ class ReplicationIntegrationSpec(testContainerConf: TestContainerConf) .map(_ => replica.replicaId -> started) }) - replicatedEventSourcingOverGrpcPerDc = replicasStarted.futureValue.toMap + replicasStarted.futureValue logger.info("All three replication/producer services bound") } "replicate writes from one dc to the other two" in { - val entityTypeKey = replicatedEventSourcingOverGrpcPerDc.values.head.entityTypeKey systemPerDc.keys.foreach { dc => withClue(s"from ${dc.id}") { Future .sequence(entityIds.map { entityId => logger.infoN("Updating greeting for [{}] from dc [{}]", entityId, dc.id) ClusterSharding(systemPerDc(dc)) - .entityRefFor(entityTypeKey, entityId) + .entityRefFor(LWWHelloWorld.EntityType, entityId) .ask(LWWHelloWorld.SetGreeting(s"hello 1 from ${dc.id}", _)) }) .futureValue testKitsPerDc.values.foreach { testKit => - withClue(s"on ${system.name}") { + withClue(s"on ${testKit.system.name}") { val probe = testKit.createTestProbe() entityIds.foreach { entityId => withClue(s"for entity id $entityId") { val entityRef = ClusterSharding(testKit.system) - .entityRefFor(entityTypeKey, entityId) + .entityRefFor(LWWHelloWorld.EntityType, entityId) probe.awaitAssert({ entityRef @@ -272,14 +272,13 @@ class ReplicationIntegrationSpec(testContainerConf: TestContainerConf) "replicate concurrent writes to the other DCs" in (2 to 4).foreach { greetingNo => withClue(s"Greeting $greetingNo") { - val entityTypeKey = replicatedEventSourcingOverGrpcPerDc.values.head.entityTypeKey Future .sequence(systemPerDc.keys.map { dc => withClue(s"from ${dc.id}") { Future.sequence(entityIds.map { entityId => logger.infoN("Updating greeting for [{}] from dc [{}]", entityId, dc.id) ClusterSharding(systemPerDc(dc)) - .entityRefFor(entityTypeKey, entityId) + .entityRefFor(LWWHelloWorld.EntityType, entityId) .ask(LWWHelloWorld.SetGreeting(s"hello $greetingNo from ${dc.id}", _)) }) } @@ -295,7 +294,7 @@ class ReplicationIntegrationSpec(testContainerConf: TestContainerConf) withClue(s"for entity id $entityId") { testKitsPerDc.values.map { testKit => val entityRef = ClusterSharding(testKit.system) - .entityRefFor(entityTypeKey, entityId) + .entityRefFor(LWWHelloWorld.EntityType, entityId) entityRef .ask(LWWHelloWorld.Get.apply) diff --git a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala index 33c76d08d..92133ebdd 100644 --- a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala +++ b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala @@ -32,24 +32,36 @@ class ReplicationSettingsSpec extends AnyWordSpec with Matchers { # which of the replicas this node belongs to, should be the same # across the nodes of each replica Akka cluster. self-replica-id = dca + # Pick it up from an environment variable to re-use the same config # without changes across replicas self-replica-id = ${?SELF_REPLICA} + # max number of parallel in-flight (sent over sharding) entity updates # per consumer/projection parallel-updates = 8 + # Fail the replication stream (and restart with backoff) if completing # the write of a replicated event reaching the cluster takes more time # than this. entity-event-replication-timeout = 10s + + # When enabled all events will be transferred from all replicas, otherwise + # only events from the origin replica will be transferred from the origin + # replica. When each replica is connected to each other replica it's most + # efficient to disable indirect replication. + indirect-replication = on + replicas: [ { # Unique identifier of the replica/datacenter, is stored in the events # and cannot be changed after events have been persisted. replica-id = "dca" + # Number of replication streams/projections to start to consume events # from this replica number-of-consumers = 4 + # Akka gRPC client config block for how to reach this replica # from the other replicas, note that binding the server/publishing # endpoint of each replica is done separately, in code. @@ -94,6 +106,7 @@ class ReplicationSettingsSpec extends AnyWordSpec with Matchers { settings.otherReplicas.map(_.replicaId.id) should ===(Set("dcb", "dcc")) settings.otherReplicas.forall(_.numberOfConsumers === 4) should ===(true) settings.parallelUpdates should ===(8) + settings.indirectReplication should ===(true) val replicaB = settings.otherReplicas.find(_.replicaId.id == "dcb").get replicaB.grpcClientSettings.defaultPort should ===(8444) diff --git a/akka-projection-grpc/src/main/mima-filters/1.5.0.backwards.excludes/ReplicationSettings.excludes b/akka-projection-grpc/src/main/mima-filters/1.5.0.backwards.excludes/ReplicationSettings.excludes new file mode 100644 index 000000000..65f9246cd --- /dev/null +++ b/akka-projection-grpc/src/main/mima-filters/1.5.0.backwards.excludes/ReplicationSettings.excludes @@ -0,0 +1,2 @@ +# added indirectReplication to private constructor +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.replication.scaladsl.ReplicationSettings.this") diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala index d6f855ec5..354c561d0 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala @@ -86,8 +86,10 @@ private[akka] object ReplicationImpl { val onlyLocalOriginTransformer = Transformation.empty.registerAsyncEnvelopeOrElseMapper(envelope => envelope.eventMetadata match { case Some(meta: ReplicatedEventMetadata) => - if (meta.originReplica == settings.selfReplicaId) Future.successful(envelope.eventOption) - else filteredEvent // Optimization: was replicated to this DC, don't pass the payload across the wire + if (settings.indirectReplication || meta.originReplica == settings.selfReplicaId) + Future.successful(envelope.eventOption) + else + filteredEvent // Optimization: was replicated to this replica, don't pass the payload across the wire case _ => throw new IllegalArgumentException( s"Got an event without replication metadata, not supported (pid: ${envelope.persistenceId}, seq_nr: ${envelope.sequenceNr})") @@ -171,14 +173,19 @@ private[akka] object ReplicationImpl { case (envelope, _) => if (!envelope.filtered) { envelope.eventMetadata match { - case Some(replicatedEventMetadata: ReplicatedEventMetadata) => - // skipping events originating from other replicas is handled by filtering but for good measure - if (replicatedEventMetadata.originReplica != remoteReplica.replicaId) - throw new IllegalArgumentException( - "Expected replicated event from replica " + - s"[${remoteReplica.replicaId}] but was [${replicatedEventMetadata.originReplica}]. " + - "Verify your replication configuration, such as self-replica-id.") + case Some(replicatedEventMetadata: ReplicatedEventMetadata) + if replicatedEventMetadata.originReplica == settings.selfReplicaId => + // skipping events originating from self replica (break cycle) + if (log.isTraceEnabled) + log.traceN( + "[{}] ignoring event from replica [{}] with self origin (pid [{}], seq_nr [{}])", + projectionKey, + remoteReplica.replicaId, + envelope.persistenceId, + envelope.sequenceNr) + Future.successful(Done) + case Some(replicatedEventMetadata: ReplicatedEventMetadata) => val replicationId = ReplicationId.fromString(envelope.persistenceId) val destinationReplicaId = replicationId.withReplica(settings.selfReplicaId) val entityRef = diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/ReplicationSettings.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/ReplicationSettings.scala index e1af7eade..5c6c403f0 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/ReplicationSettings.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/ReplicationSettings.scala @@ -81,7 +81,8 @@ object ReplicationSettings { parallelUpdates = parallelUpdates, projectionProvider = replicationProjectionProvider, None, - identity) + identity, + indirectReplication = false) } /** @@ -97,6 +98,8 @@ object ReplicationSettings { // Note: any changes here needs to be reflected in Java ReplicationSettings config loading val selfReplicaId = ReplicaId(config.getString("self-replica-id")) + val indirectReplication = + if (config.hasPath("indirect-replication")) config.getBoolean("indirect-replication") else false val grpcClientFallBack = system.settings.config.getConfig("""akka.grpc.client."*"""") val allReplicas: Set[Replica] = config .getConfigList("replicas") @@ -131,7 +134,8 @@ object ReplicationSettings { parallelUpdates = config.getInt("parallel-updates"), projectionProvider = replicationProjectionProvider, None, - identity) + identity, + indirectReplication) } } @@ -150,7 +154,8 @@ final class ReplicationSettings[Command] private ( val parallelUpdates: Int, val projectionProvider: ReplicationProjectionProvider, val eventProducerInterceptor: Option[EventProducerInterceptor], - val configureEntity: Entity[Command, ShardingEnvelope[Command]] => Entity[Command, ShardingEnvelope[Command]]) { + val configureEntity: Entity[Command, ShardingEnvelope[Command]] => Entity[Command, ShardingEnvelope[Command]], + val indirectReplication: Boolean) { require( !otherReplicas.exists(_.replicaId == selfReplicaId), @@ -205,6 +210,14 @@ final class ReplicationSettings[Command] private ( : ReplicationSettings[Command] = copy(configureEntity = configure) + /** + * When enabled all events will be transferred from all replicas, otherwise only events from the origin + * replica will be transferred from the origin replica. + * When each replica is connected to each other replica it's most efficient to disable indirect replication. + */ + def withIndirectReplication(enabled: Boolean): ReplicationSettings[Command] = + copy(indirectReplication = enabled) + private def copy( selfReplicaId: ReplicaId = selfReplicaId, entityTypeKey: EntityTypeKey[Command] = entityTypeKey, @@ -216,7 +229,8 @@ final class ReplicationSettings[Command] private ( projectionProvider: ReplicationProjectionProvider = projectionProvider, producerInterceptor: Option[EventProducerInterceptor] = eventProducerInterceptor, configureEntity: Entity[Command, ShardingEnvelope[Command]] => Entity[Command, ShardingEnvelope[Command]] = - configureEntity) = + configureEntity, + indirectReplication: Boolean = indirectReplication) = new ReplicationSettings[Command]( selfReplicaId, entityTypeKey, @@ -227,7 +241,8 @@ final class ReplicationSettings[Command] private ( parallelUpdates, projectionProvider, producerInterceptor, - configureEntity) + configureEntity, + indirectReplication) override def toString = s"ReplicationSettings($selfReplicaId, $entityTypeKey, $streamId, $otherReplicas)"