diff --git a/akka-projection-grpc-tests/src/it/resources/db/default-init.sql b/akka-projection-grpc-tests/src/it/resources/db/default-init.sql index 2351d3f77..38564f87e 100644 --- a/akka-projection-grpc-tests/src/it/resources/db/default-init.sql +++ b/akka-projection-grpc-tests/src/it/resources/db/default-init.sql @@ -136,4 +136,18 @@ CREATE TABLE IF NOT EXISTS akka_projection_timestamp_offset_store_DCC ( -- the consumer lag is timestamp_consumed - timestamp_offset timestamp_consumed timestamp with time zone NOT NULL, PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr) -); \ No newline at end of file +); + +CREATE TABLE IF NOT EXISTS akka_projection_timestamp_offset_store_DCD ( + projection_name VARCHAR(255) NOT NULL, + projection_key VARCHAR(255) NOT NULL, + slice INT NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + seq_nr BIGINT NOT NULL, + -- timestamp_offset is the db_timestamp of the original event + timestamp_offset timestamp with time zone NOT NULL, + -- timestamp_consumed is when the offset was stored + -- the consumer lag is timestamp_consumed - timestamp_offset + timestamp_consumed timestamp with time zone NOT NULL, + PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr) +); diff --git a/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/EdgeReplicationIntegrationSpec.scala b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/EdgeReplicationIntegrationSpec.scala new file mode 100644 index 000000000..3f19b5e33 --- /dev/null +++ b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/EdgeReplicationIntegrationSpec.scala @@ -0,0 +1,395 @@ +/* + * Copyright (C) 2009-2023 Lightbend Inc. + */ + +package akka.projection.grpc.replication + +import scala.concurrent.Await +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.duration.DurationInt + +import akka.actor.testkit.typed.scaladsl.ActorTestKit +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorSystem +import akka.actor.typed.scaladsl.LoggerOps +import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps +import akka.cluster.MemberStatus +import akka.cluster.sharding.typed.scaladsl.ClusterSharding +import akka.cluster.typed.Cluster +import akka.cluster.typed.Join +import akka.grpc.GrpcClientSettings +import akka.http.scaladsl.Http +import akka.persistence.typed.ReplicaId +import akka.projection.grpc.TestContainerConf +import akka.projection.grpc.TestData +import akka.projection.grpc.TestDbLifecycle +import akka.projection.grpc.consumer.ConsumerFilter +import akka.projection.grpc.consumer.ConsumerFilter.IncludeTags +import akka.projection.grpc.consumer.ConsumerFilter.UpdateFilter +import akka.projection.grpc.producer.EventProducerSettings +import akka.projection.grpc.replication.scaladsl.Replica +import akka.projection.grpc.replication.scaladsl.Replication +import akka.projection.grpc.replication.scaladsl.ReplicationSettings +import akka.projection.r2dbc.R2dbcProjectionSettings +import akka.projection.r2dbc.scaladsl.R2dbcReplication +import akka.testkit.SocketUtil +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfterAll +import org.scalatest.wordspec.AnyWordSpecLike +import org.slf4j.LoggerFactory + +object EdgeReplicationIntegrationSpec { + + private def config(dc: ReplicaId): Config = + ConfigFactory.parseString(s""" + akka.actor.provider = cluster + akka.actor { + serialization-bindings { + "${classOf[ReplicationIntegrationSpec].getName}$$LWWHelloWorld$$Event" = jackson-json + } + } + akka.http.server.preview.enable-http2 = on + akka.persistence.r2dbc { + query { + refresh-interval = 500 millis + # reducing this to have quicker test, triggers backtracking earlier + backtracking.behind-current-time = 3 seconds + } + } + akka.projection.grpc { + producer { + query-plugin-id = "akka.persistence.r2dbc.query" + } + } + akka.projection.r2dbc.offset-store { + timestamp-offset-table = "akka_projection_timestamp_offset_store_${dc.id}" + } + akka.remote.artery.canonical.host = "127.0.0.1" + akka.remote.artery.canonical.port = 0 + akka.actor.testkit.typed { + filter-leeway = 10s + system-shutdown-default = 30s + } + """) + + private val CloudReplicaA = ReplicaId("DCA") + private val CloudReplicaB = ReplicaId("DCB") + private val EdgeReplicaC = ReplicaId("DCC") + private val EdgeReplicaD = ReplicaId("DCD") + +} + +class EdgeReplicationIntegrationSpec(testContainerConf: TestContainerConf) + extends ScalaTestWithActorTestKit( + akka.actor + .ActorSystem( + "EdgeReplicationIntegrationSpecA", + EdgeReplicationIntegrationSpec + .config(EdgeReplicationIntegrationSpec.CloudReplicaA) + .withFallback(testContainerConf.config)) + .toTyped) + with AnyWordSpecLike + with TestDbLifecycle + with BeforeAndAfterAll + with LogCapturing + with TestData { + import EdgeReplicationIntegrationSpec._ + import ReplicationIntegrationSpec.LWWHelloWorld + implicit val ec: ExecutionContext = system.executionContext + + def this() = this(new TestContainerConf) + + private val logger = LoggerFactory.getLogger(classOf[EdgeReplicationIntegrationSpec]) + override def typedSystem: ActorSystem[_] = testKit.system + + private val systems = Seq[ActorSystem[_]]( + typedSystem, + akka.actor + .ActorSystem( + "EdgeReplicationIntegrationSpecB", + EdgeReplicationIntegrationSpec.config(CloudReplicaB).withFallback(testContainerConf.config)) + .toTyped, + akka.actor + .ActorSystem( + "EdgeReplicationIntegrationSpecC", + EdgeReplicationIntegrationSpec.config(EdgeReplicaC).withFallback(testContainerConf.config)) + .toTyped, + akka.actor + .ActorSystem( + "EdgeReplicationIntegrationSpecD", + EdgeReplicationIntegrationSpec.config(EdgeReplicaD).withFallback(testContainerConf.config)) + .toTyped) + + private val grpcPorts = SocketUtil.temporaryServerAddresses(systems.size, "127.0.0.1").map(_.getPort) + private val allDcsAndPorts = Seq(CloudReplicaA, CloudReplicaB, EdgeReplicaC, EdgeReplicaD).zip(grpcPorts) + private val allReplicas = allDcsAndPorts.map { + case (id, port) => + Replica(id, 2, GrpcClientSettings.connectToServiceAt("127.0.0.1", port).withTls(false)) + }.toSet + + private val testKitsPerDc = Map( + CloudReplicaA -> testKit, + CloudReplicaB -> ActorTestKit(systems(1)), + EdgeReplicaC -> ActorTestKit(systems(2)), + EdgeReplicaD -> ActorTestKit(systems(3))) + private val systemPerDc = + Map(CloudReplicaA -> system, CloudReplicaB -> systems(1), EdgeReplicaC -> systems(2), EdgeReplicaD -> systems(3)) + private val entityIds = Set( + nextPid(LWWHelloWorld.EntityType.name).entityId, + nextPid(LWWHelloWorld.EntityType.name).entityId, + nextPid(LWWHelloWorld.EntityType.name).entityId) + + override protected def beforeAll(): Unit = { + super.beforeAll() + // We can share the journal to save a bit of work, because the persistence id contains + // the dc so is unique (this is ofc completely synthetic, the whole point of replication + // over grpc is to replicate between different dcs/regions with completely separate databases). + // The offset tables need to be separate though to not get conflicts on projection names + systemPerDc.values.foreach { system => + val r2dbcProjectionSettings = R2dbcProjectionSettings(system) + Await.result( + r2dbcExecutor.updateOne("beforeAll delete")( + _.createStatement(s"delete from ${r2dbcProjectionSettings.timestampOffsetTableWithSchema}")), + 10.seconds) + + } + } + + def startReplica(replicaSystem: ActorSystem[_], selfReplicaId: ReplicaId): Replication[LWWHelloWorld.Command] = { + val otherReplicas = selfReplicaId match { + case CloudReplicaA => allReplicas.filterNot(_.replicaId == CloudReplicaA) + case CloudReplicaB => allReplicas.filterNot(_.replicaId == CloudReplicaB) + case EdgeReplicaC => allReplicas.filter(_.replicaId == CloudReplicaA) + case EdgeReplicaD => allReplicas.filter(_.replicaId == CloudReplicaA) + case other => throw new IllegalArgumentException(other.id) + } + + val settings = ReplicationSettings[LWWHelloWorld.Command]( + LWWHelloWorld.EntityType.name, + selfReplicaId, + EventProducerSettings(replicaSystem), + otherReplicas, + 10.seconds, + 8, + R2dbcReplication()) + Replication.grpcReplication(settings)(LWWHelloWorld.apply)(replicaSystem) + } + + def assertGreeting(entityId: String, expected: String): Unit = { + testKitsPerDc.values.foreach { testKit => + withClue(s"on ${testKit.system.name}") { + val probe = testKit.createTestProbe() + withClue(s"for entity id $entityId") { + val entityRef = ClusterSharding(testKit.system) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + + probe.awaitAssert({ + entityRef + .ask(LWWHelloWorld.Get.apply) + .futureValue should ===(expected) + }, 10.seconds) + } + } + } + } + + "Replication over gRPC" should { + "form one node clusters" in { + testKitsPerDc.values.foreach { testKit => + val cluster = Cluster(testKit.system) + cluster.manager ! Join(cluster.selfMember.address) + testKit.createTestProbe().awaitAssert { + cluster.selfMember.status should ===(MemberStatus.Up) + } + } + } + + "start replicas" in { + val replicasStarted = Future.sequence(allReplicas.zipWithIndex.map { + case (replica, index) => + val system = systems(index) + logger + .infoN( + "Starting replica [{}], system [{}] on port [{}]", + replica.replicaId, + system.name, + replica.grpcClientSettings.defaultPort) + val started = startReplica(system, replica.replicaId) + val grpcPort = grpcPorts(index) + + // start producer server + Http(system) + .newServerAt("127.0.0.1", grpcPort) + .bind(started.createSingleServiceHandler()) + .map(_.addToCoordinatedShutdown(3.seconds)(system))(system.executionContext) + .map(_ => replica.replicaId -> started) + }) + + replicasStarted.futureValue + logger.info("All three replication/producer services bound") + } + + "replicate indirectly" in { + val entityId = nextPid(LWWHelloWorld.EntityType.name).entityId + + // Edge replicas are only connected to CloudReplicaA + ClusterSharding(systemPerDc(CloudReplicaB)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetGreeting("Hello from B", _)) + .futureValue + assertGreeting(entityId, "Hello from B") + } + + "replicate writes from one dc to the other DCs" in { + systemPerDc.keys.foreach { dc => + withClue(s"from ${dc.id}") { + Future + .sequence(entityIds.map { entityId => + logger.infoN("Updating greeting for [{}] from dc [{}]", entityId, dc.id) + ClusterSharding(systemPerDc(dc)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetGreeting(s"hello 1 from ${dc.id}", _)) + }) + .futureValue + + testKitsPerDc.values.foreach { testKit => + withClue(s"on ${testKit.system.name}") { + val probe = testKit.createTestProbe() + + entityIds.foreach { entityId => + withClue(s"for entity id $entityId") { + val entityRef = ClusterSharding(testKit.system) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + + probe.awaitAssert({ + entityRef + .ask(LWWHelloWorld.Get.apply) + .futureValue should ===(s"hello 1 from ${dc.id}") + }, 10.seconds) + } + } + } + } + } + } + } + + "replicate concurrent writes to the other DCs" in (2 to 4).foreach { greetingNo => + withClue(s"Greeting $greetingNo") { + Future + .sequence(systemPerDc.keys.map { dc => + withClue(s"from ${dc.id}") { + Future.sequence(entityIds.map { entityId => + logger.infoN("Updating greeting for [{}] from dc [{}]", entityId, dc.id) + ClusterSharding(systemPerDc(dc)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetGreeting(s"hello $greetingNo from ${dc.id}", _)) + }) + } + }) + .futureValue // all three updated in roughly parallel + + // All 3 should eventually arrive at the same value + testKit + .createTestProbe() + .awaitAssert( + { + entityIds.foreach { entityId => + withClue(s"for entity id $entityId") { + testKitsPerDc.values.map { testKit => + val entityRef = ClusterSharding(testKit.system) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + + entityRef + .ask(LWWHelloWorld.Get.apply) + .futureValue + }.toSet should have size (1) + } + } + }, + 20.seconds) + } + } + } + + "use consumer filter on tag" in { + val entityId = nextPid(LWWHelloWorld.EntityType.name).entityId + + ConsumerFilter(systemPerDc(EdgeReplicaC)).ref ! UpdateFilter( + LWWHelloWorld.EntityType.name, + List(ConsumerFilter.excludeAll, IncludeTags(Set("tag-C")))) + ConsumerFilter(systemPerDc(EdgeReplicaD)).ref ! UpdateFilter( + LWWHelloWorld.EntityType.name, + List(ConsumerFilter.excludeAll, IncludeTags(Set("tag-D")))) + + // let the filter propagate to producer + Thread.sleep(1000) + + ClusterSharding(systemPerDc(CloudReplicaA)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetTag("tag-C", _)) + .futureValue + + ClusterSharding(systemPerDc(CloudReplicaA)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetGreeting("Hello C", _)) + .futureValue + + eventually { + ClusterSharding(systemPerDc(EdgeReplicaC)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.Get(_)) + .futureValue shouldBe "Hello C" + } + + // but not updated in D + ClusterSharding(systemPerDc(EdgeReplicaD)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.Get(_)) + .futureValue shouldBe "Hello world" + + // change tag + ClusterSharding(systemPerDc(CloudReplicaA)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetTag("tag-D", _)) + .futureValue + + // previous greeting should be replicated + eventually { + ClusterSharding(systemPerDc(EdgeReplicaD)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.Get(_)) + .futureValue shouldBe "Hello C" + } + + ClusterSharding(systemPerDc(CloudReplicaA)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.SetGreeting("Hello D", _)) + .futureValue + eventually { + ClusterSharding(systemPerDc(EdgeReplicaD)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.Get(_)) + .futureValue shouldBe "Hello D" + } + + // but not updated in C + ClusterSharding(systemPerDc(EdgeReplicaC)) + .entityRefFor(LWWHelloWorld.EntityType, entityId) + .ask(LWWHelloWorld.Get(_)) + .futureValue shouldBe "Hello C" + } + + protected override def afterAll(): Unit = { + logger.info("Shutting down all three DCs") + systems.foreach(_.terminate()) // speed up termination by terminating all at the once + // and then make sure they are completely shutdown + systems.foreach { system => + ActorTestKit.shutdown(system) + } + super.afterAll() + } +} diff --git a/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/ReplicationIntegrationSpec.scala b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/ReplicationIntegrationSpec.scala index b546d30b5..5a652bd47 100644 --- a/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/ReplicationIntegrationSpec.scala +++ b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/replication/ReplicationIntegrationSpec.scala @@ -25,7 +25,6 @@ import akka.persistence.typed.scaladsl.EventSourcedBehavior import akka.projection.grpc.TestContainerConf import akka.projection.grpc.TestDbLifecycle import akka.projection.grpc.producer.EventProducerSettings -import akka.projection.grpc.replication import akka.projection.grpc.replication.scaladsl.Replica import akka.projection.grpc.replication.scaladsl.ReplicatedBehaviors import akka.projection.grpc.replication.scaladsl.Replication @@ -38,12 +37,13 @@ import com.typesafe.config.ConfigFactory import org.scalatest.BeforeAndAfterAll import org.scalatest.wordspec.AnyWordSpecLike import org.slf4j.LoggerFactory - import scala.concurrent.Await import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration.DurationInt +import akka.cluster.sharding.typed.scaladsl.EntityTypeKey + object ReplicationIntegrationSpec { private def config(dc: ReplicaId): Config = @@ -51,7 +51,7 @@ object ReplicationIntegrationSpec { akka.actor.provider = cluster akka.actor { serialization-bindings { - "${classOf[replication.ReplicationIntegrationSpec].getName}$$LWWHelloWorld$$Event" = jackson-json + "${classOf[ReplicationIntegrationSpec].getName}$$LWWHelloWorld$$Event" = jackson-json } } akka.http.server.preview.enable-http2 = on @@ -84,28 +84,30 @@ object ReplicationIntegrationSpec { object LWWHelloWorld { - sealed trait Command - - case class Get(replyTo: ActorRef[String]) extends Command + val EntityType: EntityTypeKey[Command] = EntityTypeKey[Command]("hello-world") - case class SetGreeting(newGreeting: String, replyTo: ActorRef[Done]) extends Command + sealed trait Command + final case class Get(replyTo: ActorRef[String]) extends Command + final case class SetGreeting(newGreeting: String, replyTo: ActorRef[Done]) extends Command + final case class SetTag(tag: String, replyTo: ActorRef[Done]) extends Command sealed trait Event - - case class GreetingChanged(greeting: String, timestamp: LwwTime) extends Event + final case class GreetingChanged(greeting: String, timestamp: LwwTime) extends Event + final case class TagChanged(tag: String, timestamp: LwwTime) extends Event object State { - val initial = State("Hello world", LwwTime(Long.MinValue, ReplicaId(""))) + val initial = + State("Hello world", LwwTime(Long.MinValue, ReplicaId("")), "", LwwTime(Long.MinValue, ReplicaId(""))) } - case class State(greeting: String, timestamp: LwwTime) + case class State(greeting: String, greetingTimestamp: LwwTime, tag: String, tagTimestamp: LwwTime) def apply(replicatedBehaviors: ReplicatedBehaviors[Command, Event, State]) = replicatedBehaviors.setup { replicationContext => EventSourcedBehavior[Command, Event, State]( replicationContext.persistenceId, State.initial, { - case (State(greeting, _), Get(replyTo)) => + case (State(greeting, _, _, _), Get(replyTo)) => replyTo ! greeting Effect.none case (state, SetGreeting(greeting, replyTo)) => @@ -113,14 +115,30 @@ object ReplicationIntegrationSpec { .persist( GreetingChanged( greeting, - state.timestamp.increase(replicationContext.currentTimeMillis(), replicationContext.replicaId))) + state.greetingTimestamp + .increase(replicationContext.currentTimeMillis(), replicationContext.replicaId))) + .thenRun((_: State) => replyTo ! Done) + case (state, SetTag(tag, replyTo)) => + Effect + .persist( + TagChanged( + tag, + state.greetingTimestamp + .increase(replicationContext.currentTimeMillis(), replicationContext.replicaId))) .thenRun((_: State) => replyTo ! Done) }, { case (currentState, GreetingChanged(newGreeting, newTimestamp)) => - if (newTimestamp.isAfter(currentState.timestamp)) - State(newGreeting, newTimestamp) + if (newTimestamp.isAfter(currentState.greetingTimestamp)) + currentState.copy(newGreeting, newTimestamp) + else currentState + case (currentState, TagChanged(newTag, newTimestamp)) => + if (newTimestamp.isAfter(currentState.tagTimestamp)) + currentState.copy(tag = newTag, tagTimestamp = newTimestamp) else currentState }) + .withTaggerForState { + case (state, _) => if (state.tag == "") Set.empty else Set(state.tag) + } } } } @@ -168,7 +186,6 @@ class ReplicationIntegrationSpec(testContainerConf: TestContainerConf) private val testKitsPerDc = Map(DCA -> testKit, DCB -> ActorTestKit(systems(1)), DCC -> ActorTestKit(systems(2))) private val systemPerDc = Map(DCA -> system, DCB -> systems(1), DCC -> systems(2)) - private var replicatedEventSourcingOverGrpcPerDc: Map[ReplicaId, Replication[LWWHelloWorld.Command]] = Map.empty private val entityIds = Set("one", "two", "three") override protected def beforeAll(): Unit = { @@ -189,7 +206,7 @@ class ReplicationIntegrationSpec(testContainerConf: TestContainerConf) def startReplica(replicaSystem: ActorSystem[_], selfReplicaId: ReplicaId): Replication[LWWHelloWorld.Command] = { val settings = ReplicationSettings[LWWHelloWorld.Command]( - "hello-world", + LWWHelloWorld.EntityType.name, selfReplicaId, EventProducerSettings(replicaSystem), allReplicas, @@ -231,31 +248,30 @@ class ReplicationIntegrationSpec(testContainerConf: TestContainerConf) .map(_ => replica.replicaId -> started) }) - replicatedEventSourcingOverGrpcPerDc = replicasStarted.futureValue.toMap + replicasStarted.futureValue logger.info("All three replication/producer services bound") } "replicate writes from one dc to the other two" in { - val entityTypeKey = replicatedEventSourcingOverGrpcPerDc.values.head.entityTypeKey systemPerDc.keys.foreach { dc => withClue(s"from ${dc.id}") { Future .sequence(entityIds.map { entityId => logger.infoN("Updating greeting for [{}] from dc [{}]", entityId, dc.id) ClusterSharding(systemPerDc(dc)) - .entityRefFor(entityTypeKey, entityId) + .entityRefFor(LWWHelloWorld.EntityType, entityId) .ask(LWWHelloWorld.SetGreeting(s"hello 1 from ${dc.id}", _)) }) .futureValue testKitsPerDc.values.foreach { testKit => - withClue(s"on ${system.name}") { + withClue(s"on ${testKit.system.name}") { val probe = testKit.createTestProbe() entityIds.foreach { entityId => withClue(s"for entity id $entityId") { val entityRef = ClusterSharding(testKit.system) - .entityRefFor(entityTypeKey, entityId) + .entityRefFor(LWWHelloWorld.EntityType, entityId) probe.awaitAssert({ entityRef @@ -272,14 +288,13 @@ class ReplicationIntegrationSpec(testContainerConf: TestContainerConf) "replicate concurrent writes to the other DCs" in (2 to 4).foreach { greetingNo => withClue(s"Greeting $greetingNo") { - val entityTypeKey = replicatedEventSourcingOverGrpcPerDc.values.head.entityTypeKey Future .sequence(systemPerDc.keys.map { dc => withClue(s"from ${dc.id}") { Future.sequence(entityIds.map { entityId => logger.infoN("Updating greeting for [{}] from dc [{}]", entityId, dc.id) ClusterSharding(systemPerDc(dc)) - .entityRefFor(entityTypeKey, entityId) + .entityRefFor(LWWHelloWorld.EntityType, entityId) .ask(LWWHelloWorld.SetGreeting(s"hello $greetingNo from ${dc.id}", _)) }) } @@ -295,7 +310,7 @@ class ReplicationIntegrationSpec(testContainerConf: TestContainerConf) withClue(s"for entity id $entityId") { testKitsPerDc.values.map { testKit => val entityRef = ClusterSharding(testKit.system) - .entityRefFor(entityTypeKey, entityId) + .entityRefFor(LWWHelloWorld.EntityType, entityId) entityRef .ask(LWWHelloWorld.Get.apply) diff --git a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/EventProducerServiceSpec.scala b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/EventProducerServiceSpec.scala index e1170c9a1..bf299e136 100644 --- a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/EventProducerServiceSpec.scala +++ b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/EventProducerServiceSpec.scala @@ -29,17 +29,22 @@ import akka.persistence.query.typed.scaladsl.CurrentEventsByPersistenceIdTypedQu import akka.persistence.query.typed.scaladsl.EventsBySliceQuery import akka.persistence.query.typed.scaladsl.EventsBySliceStartingFromSnapshotsQuery import akka.persistence.typed.PersistenceId +import akka.persistence.typed.ReplicaId +import akka.persistence.typed.internal.ReplicatedEventMetadata +import akka.persistence.typed.internal.VersionVector import akka.projection.grpc.internal.proto.EventTimestampRequest import akka.projection.grpc.internal.proto.InitReq import akka.projection.grpc.internal.proto.LoadEventRequest import akka.projection.grpc.internal.proto.PersistenceIdSeqNr import akka.projection.grpc.internal.proto.ReplayReq +import akka.projection.grpc.internal.proto.ReplicaInfo import akka.projection.grpc.internal.proto.StreamIn import akka.projection.grpc.internal.proto.StreamOut import akka.projection.grpc.producer.EventProducerSettings import akka.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation import akka.projection.grpc.producer.scaladsl.EventProducerInterceptor +import akka.projection.grpc.replication.internal.EventOriginFilter import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source @@ -147,10 +152,11 @@ object EventProducerServiceSpec { pid: PersistenceId, seqNr: Long, evt: String, - tags: Set[String] = Set.empty): EventEnvelope[String] = { + tags: Set[String] = Set.empty, + source: String = ""): EventEnvelope[String] = { val now = Instant.now() val slice = math.abs(pid.hashCode % 1024) - EventEnvelope( + val env = EventEnvelope( TimestampOffset(Instant.now, Map(pid.id -> seqNr)), pid.id, seqNr, @@ -161,7 +167,25 @@ object EventProducerServiceSpec { filtered = false, source = "", tags) + + if (source == "BT") + env.withEventOption(None) + else + env } + + private def createEnvelope(pid: PersistenceId, seqNr: Long, evt: String, origin: String): EventEnvelope[String] = + createEnvelope(pid, seqNr, evt) + .withMetadata(ReplicatedEventMetadata(ReplicaId(origin), seqNr, VersionVector.empty, false)) + + private def createBacktrackingEnvelope( + pid: PersistenceId, + seqNr: Long, + evt: String, + origin: String): EventEnvelope[String] = + createEnvelope(pid, seqNr, evt, source = "BT") + .withMetadata(ReplicatedEventMetadata(ReplicaId(origin), seqNr, VersionVector.empty, false)) + } class EventProducerServiceSpec @@ -195,9 +219,11 @@ class EventProducerServiceSpec val streamId4 = "stream_id_" + entityType4 val entityType5 = nextEntityType() + "_snap" val streamId5 = "stream_id_" + entityType5 + val entityType6 = nextEntityType() + val streamId6 = "stream_id_" + entityType6 private val eventsBySlicesQueries = - Map(streamId1 -> query, streamId2 -> query, streamId3 -> query) + Map(streamId1 -> query, streamId2 -> query, streamId3 -> query, streamId6 -> query) private val eventsBySlicesStartingFromSnapshotsQueries = Map(streamId4 -> query, streamId5 -> query) private val currentEventsByPersistenceIdQueries = @@ -222,7 +248,9 @@ class EventProducerServiceSpec evt.replace("-snap", "") else evt - }) + }, + EventProducerSource(entityType6, streamId6, transformation, settings) + .withReplicatedEventOriginFilter(new EventOriginFilter(ReplicaId("replica1")))) private val eventProducerService = new EventProducerServiceImpl( @@ -491,6 +519,59 @@ class EventProducerServiceSpec // transformSnapshot removes the "-snap" protoAnySerialization.deserialize(out3.getEvent.payload.get) shouldBe "E-3" } + + "filter based on event origin" in { + val replicaInfo = ReplicaInfo("replica2", List("replica1", "replica3")) + val initReq = InitReq(streamId6, 0, 1023, offset = None, replicaInfo = Some(replicaInfo)) + val streamIn = Source + .single(StreamIn(StreamIn.Message.Init(initReq))) + .concat(Source.maybe) + + val probe = runEventsBySlices(streamIn) + + probe.request(100) + val testPublisher = + query.testPublisher(entityType6).futureValue + + val pid = nextPid(entityType6) + // emitted because replica1 is the producer self replica + val env1 = createEnvelope(pid, 1L, "e-1", "replica1") + testPublisher.sendNext(env1) + // will be filtered because replica2 is the consumer self replica + val env2 = createEnvelope(pid, 2L, "e-2", "replica2") + testPublisher.sendNext(env2) + // will be filtered because replica3 is included in other replicas + val env3 = createEnvelope(pid, 3L, "e-3", "replica3") + testPublisher.sendNext(env3) + // emitted because replica4 is not included in other replicas + val env4 = createEnvelope(pid, 4L, "e-4", "replica4") + testPublisher.sendNext(env4) + // emitted because backtracking + val env5 = createBacktrackingEnvelope(pid, 4L, "e-4", "replica2") + testPublisher.sendNext(env5) + + val out1 = probe.expectNext() + out1.message.isEvent shouldBe true + out1.getEvent.seqNr shouldBe env1.sequenceNr + + val out2 = probe.expectNext() + out2.message.isFilteredEvent shouldBe true + out2.getFilteredEvent.seqNr shouldBe env2.sequenceNr + + val out3 = probe.expectNext() + out3.message.isFilteredEvent shouldBe true + out3.getFilteredEvent.seqNr shouldBe env3.sequenceNr + + val out4 = probe.expectNext() + out4.message.isEvent shouldBe true + out4.getEvent.seqNr shouldBe env4.sequenceNr + + val out5 = probe.expectNext() + out5.message.isEvent shouldBe true + out5.getEvent.seqNr shouldBe env5.sequenceNr + + } + } } diff --git a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala index 33c76d08d..2d01bbbc6 100644 --- a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala +++ b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/replication/ReplicationSettingsSpec.scala @@ -32,24 +32,30 @@ class ReplicationSettingsSpec extends AnyWordSpec with Matchers { # which of the replicas this node belongs to, should be the same # across the nodes of each replica Akka cluster. self-replica-id = dca + # Pick it up from an environment variable to re-use the same config # without changes across replicas self-replica-id = ${?SELF_REPLICA} + # max number of parallel in-flight (sent over sharding) entity updates # per consumer/projection parallel-updates = 8 + # Fail the replication stream (and restart with backoff) if completing # the write of a replicated event reaching the cluster takes more time # than this. entity-event-replication-timeout = 10s + replicas: [ { # Unique identifier of the replica/datacenter, is stored in the events # and cannot be changed after events have been persisted. replica-id = "dca" + # Number of replication streams/projections to start to consume events # from this replica number-of-consumers = 4 + # Akka gRPC client config block for how to reach this replica # from the other replicas, note that binding the server/publishing # endpoint of each replica is done separately, in code. diff --git a/akka-projection-grpc/src/main/mima-filters/1.5.0.backwards.excludes/ReplicationSettings.excludes b/akka-projection-grpc/src/main/mima-filters/1.5.0.backwards.excludes/ReplicationSettings.excludes new file mode 100644 index 000000000..65f9246cd --- /dev/null +++ b/akka-projection-grpc/src/main/mima-filters/1.5.0.backwards.excludes/ReplicationSettings.excludes @@ -0,0 +1,2 @@ +# added indirectReplication to private constructor +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.replication.scaladsl.ReplicationSettings.this") diff --git a/akka-projection-grpc/src/main/mima-filters/1.5.0.backwards.excludes/indirect.excludes b/akka-projection-grpc/src/main/mima-filters/1.5.0.backwards.excludes/indirect.excludes new file mode 100644 index 000000000..397f452e3 --- /dev/null +++ b/akka-projection-grpc/src/main/mima-filters/1.5.0.backwards.excludes/indirect.excludes @@ -0,0 +1,5 @@ +# indirect replication, internals +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.consumer.scaladsl.GrpcReadJournal.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.producer.scaladsl.EventProducer#EventProducerSource.copy") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.producer.scaladsl.EventProducer#EventProducerSource.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.replication.javadsl.ReplicationSettings.this") diff --git a/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_consumer.proto b/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_consumer.proto index 27f2d564a..1dc72687e 100644 --- a/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_consumer.proto +++ b/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_consumer.proto @@ -59,6 +59,7 @@ message ConsumerEventInit { message ConsumerEventStart { repeated FilterCriteria filter = 1; + ReplicaInfo replica_info = 6; } message KeepAlive { } diff --git a/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_producer.proto b/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_producer.proto index 677c81817..b2e2a7322 100644 --- a/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_producer.proto +++ b/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_producer.proto @@ -65,6 +65,7 @@ message InitReq { Offset offset = 4; // consumer defined event filters repeated FilterCriteria filter = 5; + ReplicaInfo replica_info = 6; } // Add filter criteria to exclude and include events for matching entities. @@ -203,6 +204,23 @@ message PersistenceIdSeqNr { int64 seq_nr = 2; } +// Used for Replicated Event Sourcing to filter events based on origin. +// For edge topologies, like star topologies, an edge replica is not connected +// to all other replicas, but should be able to receive events indirectly via +// the replica that it is consuming from. +// +// Events originating from other replicas that the consumer is connected to are excluded +// and emitted as FilteredEvent from the producer side, because the consumer will receive +// them directly from the other replica. +// Events originating from the consumer replica itself are excluded (break the cycle). +// Events originating from the producer replica are always included. +message ReplicaInfo { + // The replica id of the consumer + string replica_id = 6; + // Other replicas that the consumer is connected to. + repeated string other_replica_ids = 7; +} + message StreamOut { oneof message { @@ -261,6 +279,7 @@ message LoadEventRequest { string stream_id = 1; string persistence_id = 2; int64 seq_nr = 3; + ReplicaInfo replica_info = 4; } // Response to `LoadEventRequest`. diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/GrpcReadJournal.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/GrpcReadJournal.scala index 806328f3b..9bfceb838 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/GrpcReadJournal.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/GrpcReadJournal.scala @@ -71,9 +71,14 @@ object GrpcReadJournal { clientSettings: GrpcClientSettings, protobufDescriptors: java.util.List[Descriptors.FileDescriptor]): GrpcReadJournal = { import akka.util.ccompat.JavaConverters._ - new GrpcReadJournal(scaladsl - .GrpcReadJournal(settings, clientSettings, protobufDescriptors.asScala.toList, ProtoAnySerialization.Prefer.Java)( - system)) + new GrpcReadJournal( + scaladsl + .GrpcReadJournal( + settings, + clientSettings, + protobufDescriptors.asScala.toList, + ProtoAnySerialization.Prefer.Java, + replicationSettings = None)(system)) } } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala index 504fd74f0..03a131633 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala @@ -57,12 +57,15 @@ import io.grpc.Status import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder import org.slf4j.Logger import org.slf4j.LoggerFactory - import java.time.Instant import java.util.concurrent.TimeUnit + import scala.collection.immutable import scala.concurrent.ExecutionContext import scala.concurrent.Future + +import akka.projection.grpc.internal.proto.ReplicaInfo +import akka.projection.grpc.replication.scaladsl.ReplicationSettings @ApiMayChange object GrpcReadJournal { val Identifier = "akka.projection.grpc.consumer" @@ -110,7 +113,7 @@ object GrpcReadJournal { clientSettings: GrpcClientSettings, protobufDescriptors: immutable.Seq[Descriptors.FileDescriptor])( implicit system: ClassicActorSystemProvider): GrpcReadJournal = - apply(settings, clientSettings, protobufDescriptors, ProtoAnySerialization.Prefer.Scala) + apply(settings, clientSettings, protobufDescriptors, ProtoAnySerialization.Prefer.Scala, replicationSettings = None) /** * INTERNAL API @@ -119,7 +122,9 @@ object GrpcReadJournal { settings: GrpcQuerySettings, clientSettings: GrpcClientSettings, protobufDescriptors: immutable.Seq[Descriptors.FileDescriptor], - protobufPrefer: ProtoAnySerialization.Prefer)(implicit system: ClassicActorSystemProvider): GrpcReadJournal = { + protobufPrefer: ProtoAnySerialization.Prefer, + replicationSettings: Option[ReplicationSettings[_]])( + implicit system: ClassicActorSystemProvider): GrpcReadJournal = { // FIXME issue #702 This probably means that one GrpcReadJournal instance is created for each Projection instance, // and therefore one grpc client for each. Is that fine or should the client be shared for same clientSettings? @@ -137,7 +142,8 @@ object GrpcReadJournal { system.classicSystem.asInstanceOf[ExtendedActorSystem], settings, withChannelBuilderOverrides(clientSettings), - protoAnySerialization) + protoAnySerialization, + replicationSettings) } private def withChannelBuilderOverrides(clientSettings: GrpcClientSettings): GrpcClientSettings = { @@ -157,7 +163,8 @@ final class GrpcReadJournal private ( system: ExtendedActorSystem, settings: GrpcQuerySettings, clientSettings: GrpcClientSettings, - protoAnySerialization: ProtoAnySerialization) + protoAnySerialization: ProtoAnySerialization, + replicationSettings: Option[ReplicationSettings[_]]) extends ReadJournal with EventsBySliceQuery with EventTimestampQuery @@ -176,7 +183,8 @@ final class GrpcReadJournal private ( system, GrpcQuerySettings(config), withChannelBuilderOverrides(GrpcClientSettings.fromConfig(config.getConfig("client"))(system)), - new ProtoAnySerialization(system.toTyped, descriptors = Nil, protoAnyPrefer)) + new ProtoAnySerialization(system.toTyped, descriptors = Nil, protoAnyPrefer), + replicationSettings = None) // When created from `GrpcReadJournalProvider`. def this(system: ExtendedActorSystem, config: Config, cfgPath: String) = @@ -193,6 +201,9 @@ final class GrpcReadJournal private ( case None => Seq.empty } + private val replicaInfo = + replicationSettings.map(s => ReplicaInfo(s.selfReplicaId.id, s.otherReplicas.toSeq.map(_.replicaId.id))) + @InternalApi private[akka] override def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit = { consumerFilter.ref ! ConsumerFilter.Replay( @@ -340,7 +351,7 @@ final class GrpcReadJournal private ( .futureSource { initFilter.map { filter => val protoCriteria = toProtoFilterCriteria(filter.criteria) - val initReq = InitReq(streamId, minSlice, maxSlice, protoOffset, protoCriteria) + val initReq = InitReq(streamId, minSlice, maxSlice, protoOffset, protoCriteria, replicaInfo) Source .single(StreamIn(StreamIn.Message.Init(initReq))) @@ -445,7 +456,7 @@ final class GrpcReadJournal private ( sequenceNr) import system.dispatcher addRequestHeaders(client.loadEvent()) - .invoke(LoadEventRequest(settings.streamId, persistenceId, sequenceNr)) + .invoke(LoadEventRequest(settings.streamId, persistenceId, sequenceNr, replicaInfo)) .map { case LoadEventResponse(LoadEventResponse.Message.Event(event), _) => eventToEnvelope(event, settings.streamId) diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala index 4ad535ae5..9323d8ab7 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala @@ -199,34 +199,56 @@ import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation replayParallelism = producerSource.settings.replayParallelism)) .join(Flow.fromSinkAndSource(Sink.ignore, events)) + val eventOriginFilter: EventEnvelope[_] => Boolean = + producerSource.replicatedEventOriginFilter + .flatMap(f => init.replicaInfo.map(f.createFilter)) + .getOrElse((_: EventEnvelope[_]) => true) + val eventsStreamOut: Flow[StreamIn, StreamOut, NotUsed] = eventsFlow.mapAsync(producerSource.settings.transformationParallelism) { env => - import system.executionContext - transformAndEncodeEvent(producerSource.transformation, env, protoAnySerialization) - .map { - case Some(event) => - log.traceN( - "Emitting event from persistenceId [{}] with seqNr [{}], offset [{}], source [{}]", - env.persistenceId, - env.sequenceNr, - env.offset, - event.source) - StreamOut(StreamOut.Message.Event(event)) - case None => - log.traceN( - "Filtered event from persistenceId [{}] with seqNr [{}], offset [{}], source [{}]", - env.persistenceId, - env.sequenceNr, - env.offset, - env.source) - StreamOut( - StreamOut.Message.FilteredEvent( - FilteredEvent( - env.persistenceId, - env.sequenceNr, - env.slice, - ProtobufProtocolConversions.offsetToProtoOffset(env.offset)))) - } + if (eventOriginFilter(env)) { + import system.executionContext + transformAndEncodeEvent(producerSource.transformation, env, protoAnySerialization) + .map { + case Some(event) => + log.traceN( + "Emitting event from persistenceId [{}] with seqNr [{}], offset [{}], source [{}]", + env.persistenceId, + env.sequenceNr, + env.offset, + event.source) + StreamOut(StreamOut.Message.Event(event)) + case None => + log.traceN( + "Filtered event from persistenceId [{}] with seqNr [{}], offset [{}], source [{}]", + env.persistenceId, + env.sequenceNr, + env.offset, + env.source) + StreamOut( + StreamOut.Message.FilteredEvent( + FilteredEvent( + env.persistenceId, + env.sequenceNr, + env.slice, + ProtobufProtocolConversions.offsetToProtoOffset(env.offset)))) + } + } else { + log.traceN( + "Filtered event, due to origin, from persistenceId [{}] with seqNr [{}], offset [{}], source [{}]", + env.persistenceId, + env.sequenceNr, + env.offset, + env.source) + Future.successful( + StreamOut( + StreamOut.Message.FilteredEvent( + FilteredEvent( + env.persistenceId, + env.sequenceNr, + env.slice, + ProtobufProtocolConversions.offsetToProtoOffset(env.offset))))) + } } eventsStreamOut @@ -270,28 +292,50 @@ import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation import system.executionContext q.loadEnvelope[Any](req.persistenceId, req.seqNr) .flatMap { env => - transformAndEncodeEvent(producerSource.transformation, env, protoAnySerialization) - .map { - case Some(event) => - log.traceN( - "Loaded event from persistenceId [{}] with seqNr [{}], offset [{}]", - env.persistenceId, - env.sequenceNr, - env.offset) - LoadEventResponse(LoadEventResponse.Message.Event(event)) - case None => - log.traceN( - "Filtered loaded event from persistenceId [{}] with seqNr [{}], offset [{}]", - env.persistenceId, - env.sequenceNr, - env.offset) - LoadEventResponse(LoadEventResponse.Message.FilteredEvent(FilteredEvent( - persistenceId = env.persistenceId, - seqNr = env.sequenceNr, - slice = env.slice, - offset = ProtobufProtocolConversions.offsetToProtoOffset(env.offset), - source = env.source))) - } + val eventOriginFilter: EventEnvelope[_] => Boolean = + producerSource.replicatedEventOriginFilter + .flatMap(f => req.replicaInfo.map(f.createFilter)) + .getOrElse((_: EventEnvelope[_]) => true) + if (eventOriginFilter(env)) { + transformAndEncodeEvent(producerSource.transformation, env, protoAnySerialization) + .map { + case Some(event) => + log.traceN( + "Loaded event from persistenceId [{}] with seqNr [{}], offset [{}]", + env.persistenceId, + env.sequenceNr, + env.offset) + LoadEventResponse(LoadEventResponse.Message.Event(event)) + case None => + log.traceN( + "Filtered loaded event from persistenceId [{}] with seqNr [{}], offset [{}]", + env.persistenceId, + env.sequenceNr, + env.offset) + LoadEventResponse(LoadEventResponse.Message.FilteredEvent(FilteredEvent( + persistenceId = env.persistenceId, + seqNr = env.sequenceNr, + slice = env.slice, + offset = ProtobufProtocolConversions.offsetToProtoOffset(env.offset), + source = env.source))) + } + } else { + log.traceN( + "Filtered loaded event, due to origin, from persistenceId [{}] with seqNr [{}], offset [{}], source [{}]", + env.persistenceId, + env.sequenceNr, + env.offset, + env.source) + Future.successful( + LoadEventResponse( + LoadEventResponse.Message.FilteredEvent( + FilteredEvent( + env.persistenceId, + env.sequenceNr, + env.slice, + ProtobufProtocolConversions.offsetToProtoOffset(env.offset), + source = env.source)))) + } } .recoverWith { case e: NoSuchElementException => diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducer.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducer.scala index d6d29342e..099c76731 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducer.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducer.scala @@ -6,6 +6,7 @@ package akka.projection.grpc.producer.scaladsl import scala.concurrent.Future import scala.reflect.ClassTag + import akka.Done import akka.actor.typed.ActorSystem import akka.annotation.ApiMayChange @@ -25,6 +26,7 @@ import akka.projection.grpc.internal.TopicMatcher import akka.projection.grpc.internal.proto.EventProducerServicePowerApiHandler import akka.projection.grpc.producer.EventProducerSettings import akka.projection.grpc.producer.javadsl.{ Transformation => JTransformation } +import akka.projection.grpc.replication.internal.EventOriginFilter /** * The event producer implementation that can be included a gRPC route in an Akka HTTP server. @@ -38,7 +40,14 @@ object EventProducer { streamId: String, transformation: Transformation, settings: EventProducerSettings): EventProducerSource = - new EventProducerSource(entityType, streamId, transformation, settings, _ => true, transformSnapshot = None) + new EventProducerSource( + entityType, + streamId, + transformation, + settings, + _ => true, + transformSnapshot = None, + replicatedEventOriginFilter = None) def apply[Event]( entityType: String, @@ -52,7 +61,8 @@ object EventProducer { transformation, settings, producerFilter.asInstanceOf[EventEnvelope[Any] => Boolean], - transformSnapshot = None) + transformSnapshot = None, + replicatedEventOriginFilter = None) } @@ -69,7 +79,9 @@ object EventProducer { val transformation: Transformation, val settings: EventProducerSettings, val producerFilter: EventEnvelope[Any] => Boolean, - val transformSnapshot: Option[Any => Any]) { + val transformSnapshot: Option[Any => Any], + /** INTERNAL API */ + @InternalApi private[akka] val replicatedEventOriginFilter: Option[EventOriginFilter]) { require(entityType.nonEmpty, "Entity type must not be empty") require(streamId.nonEmpty, "Stream id must not be empty") @@ -104,14 +116,25 @@ object EventProducer { def withStartingFromSnapshots[Snapshot, Event](transformSnapshot: Snapshot => Event): EventProducerSource = copy(transformSnapshot = Some(transformSnapshot.asInstanceOf[Any => Any])) + def withReplicatedEventOriginFilter(filter: EventOriginFilter): EventProducerSource = + copy(replicatedEventOriginFilter = Some(filter)) + def copy( entityType: String = entityType, streamId: String = streamId, transformation: Transformation = transformation, settings: EventProducerSettings = settings, producerFilter: EventEnvelope[Any] => Boolean = producerFilter, - transformSnapshot: Option[Any => Any] = transformSnapshot): EventProducerSource = - new EventProducerSource(entityType, streamId, transformation, settings, producerFilter, transformSnapshot) + transformSnapshot: Option[Any => Any] = transformSnapshot, + replicatedEventOriginFilter: Option[EventOriginFilter] = replicatedEventOriginFilter): EventProducerSource = + new EventProducerSource( + entityType, + streamId, + transformation, + settings, + producerFilter, + transformSnapshot, + replicatedEventOriginFilter) } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/EventOriginFilter.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/EventOriginFilter.scala new file mode 100644 index 000000000..902fd4a63 --- /dev/null +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/EventOriginFilter.scala @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2023 Lightbend Inc. + */ + +package akka.projection.grpc.replication.internal + +import akka.annotation.InternalApi +import akka.persistence.query.typed.EventEnvelope +import akka.persistence.typed.ReplicaId +import akka.persistence.typed.internal.ReplicatedEventMetadata +import akka.projection.grpc.internal.proto.ReplicaInfo + +/** + * INTERNAL API + */ +@InternalApi private[akka] class EventOriginFilter(selfReplicaId: ReplicaId) { + + def createFilter(consumerReplicaInfo: ReplicaInfo): EventEnvelope[_] => Boolean = { + // For edge topologies, like star topologies, an edge replica is not connected + // to all other replicas, but should be able to receive events indirectly via + // the replica that it is consuming from. + // + // Events originating from other replicas that the consumer is connected to are excluded + // because the consumer will receive them directly from the other replica. + // Events originating from the consumer replica itself are excluded (break the cycle). + // Events originating from the producer replica are always included. + val exclude: Set[ReplicaId] = + consumerReplicaInfo.otherReplicaIds.map(ReplicaId.apply).toSet ++ + (if (consumerReplicaInfo.replicaId == "") Nil else List(ReplicaId(consumerReplicaInfo.replicaId))) - + selfReplicaId + + { envelope => + // eventMetadata is not included in backtracking envelopes. + // Events from backtracking are lazily loaded via `loadEvent` if needed. + // Filter is done via `loadEvent` in that case. + if (envelope.eventOption.isEmpty) + true + else + envelope.eventMetadata match { + case Some(meta: ReplicatedEventMetadata) => + !exclude(meta.originReplica) + case _ => + throw new IllegalArgumentException( + s"Got an event without replication metadata, not supported (pid: ${envelope.persistenceId}, seq_nr: ${envelope.sequenceNr})") + } + } + } + +} diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala index d6f855ec5..6c1bf3bc3 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala @@ -34,8 +34,10 @@ import akka.projection.ProjectionBehavior import akka.projection.ProjectionContext import akka.projection.ProjectionId import akka.projection.eventsourced.scaladsl.EventSourcedProvider +import akka.projection.grpc.consumer.ConsumerFilter import akka.projection.grpc.consumer.GrpcQuerySettings import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal +import akka.projection.grpc.internal.ProtoAnySerialization import akka.projection.grpc.producer.scaladsl.EventProducer import akka.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation @@ -67,8 +69,6 @@ private[akka] object ReplicationImpl { private val log = LoggerFactory.getLogger(classOf[ReplicationImpl[_]]) - private val filteredEvent = Future.successful(None) - /** * Called to bootstrap the entity on each cluster node in each of the replicas. * @@ -76,28 +76,23 @@ private[akka] object ReplicationImpl { */ def grpcReplication[Command, Event, State]( settings: ReplicationSettings[Command], - producerFilter: EventEnvelope[Event] => Boolean, replicatedEntity: ReplicatedEntity[Command])(implicit system: ActorSystem[_]): ReplicationImpl[Command] = { require( system.classicSystem.asInstanceOf[ExtendedActorSystem].provider.isInstanceOf[ClusterActorRefProvider], "Replicated Event Sourcing over gRPC only possible together with Akka cluster (akka.actor.provider = cluster)") + if (settings.initialConsumerFilter.nonEmpty) { + ConsumerFilter(system).ref ! ConsumerFilter.UpdateFilter(settings.streamId, settings.initialConsumerFilter) + } + // set up a publisher - val onlyLocalOriginTransformer = Transformation.empty.registerAsyncEnvelopeOrElseMapper(envelope => - envelope.eventMetadata match { - case Some(meta: ReplicatedEventMetadata) => - if (meta.originReplica == settings.selfReplicaId) Future.successful(envelope.eventOption) - else filteredEvent // Optimization: was replicated to this DC, don't pass the payload across the wire - case _ => - throw new IllegalArgumentException( - s"Got an event without replication metadata, not supported (pid: ${envelope.persistenceId}, seq_nr: ${envelope.sequenceNr})") - }) val eps = EventProducerSource( settings.entityTypeKey.name, settings.streamId, - onlyLocalOriginTransformer, + Transformation.identity, settings.eventProducerSettings, - producerFilter.asInstanceOf[EventEnvelope[Any] => Boolean]) + settings.producerFilter) + .withReplicatedEventOriginFilter(new EventOriginFilter(settings.selfReplicaId)) val sharding = ClusterSharding(system) sharding.init(replicatedEntity.entity) @@ -132,7 +127,12 @@ private[akka] object ReplicationImpl { val s = GrpcQuerySettings(settings.streamId) remoteReplica.additionalQueryRequestMetadata.fold(s)(s.withAdditionalRequestMetadata) } - val eventsBySlicesQuery = GrpcReadJournal(grpcQuerySettings, remoteReplica.grpcClientSettings, Nil) + val eventsBySlicesQuery = GrpcReadJournal( + grpcQuerySettings, + remoteReplica.grpcClientSettings, + Nil, + ProtoAnySerialization.Prefer.Scala, + Some(settings)) log.infoN( "Starting {} projection streams{} consuming events for Replicated Entity [{}] from [{}] (at {}:{})", remoteReplica.numberOfConsumers, @@ -171,14 +171,19 @@ private[akka] object ReplicationImpl { case (envelope, _) => if (!envelope.filtered) { envelope.eventMetadata match { - case Some(replicatedEventMetadata: ReplicatedEventMetadata) => - // skipping events originating from other replicas is handled by filtering but for good measure - if (replicatedEventMetadata.originReplica != remoteReplica.replicaId) - throw new IllegalArgumentException( - "Expected replicated event from replica " + - s"[${remoteReplica.replicaId}] but was [${replicatedEventMetadata.originReplica}]. " + - "Verify your replication configuration, such as self-replica-id.") + case Some(replicatedEventMetadata: ReplicatedEventMetadata) + if replicatedEventMetadata.originReplica == settings.selfReplicaId => + // skipping events originating from self replica (break cycle) + if (log.isTraceEnabled) + log.traceN( + "[{}] ignoring event from replica [{}] with self origin (pid [{}], seq_nr [{}])", + projectionKey, + remoteReplica.replicaId, + envelope.persistenceId, + envelope.sequenceNr) + Future.successful(Done) + case Some(replicatedEventMetadata: ReplicatedEventMetadata) => val replicationId = ReplicationId.fromString(envelope.persistenceId) val destinationReplicaId = replicationId.withReplica(settings.selfReplicaId) val entityRef = diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/Replication.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/Replication.scala index 6e10463d8..c64da74c3 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/Replication.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/Replication.scala @@ -4,6 +4,9 @@ package akka.projection.grpc.replication.javadsl +import java.util.concurrent.CompletionStage +import java.util.function.Predicate + import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior import akka.annotation.ApiMayChange @@ -16,6 +19,7 @@ import akka.cluster.sharding.typed.javadsl.EntityTypeKey import akka.http.javadsl.model.HttpRequest import akka.http.javadsl.model.HttpResponse import akka.japi.function.{ Function => JFunction } +import akka.persistence.query.typed.EventEnvelope import akka.persistence.typed.ReplicationId import akka.persistence.typed.internal.ReplicationContextImpl import akka.persistence.typed.javadsl.ReplicationContext @@ -23,11 +27,6 @@ import akka.persistence.typed.scaladsl.ReplicatedEventSourcing import akka.projection.grpc.producer.javadsl.EventProducer import akka.projection.grpc.producer.javadsl.EventProducerSource import akka.projection.grpc.replication.internal.ReplicationImpl -import java.util.concurrent.CompletionStage -import java.util.function.Predicate - -import akka.persistence.query.typed.EventEnvelope -import akka.projection.grpc.internal.TopicMatcher /** * Created using [[Replication.grpcReplication]], which starts sharding with the entity and @@ -79,25 +78,6 @@ object Replication { settings: ReplicationSettings[Command], replicatedBehaviorFactory: JFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]], system: ActorSystem[_]): Replication[Command] = { - val trueProducerFilter = new Predicate[EventEnvelope[Event]] { - override def test(env: EventEnvelope[Event]): Boolean = true - } - grpcReplication[Command, Event, State](settings, trueProducerFilter, replicatedBehaviorFactory, system) - } - - /** - * Called to bootstrap the entity on each cluster node in each of the replicas. - * - * Filter events matching the `producerFilter` predicate, for example based on tags. - * - * Important: Note that this does not publish the endpoint, additional steps are needed! - */ - def grpcReplication[Command, Event, State]( - settings: ReplicationSettings[Command], - producerFilter: Predicate[EventEnvelope[Event]], - replicatedBehaviorFactory: JFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]], - system: ActorSystem[_]): Replication[Command] = { - val scalaReplicationSettings = settings.toScala val replicatedEntity = @@ -123,13 +103,8 @@ object Replication { })) .toScala) - val scalaProducerFilter: EventEnvelope[Event] => Boolean = producerFilter.test - val scalaRESOG = - ReplicationImpl.grpcReplication[Command, Event, State]( - scalaReplicationSettings, - scalaProducerFilter, - replicatedEntity)(system) + ReplicationImpl.grpcReplication[Command, Event, State](scalaReplicationSettings, replicatedEntity)(system) val jEventProducerSource = new EventProducerSource( scalaRESOG.eventProducerService.entityType, scalaRESOG.eventProducerService.streamId, @@ -152,6 +127,24 @@ object Replication { } } + /** + * Called to bootstrap the entity on each cluster node in each of the replicas. + * + * Filter events matching the `producerFilter` predicate, for example based on tags. + * + * Important: Note that this does not publish the endpoint, additional steps are needed! + */ + @Deprecated + @deprecated("Define producerFilter via settings.withProducerFilter", "1.5.1") + def grpcReplication[Command, Event, State]( + settings: ReplicationSettings[Command], + producerFilter: Predicate[EventEnvelope[Event]], + replicatedBehaviorFactory: JFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]], + system: ActorSystem[_]): Replication[Command] = { + grpcReplication(settings.withProducerFilter(producerFilter), replicatedBehaviorFactory, system) + + } + /** * Called to bootstrap the entity on each cluster node in each of the replicas. * @@ -160,18 +153,14 @@ object Replication { * * Important: Note that this does not publish the endpoint, additional steps are needed! */ + @Deprecated + @deprecated("Define topicExpression via settings.withProducerFilterTopicExpression", "1.5.1") def grpcReplication[Command, Event, State]( settings: ReplicationSettings[Command], topicExpression: String, replicatedBehaviorFactory: JFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]], system: ActorSystem[_]): Replication[Command] = { - val topicMatcher = TopicMatcher(topicExpression) - grpcReplication( - settings, - (env: EventEnvelope[Event]) => topicMatcher.matches(env, settings.eventProducerSettings.topicTagPrefix), - replicatedBehaviorFactory, - system) - + grpcReplication(settings.withProducerFilterTopicExpression(topicExpression), replicatedBehaviorFactory, system) } } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/ReplicationSettings.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/ReplicationSettings.scala index d5e1fb6e2..2ae9484cf 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/ReplicationSettings.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/ReplicationSettings.scala @@ -4,6 +4,9 @@ package akka.projection.grpc.replication.javadsl +import java.util.function.{ Function => JFunction } +import java.util.{ List => JList } + import akka.actor.typed.ActorSystem import akka.annotation.ApiMayChange import akka.annotation.DoNotInherit @@ -19,10 +22,15 @@ import akka.projection.grpc.replication.internal.ReplicaImpl import akka.projection.grpc.replication.scaladsl.{ ReplicationSettings => SReplicationSettings } import akka.util.JavaDurationConverters.JavaDurationOps import com.typesafe.config.Config - import java.time.Duration +import java.util.Collections import java.util.Optional +import java.util.function.Predicate import java.util.{ Set => JSet } + +import akka.persistence.query.typed.EventEnvelope +import akka.projection.grpc.consumer.ConsumerFilter +import akka.projection.grpc.internal.TopicMatcher import akka.util.ccompat.JavaConverters._ import akka.projection.grpc.replication.internal.ReplicationProjectionProviderAdapter @@ -67,7 +75,9 @@ object ReplicationSettings { parallelUpdates, replicationProjectionProvider, Optional.empty(), - identity) + identity, + _ => true, + Collections.emptyList) } /** @@ -117,7 +127,9 @@ object ReplicationSettings { parallelUpdates = config.getInt("parallel-updates"), replicationProjectionProvider = replicationProjectionProvider, Optional.empty(), - identity) + identity, + _ => true, + Collections.emptyList) } } @@ -136,9 +148,11 @@ final class ReplicationSettings[Command] private ( val parallelUpdates: Int, val replicationProjectionProvider: ReplicationProjectionProvider, val eventProducerInterceptor: Optional[EventProducerInterceptor], - val configureEntity: java.util.function.Function[ + val configureEntity: JFunction[ Entity[Command, ShardingEnvelope[Command]], - Entity[Command, ShardingEnvelope[Command]]]) { + Entity[Command, ShardingEnvelope[Command]]], + val producerFilter: Predicate[EventEnvelope[Any]], + val initialConsumerFilter: JList[ConsumerFilter.FilterCriteria]) { def withSelfReplicaId(selfReplicaId: ReplicaId): ReplicationSettings[Command] = copy(selfReplicaId = selfReplicaId) @@ -181,11 +195,34 @@ final class ReplicationSettings[Command] private ( * Allows for changing the settings of the replicated entity, such as stop message, passivation strategy etc. */ def configureEntity( - configure: java.util.function.Function[ - Entity[Command, ShardingEnvelope[Command]], - Entity[Command, ShardingEnvelope[Command]]]): ReplicationSettings[Command] = + configure: JFunction[Entity[Command, ShardingEnvelope[Command]], Entity[Command, ShardingEnvelope[Command]]]) + : ReplicationSettings[Command] = copy(configureEntity = configure) + /** + * Filter events matching the `producerFilter` predicate, for example based on tags. + */ + def withProducerFilter[Event](producerFilter: Predicate[EventEnvelope[Event]]): ReplicationSettings[Command] = + copy(producerFilter = producerFilter.asInstanceOf[Predicate[EventEnvelope[Any]]]) + + /** + * Filter events matching the topic expression according to MQTT specification, including wildcards. + * The topic of an event is defined by a tag with certain prefix, see `topic-tag-prefix` configuration. + */ + def withProducerFilterTopicExpression(topicExpression: String): ReplicationSettings[Command] = { + val topicMatcher = TopicMatcher(topicExpression) + withProducerFilter[Any](env => topicMatcher.matches(env, eventProducerSettings.topicTagPrefix)) + } + + /** + * Set the initial consumer filter to use for events. Should only be used for static, up front consumer filters. + * Combining this with updating consumer filters directly means that the filters may be reset to these + * filters. + */ + def withInitialConsumerFilter( + initialConsumerFilter: JList[ConsumerFilter.FilterCriteria]): ReplicationSettings[Command] = + copy(initialConsumerFilter = initialConsumerFilter) + private def copy( selfReplicaId: ReplicaId = selfReplicaId, entityTypeKey: EntityTypeKey[Command] = entityTypeKey, @@ -196,9 +233,12 @@ final class ReplicationSettings[Command] private ( parallelUpdates: Int = parallelUpdates, projectionProvider: ReplicationProjectionProvider = replicationProjectionProvider, eventProducerInterceptor: Optional[EventProducerInterceptor] = eventProducerInterceptor, - configureEntity: java.util.function.Function[ + configureEntity: JFunction[ Entity[Command, ShardingEnvelope[Command]], - Entity[Command, ShardingEnvelope[Command]]] = configureEntity): ReplicationSettings[Command] = + Entity[Command, ShardingEnvelope[Command]]] = configureEntity, + producerFilter: Predicate[EventEnvelope[Any]] = producerFilter, + initialConsumerFilter: JList[ConsumerFilter.FilterCriteria] = initialConsumerFilter) + : ReplicationSettings[Command] = new ReplicationSettings[Command]( selfReplicaId, entityTypeKey, @@ -209,7 +249,9 @@ final class ReplicationSettings[Command] private ( parallelUpdates, projectionProvider, eventProducerInterceptor, - configureEntity) + configureEntity, + producerFilter, + initialConsumerFilter) override def toString = s"ReplicationSettings($selfReplicaId, $entityTypeKey, $streamId, ${otherReplicas.asScala.mkString(", ")})" @@ -227,5 +269,7 @@ final class ReplicationSettings[Command] private ( entityEventReplicationTimeout = entityEventReplicationTimeout.asScala, parallelUpdates = parallelUpdates, replicationProjectionProvider = ReplicationProjectionProviderAdapter.toScala(replicationProjectionProvider)) + .withProducerFilter(producerFilter.test) + .withInitialConsumerFilter(initialConsumerFilter.asScala.toVector) } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/Replication.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/Replication.scala index eef641984..b14880e7f 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/Replication.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/Replication.scala @@ -4,6 +4,8 @@ package akka.projection.grpc.replication.scaladsl +import scala.concurrent.Future + import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior import akka.annotation.ApiMayChange @@ -14,14 +16,11 @@ import akka.cluster.sharding.typed.scaladsl.EntityRef import akka.cluster.sharding.typed.scaladsl.EntityTypeKey import akka.http.scaladsl.model.HttpRequest import akka.http.scaladsl.model.HttpResponse +import akka.persistence.query.typed.EventEnvelope import akka.persistence.typed.ReplicationId import akka.persistence.typed.scaladsl.ReplicatedEventSourcing import akka.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource import akka.projection.grpc.replication.internal.ReplicationImpl -import scala.concurrent.Future - -import akka.persistence.query.typed.EventEnvelope -import akka.projection.grpc.internal.TopicMatcher /** * Created using [[Replication.grpcReplication]], which starts sharding with the entity and @@ -70,23 +69,8 @@ object Replication { * Important: Note that this does not publish the endpoint, additional steps are needed! */ def grpcReplication[Command, Event, State](settings: ReplicationSettings[Command])( - replicatedBehaviorFactory: ReplicatedBehaviors[Command, Event, State] => Behavior[Command])( - implicit system: ActorSystem[_]): Replication[Command] = - grpcReplication[Command, Event, State](settings, (_: EventEnvelope[Event]) => true)(replicatedBehaviorFactory) - - /** - * Called to bootstrap the entity on each cluster node in each of the replicas. - * - * Filter events matching the `producerFilter` predicate, for example based on tags. - * - * Important: Note that this does not publish the endpoint, additional steps are needed! - */ - def grpcReplication[Command, Event, State]( - settings: ReplicationSettings[Command], - producerFilter: EventEnvelope[Event] => Boolean)( replicatedBehaviorFactory: ReplicatedBehaviors[Command, Event, State] => Behavior[Command])( implicit system: ActorSystem[_]): Replication[Command] = { - val replicatedEntity = ReplicatedEntity( settings.selfReplicaId, @@ -100,7 +84,23 @@ object Replication { } })) - ReplicationImpl.grpcReplication[Command, Event, State](settings, producerFilter, replicatedEntity) + ReplicationImpl.grpcReplication[Command, Event, State](settings, replicatedEntity) + } + + /** + * Called to bootstrap the entity on each cluster node in each of the replicas. + * + * Filter events matching the `producerFilter` predicate, for example based on tags. + * + * Important: Note that this does not publish the endpoint, additional steps are needed! + */ + @deprecated("Define producerFilter via settings.withProducerFilter", "1.5.1") + def grpcReplication[Command, Event, State]( + settings: ReplicationSettings[Command], + producerFilter: EventEnvelope[Event] => Boolean)( + replicatedBehaviorFactory: ReplicatedBehaviors[Command, Event, State] => Behavior[Command])( + implicit system: ActorSystem[_]): Replication[Command] = { + grpcReplication(settings.withProducerFilter(producerFilter))(replicatedBehaviorFactory) } /** @@ -111,14 +111,11 @@ object Replication { * * Important: Note that this does not publish the endpoint, additional steps are needed! */ + @deprecated("Define topicExpression via settings.withProducerFilterTopicExpression", "1.5.1") def grpcReplication[Command, Event, State](settings: ReplicationSettings[Command], topicExpression: String)( replicatedBehaviorFactory: ReplicatedBehaviors[Command, Event, State] => Behavior[Command])( implicit system: ActorSystem[_]): Replication[Command] = { - val topicMatcher = TopicMatcher(topicExpression) - grpcReplication( - settings, - (env: EventEnvelope[Event]) => - topicMatcher.matches(env, settings.eventProducerSettings.topicTagPrefix))(replicatedBehaviorFactory) + grpcReplication(settings.withProducerFilterTopicExpression(topicExpression))(replicatedBehaviorFactory) } } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/ReplicationSettings.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/ReplicationSettings.scala index e1af7eade..11aac9f71 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/ReplicationSettings.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/ReplicationSettings.scala @@ -4,6 +4,10 @@ package akka.projection.grpc.replication.scaladsl +import scala.collection.immutable +import scala.concurrent.duration.FiniteDuration +import scala.reflect.ClassTag + import akka.actor.typed.ActorSystem import akka.annotation.ApiMayChange import akka.annotation.InternalApi @@ -11,7 +15,10 @@ import akka.cluster.sharding.typed.ShardingEnvelope import akka.cluster.sharding.typed.scaladsl.Entity import akka.cluster.sharding.typed.scaladsl.EntityTypeKey import akka.grpc.GrpcClientSettings +import akka.persistence.query.typed.EventEnvelope import akka.persistence.typed.ReplicaId +import akka.projection.grpc.consumer.ConsumerFilter +import akka.projection.grpc.internal.TopicMatcher import akka.projection.grpc.producer.EventProducerSettings import akka.projection.grpc.producer.scaladsl.EventProducerInterceptor import akka.projection.grpc.replication.internal.ReplicaImpl @@ -20,9 +27,6 @@ import akka.util.JavaDurationConverters._ import akka.util.ccompat.JavaConverters._ import com.typesafe.config.Config -import scala.concurrent.duration.FiniteDuration -import scala.reflect.ClassTag - @ApiMayChange object ReplicationSettings { @@ -81,7 +85,9 @@ object ReplicationSettings { parallelUpdates = parallelUpdates, projectionProvider = replicationProjectionProvider, None, - identity) + identity, + producerFilter = _ => true, + initialConsumerFilter = Vector.empty) } /** @@ -131,7 +137,9 @@ object ReplicationSettings { parallelUpdates = config.getInt("parallel-updates"), projectionProvider = replicationProjectionProvider, None, - identity) + identity, + producerFilter = _ => true, + initialConsumerFilter = Vector.empty) } } @@ -150,7 +158,9 @@ final class ReplicationSettings[Command] private ( val parallelUpdates: Int, val projectionProvider: ReplicationProjectionProvider, val eventProducerInterceptor: Option[EventProducerInterceptor], - val configureEntity: Entity[Command, ShardingEnvelope[Command]] => Entity[Command, ShardingEnvelope[Command]]) { + val configureEntity: Entity[Command, ShardingEnvelope[Command]] => Entity[Command, ShardingEnvelope[Command]], + val producerFilter: EventEnvelope[Any] => Boolean, + val initialConsumerFilter: immutable.Seq[ConsumerFilter.FilterCriteria]) { require( !otherReplicas.exists(_.replicaId == selfReplicaId), @@ -205,6 +215,30 @@ final class ReplicationSettings[Command] private ( : ReplicationSettings[Command] = copy(configureEntity = configure) + /** + * Filter events matching the `producerFilter` predicate, for example based on tags. + */ + def withProducerFilter[Event](producerFilter: EventEnvelope[Event] => Boolean): ReplicationSettings[Command] = + copy(producerFilter = producerFilter.asInstanceOf[EventEnvelope[Any] => Boolean]) + + /** + * Filter events matching the topic expression according to MQTT specification, including wildcards. + * The topic of an event is defined by a tag with certain prefix, see `topic-tag-prefix` configuration. + */ + def withProducerFilterTopicExpression(topicExpression: String): ReplicationSettings[Command] = { + val topicMatcher = TopicMatcher(topicExpression) + withProducerFilter[Any](env => topicMatcher.matches(env, eventProducerSettings.topicTagPrefix)) + } + + /** + * Set the initial consumer filter to use for events. Should only be used for static, up front consumer filters. + * Combining this with updating consumer filters directly means that the filters may be reset to these + * filters. + */ + def withInitialConsumerFilter( + initialConsumerFilter: immutable.Seq[ConsumerFilter.FilterCriteria]): ReplicationSettings[Command] = + copy(initialConsumerFilter = initialConsumerFilter) + private def copy( selfReplicaId: ReplicaId = selfReplicaId, entityTypeKey: EntityTypeKey[Command] = entityTypeKey, @@ -216,7 +250,9 @@ final class ReplicationSettings[Command] private ( projectionProvider: ReplicationProjectionProvider = projectionProvider, producerInterceptor: Option[EventProducerInterceptor] = eventProducerInterceptor, configureEntity: Entity[Command, ShardingEnvelope[Command]] => Entity[Command, ShardingEnvelope[Command]] = - configureEntity) = + configureEntity, + producerFilter: EventEnvelope[Any] => Boolean = producerFilter, + initialConsumerFilter: immutable.Seq[ConsumerFilter.FilterCriteria] = initialConsumerFilter) = new ReplicationSettings[Command]( selfReplicaId, entityTypeKey, @@ -227,7 +263,9 @@ final class ReplicationSettings[Command] private ( parallelUpdates, projectionProvider, producerInterceptor, - configureEntity) + configureEntity, + producerFilter, + initialConsumerFilter) override def toString = s"ReplicationSettings($selfReplicaId, $entityTypeKey, $streamId, $otherReplicas)"