Skip to content

Commit

Permalink
feat: Allow indirect replication
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Nov 29, 2023
1 parent 7dc0bd9 commit 41147c8
Show file tree
Hide file tree
Showing 7 changed files with 362 additions and 29 deletions.
4 changes: 2 additions & 2 deletions akka-projection-grpc-tests/src/it/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
</logger>

<!-- One of the tests depend on these, don't change -->
<logger name="akka.projection.grpc" level="TRACE" />
<logger name="akka.projection.grpc" level="DEBUG" />
<logger name="akka.projection.r2dbc" level="DEBUG" />
<logger name="akka.persistence.r2dbc" level="DEBUG" />
<logger name="akka.persistence.typed" level="TRACE" />
<logger name="akka.persistence.typed" level="DEBUG" />

<!-- Silence some other stuff -->
<logger name="akka.actor.typed.pubsub" level="INFO" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
/*
* Copyright (C) 2009-2023 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.projection.grpc.replication

import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.LoggerOps
import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps
import akka.cluster.MemberStatus
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.typed.Cluster
import akka.cluster.typed.Join
import akka.grpc.GrpcClientSettings
import akka.http.scaladsl.Http
import akka.persistence.typed.ReplicaId
import akka.projection.grpc.TestContainerConf
import akka.projection.grpc.TestDbLifecycle
import akka.projection.grpc.producer.EventProducerSettings
import akka.projection.grpc.replication.scaladsl.Replica
import akka.projection.grpc.replication.scaladsl.Replication
import akka.projection.grpc.replication.scaladsl.ReplicationSettings
import akka.projection.r2dbc.R2dbcProjectionSettings
import akka.projection.r2dbc.scaladsl.R2dbcReplication
import akka.testkit.SocketUtil
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.wordspec.AnyWordSpecLike
import org.slf4j.LoggerFactory

object IndirectReplicationIntegrationSpec {

private def config(dc: ReplicaId): Config =
ConfigFactory.parseString(s"""
akka.actor.provider = cluster
akka.actor {
serialization-bindings {
"${classOf[ReplicationIntegrationSpec].getName}$$LWWHelloWorld$$Event" = jackson-json
}
}
akka.http.server.preview.enable-http2 = on
akka.persistence.r2dbc {
query {
refresh-interval = 500 millis
# reducing this to have quicker test, triggers backtracking earlier
backtracking.behind-current-time = 3 seconds
}
}
akka.projection.grpc {
producer {
query-plugin-id = "akka.persistence.r2dbc.query"
}
}
akka.projection.r2dbc.offset-store {
timestamp-offset-table = "akka_projection_timestamp_offset_store_${dc.id}"
}
akka.remote.artery.canonical.host = "127.0.0.1"
akka.remote.artery.canonical.port = 0
akka.actor.testkit.typed {
filter-leeway = 10s
system-shutdown-default = 30s
}
""")

private val DCA = ReplicaId("DCA")
private val DCB = ReplicaId("DCB")
private val DCC = ReplicaId("DCC")

}

class IndirectReplicationIntegrationSpec(testContainerConf: TestContainerConf)
extends ScalaTestWithActorTestKit(
akka.actor
.ActorSystem(
"IndirectReplicationIntegrationSpecA",
IndirectReplicationIntegrationSpec
.config(IndirectReplicationIntegrationSpec.DCA)
.withFallback(testContainerConf.config))
.toTyped)
with AnyWordSpecLike
with TestDbLifecycle
with BeforeAndAfterAll
with LogCapturing {
import IndirectReplicationIntegrationSpec._
import ReplicationIntegrationSpec.LWWHelloWorld
implicit val ec: ExecutionContext = system.executionContext

def this() = this(new TestContainerConf)

private val logger = LoggerFactory.getLogger(classOf[IndirectReplicationIntegrationSpec])
override def typedSystem: ActorSystem[_] = testKit.system

private val systems = Seq[ActorSystem[_]](
typedSystem,
akka.actor
.ActorSystem(
"IndirectReplicationIntegrationSpecB",
IndirectReplicationIntegrationSpec.config(DCB).withFallback(testContainerConf.config))
.toTyped,
akka.actor
.ActorSystem(
"IndirectReplicationIntegrationSpecC",
IndirectReplicationIntegrationSpec.config(DCC).withFallback(testContainerConf.config))
.toTyped)

private val grpcPorts = SocketUtil.temporaryServerAddresses(systems.size, "127.0.0.1").map(_.getPort)
private val allDcsAndPorts = Seq(DCA, DCB, DCC).zip(grpcPorts)
private val allReplicas = allDcsAndPorts.map {
case (id, port) =>
Replica(id, 2, GrpcClientSettings.connectToServiceAt("127.0.0.1", port).withTls(false))
}.toSet

private val testKitsPerDc = Map(DCA -> testKit, DCB -> ActorTestKit(systems(1)), DCC -> ActorTestKit(systems(2)))
private val systemPerDc = Map(DCA -> system, DCB -> systems(1), DCC -> systems(2))
private val entityIds = Set("one", "two", "three")

override protected def beforeAll(): Unit = {
super.beforeAll()
// We can share the journal to save a bit of work, because the persistence id contains
// the dc so is unique (this is ofc completely synthetic, the whole point of replication
// over grpc is to replicate between different dcs/regions with completely separate databases).
// The offset tables need to be separate though to not get conflicts on projection names
systemPerDc.values.foreach { system =>
val r2dbcProjectionSettings = R2dbcProjectionSettings(system)
Await.result(
r2dbcExecutor.updateOne("beforeAll delete")(
_.createStatement(s"delete from ${r2dbcProjectionSettings.timestampOffsetTableWithSchema}")),
10.seconds)

}
}

def startReplica(replicaSystem: ActorSystem[_], selfReplicaId: ReplicaId): Replication[LWWHelloWorld.Command] = {
val otherReplicas = selfReplicaId match {
case DCA => allReplicas.filter(r => r.replicaId == DCB || r.replicaId == DCC)
case DCB => allReplicas.filter(_.replicaId == DCA)
case DCC => allReplicas.filter(_.replicaId == DCA)
case other => throw new IllegalArgumentException(other.id)
}

val settings = ReplicationSettings[LWWHelloWorld.Command](
LWWHelloWorld.EntityType.name,
selfReplicaId,
EventProducerSettings(replicaSystem),
otherReplicas,
10.seconds,
8,
R2dbcReplication())
.withIndirectReplication(true)
Replication.grpcReplication(settings)(LWWHelloWorld.apply)(replicaSystem)
}

def assertGreeting(entityId: String, expected: String): Unit = {
testKitsPerDc.values.foreach { testKit =>
withClue(s"on ${testKit.system.name}") {
val probe = testKit.createTestProbe()
withClue(s"for entity id $entityId") {
val entityRef = ClusterSharding(testKit.system)
.entityRefFor(LWWHelloWorld.EntityType, entityId)

probe.awaitAssert({
entityRef
.ask(LWWHelloWorld.Get.apply)
.futureValue should ===(expected)
}, 10.seconds)
}
}
}
}

"Replication over gRPC" should {
"form three one node clusters" in {
testKitsPerDc.values.foreach { testKit =>
val cluster = Cluster(testKit.system)
cluster.manager ! Join(cluster.selfMember.address)
testKit.createTestProbe().awaitAssert {
cluster.selfMember.status should ===(MemberStatus.Up)
}
}
}

"start three replicas" in {
val replicasStarted = Future.sequence(allReplicas.zipWithIndex.map {
case (replica, index) =>
val system = systems(index)
logger
.infoN(
"Starting replica [{}], system [{}] on port [{}]",
replica.replicaId,
system.name,
replica.grpcClientSettings.defaultPort)
val started = startReplica(system, replica.replicaId)
val grpcPort = grpcPorts(index)

// start producer server
Http(system)
.newServerAt("127.0.0.1", grpcPort)
.bind(started.createSingleServiceHandler())
.map(_.addToCoordinatedShutdown(3.seconds)(system))(system.executionContext)
.map(_ => replica.replicaId -> started)
})

replicasStarted.futureValue
logger.info("All three replication/producer services bound")
}

"replicate writes from one dc to the other two" in {
systemPerDc.keys.foreach { dc =>
withClue(s"from ${dc.id}") {
Future
.sequence(entityIds.map { entityId =>
logger.infoN("Updating greeting for [{}] from dc [{}]", entityId, dc.id)
ClusterSharding(systemPerDc(dc))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.SetGreeting(s"hello 1 from ${dc.id}", _))
})
.futureValue

testKitsPerDc.values.foreach { testKit =>
withClue(s"on ${testKit.system.name}") {
val probe = testKit.createTestProbe()

entityIds.foreach { entityId =>
withClue(s"for entity id $entityId") {
val entityRef = ClusterSharding(testKit.system)
.entityRefFor(LWWHelloWorld.EntityType, entityId)

probe.awaitAssert({
entityRef
.ask(LWWHelloWorld.Get.apply)
.futureValue should ===(s"hello 1 from ${dc.id}")
}, 10.seconds)
}
}
}
}
}
}
}

"replicate concurrent writes to the other DCs" in (2 to 4).foreach { greetingNo =>
withClue(s"Greeting $greetingNo") {
Future
.sequence(systemPerDc.keys.map { dc =>
withClue(s"from ${dc.id}") {
Future.sequence(entityIds.map { entityId =>
logger.infoN("Updating greeting for [{}] from dc [{}]", entityId, dc.id)
ClusterSharding(systemPerDc(dc))
.entityRefFor(LWWHelloWorld.EntityType, entityId)
.ask(LWWHelloWorld.SetGreeting(s"hello $greetingNo from ${dc.id}", _))
})
}
})
.futureValue // all three updated in roughly parallel

// All 3 should eventually arrive at the same value
testKit
.createTestProbe()
.awaitAssert(
{
entityIds.foreach { entityId =>
withClue(s"for entity id $entityId") {
testKitsPerDc.values.map { testKit =>
val entityRef = ClusterSharding(testKit.system)
.entityRefFor(LWWHelloWorld.EntityType, entityId)

entityRef
.ask(LWWHelloWorld.Get.apply)
.futureValue
}.toSet should have size (1)
}
}
},
20.seconds)
}
}
}

protected override def afterAll(): Unit = {
logger.info("Shutting down all three DCs")
systems.foreach(_.terminate()) // speed up termination by terminating all at the once
// and then make sure they are completely shutdown
systems.foreach { system =>
ActorTestKit.shutdown(system)
}
super.afterAll()
}
}
Loading

0 comments on commit 41147c8

Please sign in to comment.