From 630e826f13f8e6bcd87891c79725a943d2330424 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 20 Jan 2023 16:56:05 +0100 Subject: [PATCH] fix: Backtracking and filtering in gRPC Projections (#772) Needed changes along with the new envelope fields in Akka (https://github.com/akka/akka/pull/31817) and in the R2DBC persistence plugin (https://github.com/akka/akka-persistence-r2dbc/pull/348). --- .../projection/grpc/IntegrationSpec.scala | 21 ++++++----- .../scaladsl/LoadEventQuerySpec.scala | 6 +-- .../772.envelope-filter-source.excludes | 23 ++++++++++++ .../akka/projection/grpc/event_producer.proto | 4 ++ .../consumer/scaladsl/GrpcReadJournal.scala | 26 ++++++------- .../internal/EventProducerServiceImpl.scala | 37 +++++++++++++++---- .../scaladsl/TransformationSpec.scala | 4 +- project/Dependencies.scala | 6 ++- 8 files changed, 91 insertions(+), 36 deletions(-) create mode 100644 akka-projection-grpc/src/main/mima-filters/1.3.1.backwards.excludes/772.envelope-filter-source.excludes diff --git a/akka-projection-grpc/src/it/scala/akka/projection/grpc/IntegrationSpec.scala b/akka-projection-grpc/src/it/scala/akka/projection/grpc/IntegrationSpec.scala index a826a9969..636a37877 100644 --- a/akka-projection-grpc/src/it/scala/akka/projection/grpc/IntegrationSpec.scala +++ b/akka-projection-grpc/src/it/scala/akka/projection/grpc/IntegrationSpec.scala @@ -42,6 +42,7 @@ import io.grpc.Status import org.scalatest.BeforeAndAfterAll import org.scalatest.wordspec.AnyWordSpecLike import org.slf4j.LoggerFactory +import org.slf4j.event.Level import scala.concurrent.ExecutionContext import scala.concurrent.Future @@ -307,17 +308,19 @@ class IntegrationSpec(testContainerConf: TestContainerConf) entity ! TestEntity.Ping(replyProbe.ref) replyProbe.receiveMessage() - def expectedLogMessage(seqNr: Long): String = - s"Received backtracking event from [127.0.0.1] persistenceId [${pid.id}] with seqNr [$seqNr]" val projection = - LoggingTestKit.trace(expectedLogMessage(1)).expect { - LoggingTestKit.trace(expectedLogMessage(2)).expect { - LoggingTestKit.trace(expectedLogMessage(3)).expect { - // start the projection - spawnExactlyOnceProjection() - } + LoggingTestKit + .custom { event => + event.level == Level.TRACE && event.message.matches( + s"""Received event from \\[127.0.0.1] persistenceId \\[${pid.id + .replace("|", "\\|")}] with seqNr \\[[123]].*""") && event.message + .endsWith("source [BT]") + } + .withOccurrences(3) + .expect { + // start the projection + spawnExactlyOnceProjection() } - } processedProbe.receiveMessage().envelope.event shouldBe "A" processedProbe.receiveMessage().envelope.event shouldBe "B" diff --git a/akka-projection-grpc/src/it/scala/akka/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala b/akka-projection-grpc/src/it/scala/akka/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala index 6e83df94c..b90aa817e 100644 --- a/akka-projection-grpc/src/it/scala/akka/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala +++ b/akka-projection-grpc/src/it/scala/akka/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala @@ -5,7 +5,6 @@ package akka.projection.grpc.consumer.scaladsl import akka.Done -import akka.NotUsed import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.ActorSystem @@ -15,9 +14,9 @@ import akka.http.scaladsl.Http import akka.http.scaladsl.model.HttpRequest import akka.http.scaladsl.model.HttpResponse import akka.projection.grpc.TestContainerConf +import akka.projection.grpc.TestData import akka.projection.grpc.TestDbLifecycle import akka.projection.grpc.TestEntity -import akka.projection.grpc.TestData import akka.projection.grpc.consumer.GrpcQuerySettings import akka.projection.grpc.producer.EventProducerSettings import akka.projection.grpc.producer.scaladsl.EventProducer @@ -122,8 +121,9 @@ class LoadEventQuerySpec(testContainerConf: TestContainerConf) val env = grpcReadJournal .loadEnvelope[String](pid.id, sequenceNr = 1L) .futureValue + env.filtered shouldBe true + env.eventMetadata shouldBe None env.eventOption.isEmpty shouldBe true - env.eventMetadata shouldBe Some(NotUsed) } "handle missing event as NOT_FOUND" in new TestFixture { diff --git a/akka-projection-grpc/src/main/mima-filters/1.3.1.backwards.excludes/772.envelope-filter-source.excludes b/akka-projection-grpc/src/main/mima-filters/1.3.1.backwards.excludes/772.envelope-filter-source.excludes new file mode 100644 index 000000000..6ebe708e7 --- /dev/null +++ b/akka-projection-grpc/src/main/mima-filters/1.3.1.backwards.excludes/772.envelope-filter-source.excludes @@ -0,0 +1,23 @@ +# wire protocol changes +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.projection.grpc.internal.proto.Event.$default$6") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.projection.grpc.internal.proto.Event.apply$default$6") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.proto.Event.apply") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.proto.Event.of") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.proto.Event.copy") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.projection.grpc.internal.proto.Event.copy$default$6") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.proto.Event.this") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.projection.grpc.internal.proto.Event.$default$6") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.proto.Event.of") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.proto.Event.apply") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.projection.grpc.internal.proto.Event.apply$default$6") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.projection.grpc.internal.proto.FilteredEvent.$default$5") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.projection.grpc.internal.proto.FilteredEvent.apply$default$5") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.proto.FilteredEvent.apply") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.proto.FilteredEvent.of") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.proto.FilteredEvent.copy") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.projection.grpc.internal.proto.FilteredEvent.copy$default$5") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.proto.FilteredEvent.this") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.projection.grpc.internal.proto.FilteredEvent.$default$5") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.proto.FilteredEvent.of") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.proto.FilteredEvent.apply") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.projection.grpc.internal.proto.FilteredEvent.apply$default$5") \ No newline at end of file diff --git a/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_producer.proto b/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_producer.proto index 7ca55fe96..d0c8e83f6 100644 --- a/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_producer.proto +++ b/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_producer.proto @@ -60,7 +60,10 @@ message Event { int64 seq_nr = 2; int32 slice = 3; Offset offset = 4; + // Actual payload and metadata serialization is deferred to Akka serialization, + // the serializer id and manifest are encoded into a custom type_url schema google.protobuf.Any payload = 5; + string source = 6; } // Events that are filtered out are represented by this @@ -70,6 +73,7 @@ message FilteredEvent { int64 seq_nr = 2; int32 slice = 3; Offset offset = 4; + string source = 5; } message EventTimestampRequest { diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala index d2de5163e..83e59429c 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala @@ -6,10 +6,8 @@ package akka.projection.grpc.consumer.scaladsl import java.time.Instant import java.util.concurrent.TimeUnit - import scala.collection.immutable import scala.concurrent.Future - import akka.Done import akka.NotUsed import akka.actor.ClassicActorSystemProvider @@ -266,23 +264,24 @@ final class GrpcReadJournal private ( case StreamOut(StreamOut.Message.Event(event), _) => if (log.isTraceEnabled) log.traceN( - "Received {}event from [{}] persistenceId [{}] with seqNr [{}], offset [{}]", - if (event.payload.isEmpty) "backtracking " else "", + "Received event from [{}] persistenceId [{}] with seqNr [{}], offset [{}], source [{}]", clientSettings.serviceName, event.persistenceId, event.seqNr, - timestampOffset(event.offset.get).timestamp) + timestampOffset(event.offset.get).timestamp, + event.source) eventToEnvelope(event, streamId) case StreamOut(StreamOut.Message.FilteredEvent(filteredEvent), _) => if (log.isTraceEnabled) log.traceN( - "Received filtered event from [{}] persistenceId [{}] with seqNr [{}], offset [{}]", + "Received filtered event from [{}] persistenceId [{}] with seqNr [{}], offset [{}], source [{}]", clientSettings.serviceName, filteredEvent.persistenceId, filteredEvent.seqNr, - timestampOffset(filteredEvent.offset.get).timestamp) + timestampOffset(filteredEvent.offset.get).timestamp, + filteredEvent.source) filteredEventToEnvelope(filteredEvent, streamId) @@ -309,23 +308,24 @@ final class GrpcReadJournal private ( eventOffset.timestamp.toEpochMilli, eventMetadata = None, PersistenceId.extractEntityType(event.persistenceId), - event.slice) + event.slice, + filtered = false, + source = event.source) } private def filteredEventToEnvelope[Evt](filteredEvent: FilteredEvent, entityType: String): EventEnvelope[Evt] = { val eventOffset = timestampOffset(filteredEvent.offset.get) - - // Note that envelope is marked with NotUsed in the eventMetadata. That is handled by the R2dbcProjection - // implementation to skip the envelope and still store the offset. new EventEnvelope( eventOffset, filteredEvent.persistenceId, filteredEvent.seqNr, None, eventOffset.timestamp.toEpochMilli, - eventMetadata = Some(NotUsed), + eventMetadata = None, entityType, - filteredEvent.slice) + filteredEvent.slice, + filtered = true, + source = filteredEvent.source) } private def timestampOffset(protoOffset: akka.projection.grpc.internal.proto.Offset): TimestampOffset = { diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala index 518e00b10..f7f734e91 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventProducerServiceImpl.scala @@ -160,18 +160,19 @@ import scala.util.Success transformAndEncodeEvent(producerSource.transformation, env).map { case Some(event) => log.traceN( - "Emitting {}event from persistenceId [{}] with seqNr [{}], offset [{}]", - if (event.payload.isEmpty) "backtracking " else "", + "Emitting event from persistenceId [{}] with seqNr [{}], offset [{}], source [{}]", env.persistenceId, env.sequenceNr, - env.offset) + env.offset, + event.source) StreamOut(StreamOut.Message.Event(event)) case None => log.traceN( - "Filtered event from persistenceId [{}] with seqNr [{}], offset [{}]", + "Filtered event from persistenceId [{}] with seqNr [{}], offset [{}], source [{}]", env.persistenceId, env.sequenceNr, - env.offset) + env.offset, + env.source) StreamOut( StreamOut.Message.FilteredEvent( FilteredEvent(env.persistenceId, env.sequenceNr, env.slice, Some(protoOffset(env))))) @@ -205,6 +206,13 @@ import scala.util.Success val mappedFuture: Future[Option[Any]] = transformation(env.asInstanceOf[EventEnvelope[Any]]) def toEvent(transformedEvent: Any): Event = { val protoEvent = protoAnySerialization.serialize(transformedEvent) + Event( + persistenceId = env.persistenceId, + seqNr = env.sequenceNr, + slice = env.slice, + offset = Some(protoOffset(env)), + payload = Some(protoEvent), + source = env.source) Event(env.persistenceId, env.sequenceNr, env.slice, Some(protoOffset(env)), Some(protoEvent)) } mappedFuture.value match { @@ -217,7 +225,14 @@ import scala.util.Success // Events from backtracking are lazily loaded via `loadEvent` if needed. // Transformation and filter is done via `loadEvent` in that case. Future.successful( - Some(Event(env.persistenceId, env.sequenceNr, env.slice, Some(protoOffset(env)), payload = None))) + Some( + Event( + persistenceId = env.persistenceId, + seqNr = env.sequenceNr, + slice = env.slice, + offset = Some(protoOffset(env)), + payload = None, + source = env.source))) } } @@ -271,8 +286,14 @@ import scala.util.Success env.persistenceId, env.sequenceNr, env.offset) - LoadEventResponse(LoadEventResponse.Message.FilteredEvent( - FilteredEvent(env.persistenceId, env.sequenceNr, env.slice, Some(protoOffset(env))))) + LoadEventResponse( + LoadEventResponse.Message.FilteredEvent( + FilteredEvent( + persistenceId = env.persistenceId, + seqNr = env.sequenceNr, + slice = env.slice, + offset = Some(protoOffset(env)), + source = env.source))) } } .recoverWith { diff --git a/akka-projection-grpc/src/test/scala/akka/projection/grpc/producer/scaladsl/TransformationSpec.scala b/akka-projection-grpc/src/test/scala/akka/projection/grpc/producer/scaladsl/TransformationSpec.scala index 4ae87f872..5bacb7491 100644 --- a/akka-projection-grpc/src/test/scala/akka/projection/grpc/producer/scaladsl/TransformationSpec.scala +++ b/akka-projection-grpc/src/test/scala/akka/projection/grpc/producer/scaladsl/TransformationSpec.scala @@ -23,7 +23,9 @@ class TransformationSpec extends AnyWordSpec with Matchers with ScalaFutures { timestamp = System.currentTimeMillis(), entityType = "banana", slice = 5, - eventMetadata = meta).asInstanceOf[EventEnvelope[Any]] + eventMetadata = meta, + filtered = false, + source = "").asInstanceOf[EventEnvelope[Any]] "The gRPC event Transformation" should { diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 02a8f35b0..5f734bfeb 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -16,10 +16,12 @@ object Dependencies { val AkkaPersistenceR2dbcVersionInDocs = Versions.akkaPersistenceR2dbc object Versions { - val akka = sys.props.getOrElse("build.akka.version", "2.7.0") + // FIXME non-milestone + val akka = sys.props.getOrElse("build.akka.version", "2.8.0-M4") val akkaPersistenceCassandra = "1.1.0" val akkaPersistenceJdbc = "5.2.0" - val akkaPersistenceR2dbc = "1.0.1" + // FIXME non-milestone + val akkaPersistenceR2dbc = "1.1.0-M4" val alpakka = "5.0.0" val alpakkaKafka = sys.props.getOrElse("build.alpakka.kafka.version", "4.0.0") val slick = "3.4.1"