Skip to content

Commit

Permalink
fix: Backtracking and filtering in gRPC Projections (#772)
Browse files Browse the repository at this point in the history
Needed changes along with the new envelope fields in Akka (akka/akka#31817) and
in the R2DBC persistence plugin (akka/akka-persistence-r2dbc#348).
  • Loading branch information
johanandren authored Jan 20, 2023
1 parent df84bc3 commit 630e826
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import io.grpc.Status
import org.scalatest.BeforeAndAfterAll
import org.scalatest.wordspec.AnyWordSpecLike
import org.slf4j.LoggerFactory
import org.slf4j.event.Level

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
Expand Down Expand Up @@ -307,17 +308,19 @@ class IntegrationSpec(testContainerConf: TestContainerConf)
entity ! TestEntity.Ping(replyProbe.ref)
replyProbe.receiveMessage()

def expectedLogMessage(seqNr: Long): String =
s"Received backtracking event from [127.0.0.1] persistenceId [${pid.id}] with seqNr [$seqNr]"
val projection =
LoggingTestKit.trace(expectedLogMessage(1)).expect {
LoggingTestKit.trace(expectedLogMessage(2)).expect {
LoggingTestKit.trace(expectedLogMessage(3)).expect {
// start the projection
spawnExactlyOnceProjection()
}
LoggingTestKit
.custom { event =>
event.level == Level.TRACE && event.message.matches(
s"""Received event from \\[127.0.0.1] persistenceId \\[${pid.id
.replace("|", "\\|")}] with seqNr \\[[123]].*""") && event.message
.endsWith("source [BT]")
}
.withOccurrences(3)
.expect {
// start the projection
spawnExactlyOnceProjection()
}
}

processedProbe.receiveMessage().envelope.event shouldBe "A"
processedProbe.receiveMessage().envelope.event shouldBe "B"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package akka.projection.grpc.consumer.scaladsl

import akka.Done
import akka.NotUsed
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorSystem
Expand All @@ -15,9 +14,9 @@ import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse
import akka.projection.grpc.TestContainerConf
import akka.projection.grpc.TestData
import akka.projection.grpc.TestDbLifecycle
import akka.projection.grpc.TestEntity
import akka.projection.grpc.TestData
import akka.projection.grpc.consumer.GrpcQuerySettings
import akka.projection.grpc.producer.EventProducerSettings
import akka.projection.grpc.producer.scaladsl.EventProducer
Expand Down Expand Up @@ -122,8 +121,9 @@ class LoadEventQuerySpec(testContainerConf: TestContainerConf)
val env = grpcReadJournal
.loadEnvelope[String](pid.id, sequenceNr = 1L)
.futureValue
env.filtered shouldBe true
env.eventMetadata shouldBe None
env.eventOption.isEmpty shouldBe true
env.eventMetadata shouldBe Some(NotUsed)
}

"handle missing event as NOT_FOUND" in new TestFixture {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# wire protocol changes
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.projection.grpc.internal.proto.Event.<init>$default$6")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.projection.grpc.internal.proto.Event.apply$default$6")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.proto.Event.apply")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.proto.Event.of")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.proto.Event.copy")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.projection.grpc.internal.proto.Event.copy$default$6")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.proto.Event.this")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.projection.grpc.internal.proto.Event.<init>$default$6")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.proto.Event.of")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.proto.Event.apply")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.projection.grpc.internal.proto.Event.apply$default$6")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.projection.grpc.internal.proto.FilteredEvent.<init>$default$5")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.projection.grpc.internal.proto.FilteredEvent.apply$default$5")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.proto.FilteredEvent.apply")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.proto.FilteredEvent.of")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.proto.FilteredEvent.copy")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.projection.grpc.internal.proto.FilteredEvent.copy$default$5")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.proto.FilteredEvent.this")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.projection.grpc.internal.proto.FilteredEvent.<init>$default$5")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.proto.FilteredEvent.of")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.grpc.internal.proto.FilteredEvent.apply")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.projection.grpc.internal.proto.FilteredEvent.apply$default$5")
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ message Event {
int64 seq_nr = 2;
int32 slice = 3;
Offset offset = 4;
// Actual payload and metadata serialization is deferred to Akka serialization,
// the serializer id and manifest are encoded into a custom type_url schema
google.protobuf.Any payload = 5;
string source = 6;
}

// Events that are filtered out are represented by this
Expand All @@ -70,6 +73,7 @@ message FilteredEvent {
int64 seq_nr = 2;
int32 slice = 3;
Offset offset = 4;
string source = 5;
}

message EventTimestampRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ package akka.projection.grpc.consumer.scaladsl

import java.time.Instant
import java.util.concurrent.TimeUnit

import scala.collection.immutable
import scala.concurrent.Future

import akka.Done
import akka.NotUsed
import akka.actor.ClassicActorSystemProvider
Expand Down Expand Up @@ -266,23 +264,24 @@ final class GrpcReadJournal private (
case StreamOut(StreamOut.Message.Event(event), _) =>
if (log.isTraceEnabled)
log.traceN(
"Received {}event from [{}] persistenceId [{}] with seqNr [{}], offset [{}]",
if (event.payload.isEmpty) "backtracking " else "",
"Received event from [{}] persistenceId [{}] with seqNr [{}], offset [{}], source [{}]",
clientSettings.serviceName,
event.persistenceId,
event.seqNr,
timestampOffset(event.offset.get).timestamp)
timestampOffset(event.offset.get).timestamp,
event.source)

eventToEnvelope(event, streamId)

case StreamOut(StreamOut.Message.FilteredEvent(filteredEvent), _) =>
if (log.isTraceEnabled)
log.traceN(
"Received filtered event from [{}] persistenceId [{}] with seqNr [{}], offset [{}]",
"Received filtered event from [{}] persistenceId [{}] with seqNr [{}], offset [{}], source [{}]",
clientSettings.serviceName,
filteredEvent.persistenceId,
filteredEvent.seqNr,
timestampOffset(filteredEvent.offset.get).timestamp)
timestampOffset(filteredEvent.offset.get).timestamp,
filteredEvent.source)

filteredEventToEnvelope(filteredEvent, streamId)

Expand All @@ -309,23 +308,24 @@ final class GrpcReadJournal private (
eventOffset.timestamp.toEpochMilli,
eventMetadata = None,
PersistenceId.extractEntityType(event.persistenceId),
event.slice)
event.slice,
filtered = false,
source = event.source)
}

private def filteredEventToEnvelope[Evt](filteredEvent: FilteredEvent, entityType: String): EventEnvelope[Evt] = {
val eventOffset = timestampOffset(filteredEvent.offset.get)

// Note that envelope is marked with NotUsed in the eventMetadata. That is handled by the R2dbcProjection
// implementation to skip the envelope and still store the offset.
new EventEnvelope(
eventOffset,
filteredEvent.persistenceId,
filteredEvent.seqNr,
None,
eventOffset.timestamp.toEpochMilli,
eventMetadata = Some(NotUsed),
eventMetadata = None,
entityType,
filteredEvent.slice)
filteredEvent.slice,
filtered = true,
source = filteredEvent.source)
}

private def timestampOffset(protoOffset: akka.projection.grpc.internal.proto.Offset): TimestampOffset = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,18 +160,19 @@ import scala.util.Success
transformAndEncodeEvent(producerSource.transformation, env).map {
case Some(event) =>
log.traceN(
"Emitting {}event from persistenceId [{}] with seqNr [{}], offset [{}]",
if (event.payload.isEmpty) "backtracking " else "",
"Emitting event from persistenceId [{}] with seqNr [{}], offset [{}], source [{}]",
env.persistenceId,
env.sequenceNr,
env.offset)
env.offset,
event.source)
StreamOut(StreamOut.Message.Event(event))
case None =>
log.traceN(
"Filtered event from persistenceId [{}] with seqNr [{}], offset [{}]",
"Filtered event from persistenceId [{}] with seqNr [{}], offset [{}], source [{}]",
env.persistenceId,
env.sequenceNr,
env.offset)
env.offset,
env.source)
StreamOut(
StreamOut.Message.FilteredEvent(
FilteredEvent(env.persistenceId, env.sequenceNr, env.slice, Some(protoOffset(env)))))
Expand Down Expand Up @@ -205,6 +206,13 @@ import scala.util.Success
val mappedFuture: Future[Option[Any]] = transformation(env.asInstanceOf[EventEnvelope[Any]])
def toEvent(transformedEvent: Any): Event = {
val protoEvent = protoAnySerialization.serialize(transformedEvent)
Event(
persistenceId = env.persistenceId,
seqNr = env.sequenceNr,
slice = env.slice,
offset = Some(protoOffset(env)),
payload = Some(protoEvent),
source = env.source)
Event(env.persistenceId, env.sequenceNr, env.slice, Some(protoOffset(env)), Some(protoEvent))
}
mappedFuture.value match {
Expand All @@ -217,7 +225,14 @@ import scala.util.Success
// Events from backtracking are lazily loaded via `loadEvent` if needed.
// Transformation and filter is done via `loadEvent` in that case.
Future.successful(
Some(Event(env.persistenceId, env.sequenceNr, env.slice, Some(protoOffset(env)), payload = None)))
Some(
Event(
persistenceId = env.persistenceId,
seqNr = env.sequenceNr,
slice = env.slice,
offset = Some(protoOffset(env)),
payload = None,
source = env.source)))
}
}

Expand Down Expand Up @@ -271,8 +286,14 @@ import scala.util.Success
env.persistenceId,
env.sequenceNr,
env.offset)
LoadEventResponse(LoadEventResponse.Message.FilteredEvent(
FilteredEvent(env.persistenceId, env.sequenceNr, env.slice, Some(protoOffset(env)))))
LoadEventResponse(
LoadEventResponse.Message.FilteredEvent(
FilteredEvent(
persistenceId = env.persistenceId,
seqNr = env.sequenceNr,
slice = env.slice,
offset = Some(protoOffset(env)),
source = env.source)))
}
}
.recoverWith {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ class TransformationSpec extends AnyWordSpec with Matchers with ScalaFutures {
timestamp = System.currentTimeMillis(),
entityType = "banana",
slice = 5,
eventMetadata = meta).asInstanceOf[EventEnvelope[Any]]
eventMetadata = meta,
filtered = false,
source = "").asInstanceOf[EventEnvelope[Any]]

"The gRPC event Transformation" should {

Expand Down
6 changes: 4 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ object Dependencies {
val AkkaPersistenceR2dbcVersionInDocs = Versions.akkaPersistenceR2dbc

object Versions {
val akka = sys.props.getOrElse("build.akka.version", "2.7.0")
// FIXME non-milestone
val akka = sys.props.getOrElse("build.akka.version", "2.8.0-M4")
val akkaPersistenceCassandra = "1.1.0"
val akkaPersistenceJdbc = "5.2.0"
val akkaPersistenceR2dbc = "1.0.1"
// FIXME non-milestone
val akkaPersistenceR2dbc = "1.1.0-M4"
val alpakka = "5.0.0"
val alpakkaKafka = sys.props.getOrElse("build.alpakka.kafka.version", "4.0.0")
val slick = "3.4.1"
Expand Down

0 comments on commit 630e826

Please sign in to comment.