From 445c4eae4b3c8bc98508f4088f2974d26551bd4e Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 10 Jan 2025 00:27:11 +0100 Subject: [PATCH] fix: R2DBC replay rejected events (#1289) * triggerReplayIfPossible when CurrentEventsByPersistenceIdTypedQuery is not supported by underlying query * env source in replay log messages * remove misleading replay count in flow * pubsub logging always at debug --- .../internal/DynamoDBProjectionImpl.scala | 118 +-- .../R2dbcTimestampOffsetProjectionSpec.scala | 784 +++++++++++++++++- .../r2dbc/TestSourceProviderWithInput.scala | 26 +- .../src/main/resources/reference.conf | 3 + .../r2dbc/R2dbcProjectionSettings.scala | 17 +- .../r2dbc/internal/R2dbcOffsetStore.scala | 16 +- .../r2dbc/internal/R2dbcProjectionImpl.scala | 521 ++++++++++-- 7 files changed, 1365 insertions(+), 120 deletions(-) diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index 25837cfa4..88a4a4561 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -64,6 +64,7 @@ import akka.stream.scaladsl.FlowWithContext import akka.stream.scaladsl.Source import org.slf4j.Logger import org.slf4j.LoggerFactory +import org.slf4j.event.Level import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient /** @@ -508,7 +509,7 @@ private[projection] object DynamoDBProjectionImpl { offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => val fromSeqNr = storedSeqNr + 1 val toSeqNr = originalEventEnvelope.sequenceNr - logReplayRejected(logPrefix, persistenceId, fromSeqNr, toSeqNr) + logReplayRejected(logPrefix, originalEventEnvelope, fromSeqNr) provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match { 
case Some(querySource) => querySource @@ -544,30 +545,19 @@ private[projection] object DynamoDBProjectionImpl { env -> ProjectionContextImpl(sourceProvider.extractOffset(env), env, null) } .via(handler.asFlow) - .runFold(0) { case (acc, _) => acc + 1 } - .map { count => - val expected = toSeqNr - fromSeqNr + 1 - if (count == expected) { - true - } else { - // FIXME: filtered envelopes are not passed through, so we can't expect all to be replayed here - // and handler could also filter out envelopes - log.debug( - "{} Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", - logPrefix, - count, - expected, - persistenceId, - fromSeqNr, - toSeqNr) - true - } - } + .run() + .map(_ => true)(ExecutionContext.parasitic) .recoverWith { exc => - logReplayException(logPrefix, persistenceId, fromSeqNr, toSeqNr, exc) + logReplayException(logPrefix, originalEventEnvelope, fromSeqNr, exc) Future.failed(exc) } - case None => FutureFalse + case None => + Future.successful( + triggerReplayIfPossible( + sourceProvider, + persistenceId, + fromSeqNr, + originalEventEnvelope.sequenceNr)) } } @@ -610,45 +600,61 @@ private[projection] object DynamoDBProjectionImpl { .via(handler) } - private def logReplayRejected(logPrefix: String, persistenceId: String, fromSeqNr: Long, toSeqNr: Long): Unit = { - val msg = - "{} Replaying events after rejected sequence number. PersistenceId [{}], replaying from seqNr [{}] to [{}]. Replay count [{}]." - val c = replayRejectedCounter.incrementAndGet() - if (c == 1 || c % 1000 == 0) - log.warn(msg, logPrefix, persistenceId, fromSeqNr, toSeqNr, c) - else - log.debug(msg, logPrefix, persistenceId, fromSeqNr, toSeqNr, c) + private def logReplayRejected(logPrefix: String, originalEventEnvelope: EventEnvelope[Any], fromSeqNr: Long): Unit = { + if (EnvelopeOrigin.fromPubSub(originalEventEnvelope)) { + log.debug( + "{} Replaying events after rejected sequence number from {}. 
PersistenceId [{}], replaying from seqNr [{}] to [{}].", + logPrefix, + envelopeSourceName(originalEventEnvelope), + originalEventEnvelope.persistenceId, + fromSeqNr, + originalEventEnvelope.sequenceNr) + } else { + val c = replayRejectedCounter.incrementAndGet() + val logLevel = + if (c == 1 || c % 1000 == 0) Level.WARN else Level.DEBUG + log + .atLevel(logLevel) + .log( + "{} Replaying events after rejected sequence number from {}. PersistenceId [{}], replaying from seqNr [{}] to [{}]. Replay count [{}].", + logPrefix, + envelopeSourceName(originalEventEnvelope), + originalEventEnvelope.persistenceId, + fromSeqNr, + originalEventEnvelope.sequenceNr, + c) + } } private def logReplayInvalidCount( logPrefix: String, - persistenceId: String, + originalEventEnvelope: EventEnvelope[Any], fromSeqNr: Long, - toSeqNr: Long, count: Int, expected: Long): Unit = { log.warn( - "{} Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", + "{} Replay due to rejected envelope from {}, found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", logPrefix, + envelopeSourceName(originalEventEnvelope), count, expected, - persistenceId, + originalEventEnvelope.persistenceId, fromSeqNr, - toSeqNr) + originalEventEnvelope.sequenceNr) } private def logReplayException( logPrefix: String, - persistenceId: String, + originalEventEnvelope: EventEnvelope[Any], fromSeqNr: Long, - toSeqNr: Long, exc: Throwable): Unit = { log.warn( - "{} Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].", + "{} Replay due to rejected envelope from {} failed. 
PersistenceId [{}] from seqNr [{}] to [{}].", logPrefix, - persistenceId, + envelopeSourceName(originalEventEnvelope), + originalEventEnvelope.persistenceId, fromSeqNr, - toSeqNr, + originalEventEnvelope.sequenceNr, exc) } @@ -662,11 +668,10 @@ private[projection] object DynamoDBProjectionImpl { envelope match { case env: EventEnvelope[Any @unchecked] if env.sequenceNr > 1 => sourceProvider match { - case provider: CanTriggerReplay => + case _: CanTriggerReplay => offsetStore.storedSeqNr(env.persistenceId).map { storedSeqNr => val fromSeqNr = storedSeqNr + 1 - provider.triggerReplay(env.persistenceId, fromSeqNr, env.sequenceNr) - true + triggerReplayIfPossible(sourceProvider, env.persistenceId, fromSeqNr, env.sequenceNr) } case _ => FutureFalse // no replay support for other source providers @@ -676,6 +681,23 @@ private[projection] object DynamoDBProjectionImpl { } } + /** + * This replay mechanism is used by GrpcReadJournal + */ + private def triggerReplayIfPossible[Offset, Envelope]( + sourceProvider: SourceProvider[Offset, Envelope], + persistenceId: String, + fromSeqNr: Long, + triggeredBySeqNr: Long): Boolean = { + sourceProvider match { + case provider: CanTriggerReplay => + provider.triggerReplay(persistenceId, fromSeqNr, triggeredBySeqNr) + true + case _ => + false // no replay support for other source providers + } + } + private def replayIfPossible[Offset, Envelope]( offsetStore: DynamoDBOffsetStore, sourceProvider: SourceProvider[Offset, Envelope], @@ -696,7 +718,7 @@ private[projection] object DynamoDBProjectionImpl { offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => val fromSeqNr = storedSeqNr + 1 val toSeqNr = originalEventEnvelope.sequenceNr - logReplayRejected(logPrefix, persistenceId, fromSeqNr, toSeqNr) + logReplayRejected(logPrefix, originalEventEnvelope, fromSeqNr) provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match { case Some(querySource) => querySource @@ -726,15 +748,17 @@ private[projection] object 
DynamoDBProjectionImpl { true } else { // it's expected to find all events, otherwise fail the replay attempt - logReplayInvalidCount(logPrefix, persistenceId, fromSeqNr, toSeqNr, count, expected) + logReplayInvalidCount(logPrefix, originalEventEnvelope, fromSeqNr, count, expected) throwRejectedEnvelopeAfterFailedReplay(sourceProvider, originalEnvelope) } } .recoverWith { exc => - logReplayException(logPrefix, persistenceId, fromSeqNr, toSeqNr, exc) + logReplayException(logPrefix, originalEventEnvelope, fromSeqNr, exc) Future.failed(exc) } - case None => FutureFalse + case None => + Future.successful( + triggerReplayIfPossible(sourceProvider, persistenceId, fromSeqNr, originalEventEnvelope.sequenceNr)) } } diff --git a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala index 60f570929..4812d8a37 100644 --- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala +++ b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala @@ -6,15 +6,25 @@ package akka.projection.r2dbc import java.time.Instant import java.time.{ Duration => JDuration } +import java.util.Optional import java.util.UUID +import java.util.concurrent.CompletionStage +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicInteger +import java.util.function.Supplier + import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.Await import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration._ +import scala.jdk.CollectionConverters._ +import scala.jdk.DurationConverters._ +import scala.jdk.FutureConverters._ +import scala.jdk.OptionConverters._ + import akka.Done 
import akka.NotUsed import akka.actor.testkit.typed.TestException @@ -36,6 +46,7 @@ import akka.projection.ProjectionId import akka.projection.TestStatusObserver import akka.projection.TestStatusObserver.Err import akka.projection.TestStatusObserver.OffsetProgress +import akka.projection.eventsourced.scaladsl.EventSourcedProvider.LoadEventsByPersistenceIdSourceProvider import akka.projection.r2dbc.internal.R2dbcOffsetStore import akka.projection.r2dbc.scaladsl.R2dbcHandler import akka.projection.r2dbc.scaladsl.R2dbcProjection @@ -81,11 +92,13 @@ object R2dbcTimestampOffsetProjectionSpec { class TestTimestampSourceProvider( envelopes: immutable.IndexedSeq[EventEnvelope[String]], testSourceProvider: TestSourceProvider[TimestampOffset, EventEnvelope[String]], - override val maxSlice: Int) + override val maxSlice: Int, + enableCurrentEventsByPersistenceId: Boolean) extends SourceProvider[TimestampOffset, EventEnvelope[String]] with BySlicesSourceProvider with EventTimestampQuery - with LoadEventQuery { + with LoadEventQuery + with LoadEventsByPersistenceIdSourceProvider[String] { override def source(offset: () => Future[Option[TimestampOffset]]): Future[Source[EventEnvelope[String], NotUsed]] = testSourceProvider.source(offset) @@ -119,6 +132,80 @@ object R2dbcTimestampOffsetProjectionSpec { s"Event with persistenceId [$persistenceId] and sequenceNr [$sequenceNr] not found.")) } } + + override private[akka] def currentEventsByPersistenceId( + persistenceId: String, + fromSequenceNr: Long, + toSequenceNr: Long): Option[Source[EventEnvelope[String], NotUsed]] = { + if (enableCurrentEventsByPersistenceId) + Some(Source(envelopes.filter { env => + env.persistenceId == persistenceId && env.sequenceNr >= fromSequenceNr && env.sequenceNr <= toSequenceNr + })) + else + None + } + } + + class JavaTestTimestampSourceProvider( + envelopes: immutable.IndexedSeq[EventEnvelope[String]], + testSourceProvider: akka.projection.testkit.javadsl.TestSourceProvider[TimestampOffset, 
EventEnvelope[String]], + override val maxSlice: Int, + enableCurrentEventsByPersistenceId: Boolean) + extends akka.projection.javadsl.SourceProvider[TimestampOffset, EventEnvelope[String]] + with BySlicesSourceProvider + with akka.persistence.query.typed.javadsl.EventTimestampQuery + with akka.persistence.query.typed.javadsl.LoadEventQuery + with LoadEventsByPersistenceIdSourceProvider[String] { + + override def source(offset: Supplier[CompletionStage[Optional[TimestampOffset]]]) + : CompletionStage[akka.stream.javadsl.Source[EventEnvelope[String], NotUsed]] = + testSourceProvider.source(offset) + + override def extractOffset(envelope: EventEnvelope[String]): TimestampOffset = + testSourceProvider.extractOffset(envelope) + + override def extractCreationTime(envelope: EventEnvelope[String]): Long = + testSourceProvider.extractCreationTime(envelope) + + override def minSlice: Int = 0 + + override def timestampOf(persistenceId: String, sequenceNr: Long): CompletionStage[Optional[Instant]] = { + Future + .successful(envelopes.collectFirst { + case env + if env.persistenceId == persistenceId && env.sequenceNr == sequenceNr && env.offset + .isInstanceOf[TimestampOffset] => + env.offset.asInstanceOf[TimestampOffset].timestamp + }.toJava) + .asJava + } + + override def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): CompletionStage[EventEnvelope[Event]] = { + envelopes.collectFirst { + case env if env.persistenceId == persistenceId && env.sequenceNr == sequenceNr => + env.asInstanceOf[EventEnvelope[Event]] + } match { + case Some(env) => Future.successful(env).asJava + case None => + Future + .failed( + new NoSuchElementException( + s"Event with persistenceId [$persistenceId] and sequenceNr [$sequenceNr] not found.")) + .asJava + } + } + + override private[akka] def currentEventsByPersistenceId( + persistenceId: String, + fromSequenceNr: Long, + toSequenceNr: Long): Option[Source[EventEnvelope[String], NotUsed]] = { + if (enableCurrentEventsByPersistenceId) 
+ Some(Source(envelopes.filter { env => + env.persistenceId == persistenceId && env.sequenceNr >= fromSequenceNr && env.sequenceNr <= toSequenceNr + })) + else + None + } } } @@ -160,7 +247,18 @@ class R2dbcTimestampOffsetProjectionSpec def createSourceProvider( envelopes: immutable.IndexedSeq[EventEnvelope[String]], + enableCurrentEventsByPersistenceId: Boolean = false, complete: Boolean = true): TestTimestampSourceProvider = { + createSourceProviderWithMoreEnvelopes(envelopes, envelopes, enableCurrentEventsByPersistenceId, complete) + } + + // envelopes are emitted by the "query" source, but allEnvelopes can be loaded + def createSourceProviderWithMoreEnvelopes( + envelopes: immutable.IndexedSeq[EventEnvelope[String]], + allEnvelopes: immutable.IndexedSeq[EventEnvelope[String]], + enableCurrentEventsByPersistenceId: Boolean, + complete: Boolean = true): TestTimestampSourceProvider = { + val sp = TestSourceProvider[TimestampOffset, EventEnvelope[String]]( Source(envelopes), _.offset.asInstanceOf[TimestampOffset]) @@ -170,7 +268,36 @@ class R2dbcTimestampOffsetProjectionSpec } .withAllowCompletion(complete) - new TestTimestampSourceProvider(envelopes, sp, persistenceExt.numberOfSlices - 1) + new TestTimestampSourceProvider( + allEnvelopes, + sp, + persistenceExt.numberOfSlices - 1, + enableCurrentEventsByPersistenceId) + + } + + // envelopes are emitted by the "query" source, but allEnvelopes can be loaded + def createJavaSourceProviderWithMoreEnvelopes( + envelopes: immutable.IndexedSeq[EventEnvelope[String]], + allEnvelopes: immutable.IndexedSeq[EventEnvelope[String]], + enableCurrentEventsByPersistenceId: Boolean, + complete: Boolean = true): JavaTestTimestampSourceProvider = { + val sp = + akka.projection.testkit.javadsl.TestSourceProvider + .create[TimestampOffset, EventEnvelope[String]]( + akka.stream.javadsl.Source.from(envelopes.asJava), + _.offset.asInstanceOf[TimestampOffset]) + .withStartSourceFrom { (lastProcessedOffset, offset) => + 
offset.timestamp.isBefore(lastProcessedOffset.timestamp) || + (offset.timestamp == lastProcessedOffset.timestamp && offset.seen == lastProcessedOffset.seen) + } + .withAllowCompletion(complete) + + new JavaTestTimestampSourceProvider( + allEnvelopes, + sp, + persistenceExt.numberOfSlices - 1, + enableCurrentEventsByPersistenceId) } def createBacktrackingSourceProvider( @@ -181,7 +308,11 @@ class R2dbcTimestampOffsetProjectionSpec _.offset.asInstanceOf[TimestampOffset]) .withStartSourceFrom { (_, _) => false } // include all .withAllowCompletion(complete) - new TestTimestampSourceProvider(envelopes, sp, persistenceExt.numberOfSlices - 1) + new TestTimestampSourceProvider( + envelopes, + sp, + persistenceExt.numberOfSlices - 1, + enableCurrentEventsByPersistenceId = false) } private def offsetShouldBe[Offset](expected: Offset)(implicit offsetStore: R2dbcOffsetStore) = { @@ -598,6 +729,102 @@ class R2dbcTimestampOffsetProjectionSpec offsetShouldBe(envelopes.last.offset) } + "replay rejected sequence numbers" in { + val pid1 = UUID.randomUUID().toString + val pid2 = UUID.randomUUID().toString + val projectionId = genRandomProjectionId() + + val allEnvelopes = createEnvelopes(pid1, 6) ++ createEnvelopes(pid2, 3) + val skipPid1SeqNrs = Set(3L, 4L, 5L) + val envelopes = allEnvelopes.filterNot { env => + (env.persistenceId == pid1 && skipPid1SeqNrs(env.sequenceNr)) || + (env.persistenceId == pid2 && (env.sequenceNr == 1)) + } + + val sourceProvider = + createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true) + + implicit val offsetStore: R2dbcOffsetStore = + new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor) + + val projectionRef = spawn( + ProjectionBehavior( + R2dbcProjection + .exactlyOnce( + projectionId, + Some(settings.withReplayOnRejectedSequenceNumbers(true)), + sourceProvider, + handler = () => new ConcatHandler))) + + eventually { + 
projectedValueShouldBe("e1|e2|e3|e4|e5|e6")(pid1) + projectedValueShouldBe("e1|e2|e3")(pid2) + } + + eventually { + offsetShouldBe(allEnvelopes.last.offset) + } + projectionRef ! ProjectionBehavior.Stop + } + + "replay rejected sequence numbers due to clock skew on event write" in { + val pid1 = UUID.randomUUID().toString + val pid2 = UUID.randomUUID().toString + val projectionId = genRandomProjectionId() + + val start = tick().instant() + + def createEnvelopesFor( + pid: Pid, + fromSeqNr: Int, + toSeqNr: Int, + fromTimestamp: Instant): immutable.IndexedSeq[EventEnvelope[String]] = { + (fromSeqNr to toSeqNr).map { n => + createEnvelope(pid, n, fromTimestamp.plusSeconds(n - fromSeqNr), s"e$n") + } + } + + val envelopes1 = + createEnvelopesFor(pid1, 1, 2, start) ++ + createEnvelopesFor(pid1, 3, 4, start.plusSeconds(4)) ++ // gap + createEnvelopesFor(pid1, 5, 9, start.plusSeconds(2)) // clock skew, back 2, and then overlapping + + val envelopes2 = + createEnvelopesFor(pid2, 1, 3, start.plusSeconds(10)) ++ + createEnvelopesFor(pid2, 4, 6, start.plusSeconds(1)) ++ // clock skew, back 9 + createEnvelopesFor(pid2, 7, 9, start.plusSeconds(20)) // and gap + + val allEnvelopes = envelopes1 ++ envelopes2 + + val envelopes = allEnvelopes.sortBy(_.timestamp) + + val sourceProvider = + createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true) + + implicit val offsetStore: R2dbcOffsetStore = + new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor) + + val projectionRef = spawn( + ProjectionBehavior( + R2dbcProjection + .exactlyOnce( + projectionId, + Some(settings.withReplayOnRejectedSequenceNumbers(true)), + sourceProvider, + handler = () => new ConcatHandler))) + + eventually { + projectedValueShouldBe("e1|e2|e3|e4|e5|e6|e7|e8|e9")(pid1) + projectedValueShouldBe("e1|e2|e3|e4|e5|e6|e7|e8|e9")(pid2) + } + + eventually { + offsetShouldBe(envelopes.last.offset) + } + + projectionRef ! 
ProjectionBehavior.Stop + } + } "A R2DBC grouped projection with TimestampOffset" must { @@ -732,6 +959,107 @@ class R2dbcTimestampOffsetProjectionSpec offsetShouldBe(envelopes.last.offset) } + "replay rejected sequence numbers for exactly-once grouped" in { + val pid1 = UUID.randomUUID().toString + val pid2 = UUID.randomUUID().toString + val projectionId = genRandomProjectionId() + + val allEnvelopes = createEnvelopes(pid1, 10) ++ createEnvelopes(pid2, 3) + val skipPid1SeqNrs = Set(3L, 4L, 5L, 7L, 9L) + val envelopes = allEnvelopes.filterNot { env => + (env.persistenceId == pid1 && skipPid1SeqNrs(env.sequenceNr)) || + (env.persistenceId == pid2 && (env.sequenceNr == 1)) + } + + val sourceProvider = + createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true) + + implicit val offsetStore: R2dbcOffsetStore = + new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor) + + val handlerProbe = createTestProbe[String]("calls-to-handler") + + val projectionRef = spawn( + ProjectionBehavior( + R2dbcProjection + .groupedWithin( + projectionId, + Some(settings.withReplayOnRejectedSequenceNumbers(true)), + sourceProvider, + handler = () => groupedHandler(handlerProbe.ref)) + .withGroup(8, 3.seconds))) + + eventually { + projectedValueShouldBe("e1|e2|e3|e4|e5|e6|e7|e8|e9|e10")(pid1) + projectedValueShouldBe("e1|e2|e3")(pid2) + } + + eventually { + offsetShouldBe(allEnvelopes.last.offset) + } + projectionRef ! 
ProjectionBehavior.Stop + } + + "replay rejected sequence numbers due to clock skew on event write for exactly-once grouped" in { + val pid1 = UUID.randomUUID().toString + val pid2 = UUID.randomUUID().toString + val projectionId = genRandomProjectionId() + + val start = tick().instant() + + def createEnvelopesFor( + pid: Pid, + fromSeqNr: Int, + toSeqNr: Int, + fromTimestamp: Instant): immutable.IndexedSeq[EventEnvelope[String]] = { + (fromSeqNr to toSeqNr).map { n => + createEnvelope(pid, n, fromTimestamp.plusSeconds(n - fromSeqNr), s"e$n") + } + } + + val envelopes1 = + createEnvelopesFor(pid1, 1, 2, start) ++ + createEnvelopesFor(pid1, 3, 4, start.plusSeconds(4)) ++ // gap + createEnvelopesFor(pid1, 5, 9, start.plusSeconds(2)) // clock skew, back 2, and then overlapping + + val envelopes2 = + createEnvelopesFor(pid2, 1, 3, start.plusSeconds(10)) ++ + createEnvelopesFor(pid2, 4, 6, start.plusSeconds(1)) ++ // clock skew, back 9 + createEnvelopesFor(pid2, 7, 9, start.plusSeconds(20)) // and gap + + val allEnvelopes = envelopes1 ++ envelopes2 + + val envelopes = allEnvelopes.sortBy(_.timestamp) + + val sourceProvider = + createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true) + + implicit val offsetStore: R2dbcOffsetStore = + new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor) + + val handlerProbe = createTestProbe[String]("calls-to-handler") + + val projectionRef = spawn( + ProjectionBehavior( + R2dbcProjection + .groupedWithin( + projectionId, + Some(settings.withReplayOnRejectedSequenceNumbers(true)), + sourceProvider, + handler = () => groupedHandler(handlerProbe.ref)) + .withGroup(8, 3.seconds))) + + eventually { + projectedValueShouldBe("e1|e2|e3|e4|e5|e6|e7|e8|e9")(pid1) + projectedValueShouldBe("e1|e2|e3|e4|e5|e6|e7|e8|e9")(pid2) + } + + eventually { + offsetShouldBe(allEnvelopes.last.offset) + } + projectionRef ! 
ProjectionBehavior.Stop + } + "handle grouped async projection" in { val pid1 = UUID.randomUUID().toString val pid2 = UUID.randomUUID().toString @@ -902,6 +1230,250 @@ class R2dbcTimestampOffsetProjectionSpec offsetShouldBe(envelopes.last.offset) } } + + "replay rejected sequence numbers for at-least-once grouped" in { + val pid1 = UUID.randomUUID().toString + val pid2 = UUID.randomUUID().toString + val projectionId = genRandomProjectionId() + + val allEnvelopes = createEnvelopes(pid1, 10) ++ createEnvelopes(pid2, 3) + val skipPid1SeqNrs = Set(3L, 4L, 5L, 7L, 9L) + val envelopes = allEnvelopes.filterNot { env => + (env.persistenceId == pid1 && skipPid1SeqNrs(env.sequenceNr)) || + (env.persistenceId == pid2 && (env.sequenceNr == 1)) + } + + val sourceProvider = + createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true) + + implicit val offsetStore: R2dbcOffsetStore = + new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor) + + val results = new ConcurrentHashMap[String, String]() + + val handler: Handler[Seq[EventEnvelope[String]]] = + (envelopes: Seq[EventEnvelope[String]]) => { + Future { + envelopes.foreach { envelope => + results.putIfAbsent(envelope.persistenceId, "|") + results.computeIfPresent(envelope.persistenceId, (_, value) => value + envelope.event + "|") + } + }.map(_ => Done) + } + + val projection = + R2dbcProjection + .groupedWithinAsync( + projectionId, + Some(settings.withReplayOnRejectedSequenceNumbers(true)), + sourceProvider, + handler = () => handler) + .withGroup(8, 3.seconds) + + offsetShouldBeEmpty() + + projectionTestKit.run(projection) { + results.get(pid1) shouldBe "|e1|e2|e3|e4|e5|e6|e7|e8|e9|e10|" + results.get(pid2) shouldBe "|e1|e2|e3|" + } + + eventually { + offsetShouldBe(allEnvelopes.last.offset) + } + } + + "replay rejected sequence numbers due to clock skew on event write for at-least-once grouped" in { + val pid1 = UUID.randomUUID().toString + 
val pid2 = UUID.randomUUID().toString + val projectionId = genRandomProjectionId() + + val start = tick().instant() + + def createEnvelopesFor( + pid: Pid, + fromSeqNr: Int, + toSeqNr: Int, + fromTimestamp: Instant): immutable.IndexedSeq[EventEnvelope[String]] = { + (fromSeqNr to toSeqNr).map { n => + createEnvelope(pid, n, fromTimestamp.plusSeconds(n - fromSeqNr), s"e$n") + } + } + + val envelopes1 = + createEnvelopesFor(pid1, 1, 2, start) ++ + createEnvelopesFor(pid1, 3, 4, start.plusSeconds(4)) ++ // gap + createEnvelopesFor(pid1, 5, 9, start.plusSeconds(2)) // clock skew, back 2, and then overlapping + + val envelopes2 = + createEnvelopesFor(pid2, 1, 3, start.plusSeconds(10)) ++ + createEnvelopesFor(pid2, 4, 6, start.plusSeconds(1)) ++ // clock skew, back 9 + createEnvelopesFor(pid2, 7, 9, start.plusSeconds(20)) // and gap + + val allEnvelopes = envelopes1 ++ envelopes2 + + val envelopes = allEnvelopes.sortBy(_.timestamp) + + val sourceProvider = + createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true) + + implicit val offsetStore: R2dbcOffsetStore = + new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor) + + val results = new ConcurrentHashMap[String, String]() + + val handler: Handler[Seq[EventEnvelope[String]]] = + (envelopes: Seq[EventEnvelope[String]]) => { + Future { + envelopes.foreach { envelope => + results.putIfAbsent(envelope.persistenceId, "|") + results.computeIfPresent(envelope.persistenceId, (_, value) => value + envelope.event + "|") + } + }.map(_ => Done) + } + + val projection = + R2dbcProjection + .groupedWithinAsync( + projectionId, + Some(settings.withReplayOnRejectedSequenceNumbers(true)), + sourceProvider, + handler = () => handler) + .withGroup(2, 3.seconds) + + offsetShouldBeEmpty() + + projectionTestKit.run(projection) { + results.get(pid1) shouldBe "|e1|e2|e3|e4|e5|e6|e7|e8|e9|" + results.get(pid2) shouldBe "|e1|e2|e3|e4|e5|e6|e7|e8|e9|" + } + + 
eventually { + offsetShouldBe(allEnvelopes.last.offset) + } + } + + "replay rejected sequence numbers for at-least-once grouped (javadsl)" in { + val pid1 = UUID.randomUUID().toString + val pid2 = UUID.randomUUID().toString + val projectionId = genRandomProjectionId() + + val allEnvelopes = createEnvelopes(pid1, 10) ++ createEnvelopes(pid2, 3) + val skipPid1SeqNrs = Set(3L, 4L, 5L, 7L, 9L) + val envelopes = allEnvelopes.filterNot { env => + (env.persistenceId == pid1 && skipPid1SeqNrs(env.sequenceNr)) || + (env.persistenceId == pid2 && (env.sequenceNr == 1)) + } + + val sourceProvider = + createJavaSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true) + + implicit val offsetStore: R2dbcOffsetStore = + new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor) + + val results = new ConcurrentHashMap[String, String]() + + val handler: akka.projection.javadsl.Handler[java.util.List[EventEnvelope[String]]] = + (envelopes: java.util.List[EventEnvelope[String]]) => { + Future { + envelopes.asScala.foreach { envelope => + results.putIfAbsent(envelope.persistenceId, "|") + results.computeIfPresent(envelope.persistenceId, (_, value) => value + envelope.event + "|") + } + }.map(_ => Done.getInstance()).asJava + } + + val projection = + akka.projection.r2dbc.javadsl.R2dbcProjection + .groupedWithinAsync( + projectionId, + Some(settings.withReplayOnRejectedSequenceNumbers(true)).toJava, + sourceProvider, + handler = () => handler, + system) + .withGroup(8, 3.seconds.toJava) + + offsetShouldBeEmpty() + + projectionTestKit.run(projection) { + results.get(pid1) shouldBe "|e1|e2|e3|e4|e5|e6|e7|e8|e9|e10|" + results.get(pid2) shouldBe "|e1|e2|e3|" + } + + eventually { + offsetShouldBe(allEnvelopes.last.offset) + } + } + + "replay rejected sequence numbers due to clock skew on event write for at-least-once grouped (javadsl)" in { + val pid1 = UUID.randomUUID().toString + val pid2 = 
UUID.randomUUID().toString + val projectionId = genRandomProjectionId() + + val start = tick().instant() + + def createEnvelopesFor( + pid: Pid, + fromSeqNr: Int, + toSeqNr: Int, + fromTimestamp: Instant): immutable.IndexedSeq[EventEnvelope[String]] = { + (fromSeqNr to toSeqNr).map { n => + createEnvelope(pid, n, fromTimestamp.plusSeconds(n - fromSeqNr), s"e$n") + } + } + + val envelopes1 = + createEnvelopesFor(pid1, 1, 2, start) ++ + createEnvelopesFor(pid1, 3, 4, start.plusSeconds(4)) ++ // gap + createEnvelopesFor(pid1, 5, 9, start.plusSeconds(2)) // clock skew, back 2, and then overlapping + + val envelopes2 = + createEnvelopesFor(pid2, 1, 3, start.plusSeconds(10)) ++ + createEnvelopesFor(pid2, 4, 6, start.plusSeconds(1)) ++ // clock skew, back 9 + createEnvelopesFor(pid2, 7, 9, start.plusSeconds(20)) // and gap + + val allEnvelopes = envelopes1 ++ envelopes2 + + val envelopes = allEnvelopes.sortBy(_.timestamp) + + val sourceProvider = + createJavaSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true) + + implicit val offsetStore: R2dbcOffsetStore = + new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor) + + val results = new ConcurrentHashMap[String, String]() + + val handler: akka.projection.javadsl.Handler[java.util.List[EventEnvelope[String]]] = + (envelopes: java.util.List[EventEnvelope[String]]) => { + Future { + envelopes.asScala.foreach { envelope => + results.putIfAbsent(envelope.persistenceId, "|") + results.computeIfPresent(envelope.persistenceId, (_, value) => value + envelope.event + "|") + } + }.map(_ => Done.getInstance()).asJava + } + + val projection = + akka.projection.r2dbc.javadsl.R2dbcProjection + .groupedWithinAsync( + projectionId, + Some(settings.withReplayOnRejectedSequenceNumbers(true)).toJava, + sourceProvider, + handler = () => handler, + system) + .withGroup(2, 3.seconds.toJava) + + offsetShouldBeEmpty() + + projectionTestKit.run(projection) { + 
results.get(pid1) shouldBe "|e1|e2|e3|e4|e5|e6|e7|e8|e9|" + results.get(pid2) shouldBe "|e1|e2|e3|e4|e5|e6|e7|e8|e9|" + } + + eventually { + offsetShouldBe(allEnvelopes.last.offset) + } + } } "A R2DBC at-least-once projection with TimestampOffset" must { @@ -1229,6 +1801,102 @@ class R2dbcTimestampOffsetProjectionSpec offsetShouldBe(envelopes.last.offset) } } + + "replay rejected sequence numbers" in { + val pid1 = UUID.randomUUID().toString + val pid2 = UUID.randomUUID().toString + val projectionId = genRandomProjectionId() + + val allEnvelopes = createEnvelopes(pid1, 6) ++ createEnvelopes(pid2, 3) + val skipPid1SeqNrs = Set(3L, 4L, 5L) + val envelopes = allEnvelopes.filterNot { env => + (env.persistenceId == pid1 && skipPid1SeqNrs(env.sequenceNr)) || + (env.persistenceId == pid2 && (env.sequenceNr == 1)) + } + + val sourceProvider = + createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true) + + implicit val offsetStore: R2dbcOffsetStore = + new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor) + + val projectionRef = spawn( + ProjectionBehavior( + R2dbcProjection + .atLeastOnce( + projectionId, + Some(settings.withReplayOnRejectedSequenceNumbers(true)), + sourceProvider, + handler = () => new ConcatHandler))) + + eventually { + projectedValueShouldBe("e1|e2|e3|e4|e5|e6")(pid1) + projectedValueShouldBe("e1|e2|e3")(pid2) + } + + eventually { + offsetShouldBe(allEnvelopes.last.offset) + } + projectionRef ! 
ProjectionBehavior.Stop + } + + "replay rejected sequence numbers due to clock skew on event write" in { + val pid1 = UUID.randomUUID().toString + val pid2 = UUID.randomUUID().toString + val projectionId = genRandomProjectionId() + + val start = tick().instant() + + def createEnvelopesFor( + pid: Pid, + fromSeqNr: Int, + toSeqNr: Int, + fromTimestamp: Instant): immutable.IndexedSeq[EventEnvelope[String]] = { + (fromSeqNr to toSeqNr).map { n => + createEnvelope(pid, n, fromTimestamp.plusSeconds(n - fromSeqNr), s"e$n") + } + } + + val envelopes1 = + createEnvelopesFor(pid1, 1, 2, start) ++ + createEnvelopesFor(pid1, 3, 4, start.plusSeconds(4)) ++ // gap + createEnvelopesFor(pid1, 5, 9, start.plusSeconds(2)) // clock skew, back 2, and then overlapping + + val envelopes2 = + createEnvelopesFor(pid2, 1, 3, start.plusSeconds(10)) ++ + createEnvelopesFor(pid2, 4, 6, start.plusSeconds(1)) ++ // clock skew, back 9 + createEnvelopesFor(pid2, 7, 9, start.plusSeconds(20)) // and gap + + val allEnvelopes = envelopes1 ++ envelopes2 + + val envelopes = allEnvelopes.sortBy(_.timestamp) + + val sourceProvider = + createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true) + + implicit val offsetStore: R2dbcOffsetStore = + new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor) + + val projectionRef = spawn( + ProjectionBehavior( + R2dbcProjection + .atLeastOnce( + projectionId, + Some(settings.withReplayOnRejectedSequenceNumbers(true)), + sourceProvider, + handler = () => new ConcatHandler))) + + eventually { + projectedValueShouldBe("e1|e2|e3|e4|e5|e6|e7|e8|e9")(pid1) + projectedValueShouldBe("e1|e2|e3|e4|e5|e6|e7|e8|e9")(pid2) + } + + eventually { + offsetShouldBe(envelopes.last.offset) + } + + projectionRef ! 
ProjectionBehavior.Stop + } } "A R2DBC flow projection with TimestampOffset" must { @@ -1374,6 +2042,114 @@ class R2dbcTimestampOffsetProjectionSpec offsetShouldBe(envelopes.last.offset) } } + + "replay rejected sequence numbers for flow projection" in { + val pid1 = UUID.randomUUID().toString + val pid2 = UUID.randomUUID().toString + val projectionId = genRandomProjectionId() + + val allEnvelopes = createEnvelopes(pid1, 6) ++ createEnvelopes(pid2, 3) + val skipPid1SeqNrs = Set(3L, 4L, 5L) + val envelopes = allEnvelopes.filterNot { env => + (env.persistenceId == pid1 && skipPid1SeqNrs(env.sequenceNr)) || + (env.persistenceId == pid2 && (env.sequenceNr == 1)) + } + + val sourceProvider = + createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true) + + implicit val offsetStore: R2dbcOffsetStore = + new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor) + + offsetShouldBeEmpty() + + val flowHandler = + FlowWithContext[EventEnvelope[String], ProjectionContext] + .mapAsync(1) { env => + withRepo(_.concatToText(env.persistenceId, env.event)) + } + + val projection = + R2dbcProjection + .atLeastOnceFlow( + projectionId, + Some(settings.withReplayOnRejectedSequenceNumbers(true)), + sourceProvider, + handler = flowHandler) + .withSaveOffset(1, 1.minute) + + projectionTestKit.run(projection) { + projectedValueShouldBe("e1|e2|e3|e4|e5|e6")(pid1) + projectedValueShouldBe("e1|e2|e3")(pid2) + } + + eventually { + offsetShouldBe(allEnvelopes.last.offset) + } + } + + "replay rejected sequence numbers due to clock skew for flow projection" in { + val pid1 = UUID.randomUUID().toString + val pid2 = UUID.randomUUID().toString + val projectionId = genRandomProjectionId() + + val start = tick().instant() + + def createEnvelopesFor( + pid: Pid, + fromSeqNr: Int, + toSeqNr: Int, + fromTimestamp: Instant): immutable.IndexedSeq[EventEnvelope[String]] = { + (fromSeqNr to toSeqNr).map { n => + 
createEnvelope(pid, n, fromTimestamp.plusSeconds(n - fromSeqNr), s"e$n") + } + } + + val envelopes1 = + createEnvelopesFor(pid1, 1, 2, start) ++ + createEnvelopesFor(pid1, 3, 4, start.plusSeconds(4)) ++ // gap + createEnvelopesFor(pid1, 5, 9, start.plusSeconds(2)) // clock skew, back 2, and then overlapping + + val envelopes2 = + createEnvelopesFor(pid2, 1, 3, start.plusSeconds(10)) ++ + createEnvelopesFor(pid2, 4, 6, start.plusSeconds(1)) ++ // clock skew, back 9 + createEnvelopesFor(pid2, 7, 9, start.plusSeconds(20)) // and gap + + val allEnvelopes = envelopes1 ++ envelopes2 + + val envelopes = allEnvelopes.sortBy(_.timestamp) + val sourceProvider = + createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true) + + implicit val offsetStore: R2dbcOffsetStore = + new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor) + + offsetShouldBeEmpty() + + val flowHandler = + FlowWithContext[EventEnvelope[String], ProjectionContext] + .mapAsync(1) { env => + withRepo(_.concatToText(env.persistenceId, env.event)) + } + + val projection = + R2dbcProjection + .atLeastOnceFlow( + projectionId, + Some(settings.withReplayOnRejectedSequenceNumbers(true)), + sourceProvider, + handler = flowHandler) + .withSaveOffset(1, 1.minute) + + projectionTestKit.run(projection) { + projectedValueShouldBe("e1|e2|e3|e4|e5|e6|e7|e8|e9")(pid1) + projectedValueShouldBe("e1|e2|e3|e4|e5|e6|e7|e8|e9")(pid2) + } + + eventually { + offsetShouldBe(allEnvelopes.last.offset) + } + } } "R2dbcProjection management with TimestampOffset" must { diff --git a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/TestSourceProviderWithInput.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/TestSourceProviderWithInput.scala index 6046a419c..dd8f7ccdd 100644 --- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/TestSourceProviderWithInput.scala +++ 
b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/TestSourceProviderWithInput.scala @@ -23,15 +23,19 @@ import akka.persistence.query.typed.EventEnvelope import akka.persistence.query.typed.scaladsl.EventTimestampQuery import akka.persistence.query.typed.scaladsl.LoadEventQuery import akka.projection.BySlicesSourceProvider +import akka.projection.eventsourced.scaladsl.EventSourcedProvider.LoadEventsByPersistenceIdSourceProvider import akka.projection.scaladsl.SourceProvider import akka.stream.OverflowStrategy import akka.stream.scaladsl.Source -class TestSourceProviderWithInput()(implicit val system: ActorSystem[_]) +class TestSourceProviderWithInput(enableCurrentEventsByPersistenceId: Boolean)(implicit val system: ActorSystem[_]) extends SourceProvider[TimestampOffset, EventEnvelope[String]] with BySlicesSourceProvider with EventTimestampQuery - with LoadEventQuery { + with LoadEventQuery + with LoadEventsByPersistenceIdSourceProvider[String] { + + def this()(implicit system: ActorSystem[_]) = this(enableCurrentEventsByPersistenceId = false) private implicit val ec: ExecutionContext = system.executionContext private val persistenceExt = Persistence(system) @@ -96,4 +100,22 @@ class TestSourceProviderWithInput()(implicit val system: ActorSystem[_]) s"Event with persistenceId [$persistenceId] and sequenceNr [$sequenceNr] not found.")) } } + + override private[akka] def currentEventsByPersistenceId( + persistenceId: String, + fromSequenceNr: Long, + toSequenceNr: Long): Option[Source[EventEnvelope[String], NotUsed]] = { + if (enableCurrentEventsByPersistenceId) + Some( + Source( + envelopes + .iterator() + .asScala + .filter { env => + env.persistenceId == persistenceId && env.sequenceNr >= fromSequenceNr && env.sequenceNr <= toSequenceNr + } + .toVector)) + else + None + } } diff --git a/akka-projection-r2dbc/src/main/resources/reference.conf b/akka-projection-r2dbc/src/main/resources/reference.conf index 6b0490f49..40f0186a8 100644 --- 
a/akka-projection-r2dbc/src/main/resources/reference.conf +++ b/akka-projection-r2dbc/src/main/resources/reference.conf @@ -50,6 +50,9 @@ akka.projection.r2dbc { offset-slice-read-limit = 100 } + # Replay missed events for a particular persistence id when a sequence number is rejected by validation. + replay-on-rejected-sequence-numbers = on + # By default it shares connection-factory with akka-persistence-r2dbc (write side), # i.e. same connection pool. To use a separate pool for projections this can be # set to another config path that defines the config based on one of the supported diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/R2dbcProjectionSettings.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/R2dbcProjectionSettings.scala index 8a36355cd..ebbc981b5 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/R2dbcProjectionSettings.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/R2dbcProjectionSettings.scala @@ -83,7 +83,8 @@ object R2dbcProjectionSettings { offsetBatchSize = config.getInt("offset-store.offset-batch-size"), customConnectionFactory = None, offsetSliceReadParallelism = config.getInt("offset-store.offset-slice-read-parallelism"), - offsetSliceReadLimit = config.getInt("offset-store.offset-slice-read-limit")) + offsetSliceReadLimit = config.getInt("offset-store.offset-slice-read-limit"), + replayOnRejectedSequenceNumbers = config.getBoolean("replay-on-rejected-sequence-numbers")) } /** @@ -114,7 +115,8 @@ final class R2dbcProjectionSettings private ( val offsetBatchSize: Int, val customConnectionFactory: Option[ConnectionFactory], val offsetSliceReadParallelism: Int, - val offsetSliceReadLimit: Int) { + val offsetSliceReadLimit: Int, + val replayOnRejectedSequenceNumbers: Boolean) { val offsetTableWithSchema: String = schema.map(_ + ".").getOrElse("") + offsetTable val timestampOffsetTableWithSchema: String = schema.map(_ + ".").getOrElse("") + timestampOffsetTable 
@@ -198,6 +200,9 @@ final class R2dbcProjectionSettings private ( def withOffsetSliceReadLimit(offsetSliceReadLimit: Int): R2dbcProjectionSettings = copy(offsetSliceReadLimit = offsetSliceReadLimit) + def withReplayOnRejectedSequenceNumbers(replayOnRejectedSequenceNumbers: Boolean): R2dbcProjectionSettings = + copy(replayOnRejectedSequenceNumbers = replayOnRejectedSequenceNumbers) + @nowarn("msg=deprecated") private def copy( schema: Option[String] = schema, @@ -215,7 +220,8 @@ final class R2dbcProjectionSettings private ( offsetBatchSize: Int = offsetBatchSize, customConnectionFactory: Option[ConnectionFactory] = customConnectionFactory, offsetSliceReadParallelism: Int = offsetSliceReadParallelism, - offsetSliceReadLimit: Int = offsetSliceReadLimit) = + offsetSliceReadLimit: Int = offsetSliceReadLimit, + replayOnRejectedSequenceNumbers: Boolean = replayOnRejectedSequenceNumbers) = new R2dbcProjectionSettings( schema, offsetTable, @@ -234,8 +240,9 @@ final class R2dbcProjectionSettings private ( offsetBatchSize, customConnectionFactory, offsetSliceReadParallelism, - offsetSliceReadLimit) + offsetSliceReadLimit, + replayOnRejectedSequenceNumbers) override def toString = - s"R2dbcProjectionSettings($schema, $offsetTable, $timestampOffsetTable, $managementTable, $useConnectionFactory, $timeWindow, $deleteInterval, $logDbCallsExceeding, $warnAboutFilteredEventsInFlow, $offsetBatchSize, $customConnectionFactory)" + s"R2dbcProjectionSettings($schema, $offsetTable, $timestampOffsetTable, $managementTable, $useConnectionFactory, $timeWindow, $deleteInterval, $logDbCallsExceeding, $warnAboutFilteredEventsInFlow, $offsetBatchSize, $customConnectionFactory, $offsetSliceReadParallelism, $offsetSliceReadLimit, $replayOnRejectedSequenceNumbers)" } diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala index 579576093..cf145560c 100644 --- 
a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala @@ -217,7 +217,7 @@ private[projection] class R2dbcOffsetStore( projectionId: ProjectionId, sourceProvider: Option[BySlicesSourceProvider], system: ActorSystem[_], - settings: R2dbcProjectionSettings, + val settings: R2dbcProjectionSettings, r2dbcExecutor: R2dbcExecutor, clock: Clock = Clock.systemUTC()) { @@ -233,7 +233,7 @@ private[projection] class R2dbcOffsetStore( } private val logger = LoggerFactory.getLogger(this.getClass) - private val logPrefix = s"${projectionId.name} [$minSlice-$maxSlice]:" + val logPrefix = s"${projectionId.name} [$minSlice-$maxSlice]:" private val offsetSerialization = new OffsetSerialization(system) import offsetSerialization.fromStorageRepresentation @@ -774,9 +774,14 @@ private[projection] class R2dbcOffsetStore( // snapshots will mean we are starting from some arbitrary offset after last seen offset FutureAccepted } else if (!recordWithOffset.fromBacktracking) { + // Rejected will trigger replay of missed events, if replay-on-rejected-sequence-numbers is enabled + // and SourceProvider supports it. logUnexpected() FutureRejectedSeqNr } else { + // Rejected will trigger replay of missed events, if replay-on-rejected-sequence-numbers is enabled + // and SourceProvider supports it. + // Otherwise this will result in projection restart (with normal configuration). logUnexpected() // This will result in projection restart (with normal configuration) FutureRejectedBacktrackingSeqNr @@ -834,6 +839,8 @@ private[projection] class R2dbcOffsetStore( settings.backtrackingWindow) Accepted } else if (recordWithOffset.fromPubSub) { + // Rejected will trigger replay of missed events, if replay-on-rejected-sequence-numbers is enabled + // and SourceProvider supports it. 
logger.debug( "{} Rejecting pub-sub envelope, unknown sequence number [{}] for pid [{}] (might be accepted later): {}", logPrefix, @@ -842,7 +849,8 @@ private[projection] class R2dbcOffsetStore( recordWithOffset.offset) RejectedSeqNr } else if (recordWithOffset.fromBacktracking) { - // This will result in projection restart (with normal configuration) + // Rejected will trigger replay of missed events, if replay-on-rejected-sequence-numbers is enabled + // and SourceProvider supports it. logger.warn( "{} Rejecting unknown sequence number [{}] for pid [{}]. Offset: {}, where previous event timestamp [{}] " + "is after start timestamp [{}] minus backtracking window [{}].", @@ -862,6 +870,8 @@ private[projection] class R2dbcOffsetStore( seqNr, pid, recordWithOffset.offset) + // Rejected will trigger replay of missed events, if replay-on-rejected-sequence-numbers is enabled + // and SourceProvider supports it. // Backtracking will emit missed event again. RejectedSeqNr } diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala index cadf8a1a1..9e0514c80 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala @@ -23,6 +23,7 @@ import akka.persistence.query.DurableStateChange import akka.persistence.query.UpdatedDurableState import akka.persistence.query.typed.EventEnvelope import akka.persistence.query.typed.scaladsl.LoadEventQuery +import akka.persistence.r2dbc.internal.EnvelopeOrigin import akka.persistence.r2dbc.internal.R2dbcExecutor import akka.persistence.state.scaladsl.DurableStateStore import akka.persistence.state.scaladsl.GetObjectResult @@ -36,6 +37,7 @@ import akka.projection.RunningProjection import akka.projection.RunningProjection.AbortProjectionException import 
akka.projection.RunningProjectionManagement import akka.projection.StatusObserver +import akka.projection.eventsourced.scaladsl.EventSourcedProvider.LoadEventsByPersistenceIdSourceProvider import akka.projection.internal.ActorHandlerInit import akka.projection.internal.AtLeastOnce import akka.projection.internal.AtMostOnce @@ -45,6 +47,7 @@ import akka.projection.internal.GroupedHandlerStrategy import akka.projection.internal.HandlerStrategy import akka.projection.internal.InternalProjection import akka.projection.internal.InternalProjectionState +import akka.projection.internal.JavaToScalaBySliceSourceProviderAdapter import akka.projection.internal.ManagementState import akka.projection.internal.OffsetStoredByHandler import akka.projection.internal.OffsetStrategy @@ -65,6 +68,7 @@ import akka.stream.scaladsl.FlowWithContext import akka.stream.scaladsl.Source import org.slf4j.Logger import org.slf4j.LoggerFactory +import org.slf4j.event.Level /** * INTERNAL API @@ -88,6 +92,7 @@ private[projection] object R2dbcProjectionImpl { } private val loadEnvelopeCounter = new AtomicLong + private val replayRejectedCounter = new AtomicLong def loadEnvelope[Envelope](env: Envelope, sourceProvider: SourceProvider[_, Envelope])( implicit @@ -206,15 +211,33 @@ private[projection] object R2dbcProjectionImpl { case Duplicate => FutureDone case RejectedSeqNr => - triggerReplayIfPossible(sourceProvider, offsetStore, envelope).map(_ => Done)( - ExecutionContext.parasitic) + replay(envelope).map(_ => Done)(ExecutionContext.parasitic) case RejectedBacktrackingSeqNr => - triggerReplayIfPossible(sourceProvider, offsetStore, envelope).map { + replay(envelope).map { case true => Done case false => throwRejectedEnvelope(sourceProvider, envelope) } } } + + private def replay(originalEnvelope: Envelope): Future[Boolean] = { + replayIfPossible(offsetStore, sourceProvider, originalEnvelope) { envelope => + val offset = extractOffsetPidSeqNr(sourceProvider, envelope.asInstanceOf[Envelope]) + if 
(isFilteredEvent(envelope)) { + offsetStore.saveOffset(offset) + } else { + r2dbcExecutor.withConnection("exactly-once handler") { conn => + // run users handler + val session = new R2dbcSession(conn) + delegate + .process(session, envelope.asInstanceOf[Envelope]) + .flatMap { _ => + offsetStore.saveOffsetInTx(conn, offset) + } + } + } + } + } } } @@ -231,30 +254,32 @@ private[projection] object R2dbcProjectionImpl { override def process(envelopes: Seq[Envelope]): Future[Done] = { import R2dbcOffsetStore.Validation._ offsetStore.validateAll(envelopes).flatMap { isAcceptedEnvelopes => + // For simplicity we process the replayed envelopes one by one (group of 1), and also store the + // offset for each separately. + // Important to replay them sequentially to avoid concurrent processing and offset storage. val replayDone = - Future.sequence(isAcceptedEnvelopes.map { - case (env, RejectedSeqNr) => - triggerReplayIfPossible(sourceProvider, offsetStore, env).map(_ => Done)(ExecutionContext.parasitic) - case (env, RejectedBacktrackingSeqNr) => - triggerReplayIfPossible(sourceProvider, offsetStore, env).map { - case true => Done - case false => throwRejectedEnvelope(sourceProvider, env) + isAcceptedEnvelopes.foldLeft(FutureDone) { + case (previous, (env, RejectedSeqNr)) => + previous.flatMap { _ => + replay(env).map(_ => Done)(ExecutionContext.parasitic) } - case _ => - FutureDone - }) - - replayDone.flatMap { _ => - val acceptedEnvelopes = isAcceptedEnvelopes.collect { - case (env, Accepted) => - env + case (previous, (env, RejectedBacktrackingSeqNr)) => + previous.flatMap { _ => + replay(env).map { + case true => Done + case false => throwRejectedEnvelope(sourceProvider, env) + } + } + case (previous, _) => + previous } - if (acceptedEnvelopes.isEmpty) { - FutureDone - } else { - Future.sequence(acceptedEnvelopes.map(env => loadEnvelope(env, sourceProvider))).flatMap { - loadedEnvelopes => + replayDone.flatMap { _ => + def processAcceptedEnvelopes(envelopes: 
Seq[Envelope]): Future[Done] = { + if (envelopes.isEmpty) { + FutureDone + } else { + Future.sequence(envelopes.map(env => loadEnvelope(env, sourceProvider))).flatMap { loadedEnvelopes => val offsets = loadedEnvelopes.iterator.map(extractOffsetPidSeqNr(sourceProvider, _)).toVector val filteredEnvelopes = loadedEnvelopes.filterNot(isFilteredEvent) if (filteredEnvelopes.isEmpty) { @@ -268,12 +293,54 @@ private[projection] object R2dbcProjectionImpl { } } } + } + } + } + + val acceptedEnvelopes = isAcceptedEnvelopes.collect { + case (env, Accepted) => + env + } + val hasRejected = + isAcceptedEnvelopes.exists { + case (_, RejectedSeqNr) => true + case (_, RejectedBacktrackingSeqNr) => true + case _ => false } + + if (hasRejected) { + // need second validation after replay to remove duplicates + offsetStore + .validateAll(acceptedEnvelopes) + .flatMap { isAcceptedEnvelopes2 => + val acceptedEnvelopes2 = isAcceptedEnvelopes2.collect { + case (env, Accepted) => env + } + processAcceptedEnvelopes(acceptedEnvelopes2) + } + } else { + processAcceptedEnvelopes(acceptedEnvelopes) } } } } + private def replay(originalEnvelope: Envelope): Future[Boolean] = { + replayIfPossible(offsetStore, sourceProvider, originalEnvelope) { envelope => + val offset = extractOffsetPidSeqNr(sourceProvider, envelope.asInstanceOf[Envelope]) + if (isFilteredEvent(envelope)) { + offsetStore.saveOffset(offset) + } else { + r2dbcExecutor.withConnection("grouped handler") { conn => + // run users handler + val session = new R2dbcSession(conn) + delegate.process(session, Seq(envelope.asInstanceOf[Envelope])).flatMap { _ => + offsetStore.saveOffsetInTx(conn, offset) + } + } + } + } + } } } @@ -310,16 +377,36 @@ private[projection] object R2dbcProjectionImpl { case Duplicate => FutureDone case RejectedSeqNr => - triggerReplayIfPossible(sourceProvider, offsetStore, envelope).map(_ => Done)( - ExecutionContext.parasitic) + replay(envelope).map(_ => Done)(ExecutionContext.parasitic) case 
RejectedBacktrackingSeqNr => - triggerReplayIfPossible(sourceProvider, offsetStore, envelope).map { + replay(envelope).map { case true => Done case false => throwRejectedEnvelope(sourceProvider, envelope) } } } + + private def replay(originalEnvelope: Envelope): Future[Boolean] = { + replayIfPossible(offsetStore, sourceProvider, originalEnvelope) { envelope => + if (isFilteredEvent(envelope)) { + offsetStore.addInflight(envelope) + FutureDone + } else { + r2dbcExecutor + .withConnection("at-least-once handler") { conn => + // run users handler + val session = new R2dbcSession(conn) + delegate.process(session, envelope.asInstanceOf[Envelope]) + } + .map { _ => + offsetStore.addInflight(envelope) + Done + } + } + } + } } + } private[projection] def adaptedHandlerForAtLeastOnceAsync[Offset, Envelope]( @@ -350,15 +437,30 @@ private[projection] object R2dbcProjectionImpl { case Duplicate => FutureDone case RejectedSeqNr => - triggerReplayIfPossible(sourceProvider, offsetStore, envelope).map(_ => Done)( - ExecutionContext.parasitic) + replay(envelope).map(_ => Done)(ExecutionContext.parasitic) case RejectedBacktrackingSeqNr => - triggerReplayIfPossible(sourceProvider, offsetStore, envelope).map { + replay(envelope).map { case true => Done case false => throwRejectedEnvelope(sourceProvider, envelope) } } } + + private def replay(originalEnvelope: Envelope): Future[Boolean] = { + replayIfPossible(offsetStore, sourceProvider, originalEnvelope) { envelope => + if (isFilteredEvent(envelope)) { + offsetStore.addInflight(envelope) + FutureDone + } else { + delegate + .process(envelope.asInstanceOf[Envelope]) + .map { _ => + offsetStore.addInflight(envelope) + Done + } + } + } + } } } @@ -371,42 +473,93 @@ private[projection] object R2dbcProjectionImpl { system: ActorSystem[_]): () => Handler[immutable.Seq[Envelope]] = { () => new AdaptedHandler(handlerFactory()) { - override def process(envelopes: immutable.Seq[Envelope]): Future[Done] = { + override def process(envelopes: 
Seq[Envelope]): Future[Done] = { import R2dbcOffsetStore.Validation._ offsetStore.validateAll(envelopes).flatMap { isAcceptedEnvelopes => - isAcceptedEnvelopes.foreach { - case (env, RejectedSeqNr) => - triggerReplayIfPossible(sourceProvider, offsetStore, env).map(_ => Done)(ExecutionContext.parasitic) - case (env, RejectedBacktrackingSeqNr) => - triggerReplayIfPossible(sourceProvider, offsetStore, env).map { - case true => Done - case false => throwRejectedEnvelope(sourceProvider, env) - } - case _ => - } + // For simplicity we process the replayed envelopes one by one (group of 1), and also store the + // offset for each separately. + // Important to replay them sequentially to avoid concurrent processing and offset storage. + val replayDone = + isAcceptedEnvelopes.foldLeft(FutureDone) { + case (previous, (env, RejectedSeqNr)) => + previous.flatMap { _ => + replay(env).map(_ => Done)(ExecutionContext.parasitic) + } + case (previous, (env, RejectedBacktrackingSeqNr)) => + previous.flatMap { _ => + replay(env).map { + case true => Done + case false => throwRejectedEnvelope(sourceProvider, env) + } + } + case (previous, _) => + previous + } - val acceptedEnvelopes = isAcceptedEnvelopes.collect { - case (env, Accepted) => env - } + replayDone.flatMap { _ => - if (acceptedEnvelopes.isEmpty) { - FutureDone - } else { - Future.sequence(acceptedEnvelopes.map(env => loadEnvelope(env, sourceProvider))).flatMap { - loadedEnvelopes => - val offsets = loadedEnvelopes.iterator.map(extractOffsetPidSeqNr(sourceProvider, _)).toVector - val filteredEnvelopes = loadedEnvelopes.filterNot(isFilteredEvent) - if (filteredEnvelopes.isEmpty) { - offsetStore.saveOffsets(offsets) - } else { - delegate.process(filteredEnvelopes).flatMap { _ => + def processAcceptedEnvelopes(envelopes: Seq[Envelope]): Future[Done] = { + if (envelopes.isEmpty) { + FutureDone + } else { + Future.sequence(envelopes.map(env => loadEnvelope(env, sourceProvider))).flatMap { loadedEnvelopes => + val offsets = 
loadedEnvelopes.iterator.map(extractOffsetPidSeqNr(sourceProvider, _)).toVector + val filteredEnvelopes = loadedEnvelopes.filterNot(isFilteredEvent) + if (filteredEnvelopes.isEmpty) { offsetStore.saveOffsets(offsets) + } else { + delegate.process(filteredEnvelopes).flatMap { _ => + offsetStore.saveOffsets(offsets) + } + } + } + } + } + + val acceptedEnvelopes = isAcceptedEnvelopes.collect { + case (env, Accepted) => + env + } + val hasRejected = + isAcceptedEnvelopes.exists { + case (_, RejectedSeqNr) => true + case (_, RejectedBacktrackingSeqNr) => true + case _ => false + } + + if (hasRejected) { + // need second validation after replay to remove duplicates + offsetStore + .validateAll(acceptedEnvelopes) + .flatMap { isAcceptedEnvelopes2 => + val acceptedEnvelopes2 = isAcceptedEnvelopes2.collect { + case (env, Accepted) => env } + processAcceptedEnvelopes(acceptedEnvelopes2) } + } else { + processAcceptedEnvelopes(acceptedEnvelopes) } + } } } + + private def replay(originalEnvelope: Envelope): Future[Boolean] = { + replayIfPossible(offsetStore, sourceProvider, originalEnvelope) { envelope => + val offset = extractOffsetPidSeqNr(sourceProvider, envelope.asInstanceOf[Envelope]) + if (isFilteredEvent(envelope)) { + offsetStore.saveOffset(offset) + } else { + delegate + .process(Seq(envelope.asInstanceOf[Envelope])) + .flatMap { _ => + offsetStore.saveOffset(offset) + } + } + } + } + } } @@ -418,6 +571,84 @@ private[projection] object R2dbcProjectionImpl { implicit system: ActorSystem[_]): FlowWithContext[Envelope, ProjectionContext, Done, ProjectionContext, _] = { import R2dbcOffsetStore.Validation._ implicit val ec: ExecutionContext = system.executionContext + + // This is similar to DynamoDBProjectionImpl.replayIfPossible but difficult to extract common parts + // since this is flow processing + def replayIfPossible(originalEnvelope: Envelope): Future[Boolean] = { + val logPrefix = offsetStore.logPrefix + originalEnvelope match { + case originalEventEnvelope: 
EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 => + val underlyingProvider = sourceProvider match { + case adapted: JavaToScalaBySliceSourceProviderAdapter[_, _] => adapted.delegate + case provider => provider + } + underlyingProvider match { + case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked] + if offsetStore.settings.replayOnRejectedSequenceNumbers => + val persistenceId = originalEventEnvelope.persistenceId + offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => + val fromSeqNr = storedSeqNr + 1 + val toSeqNr = originalEventEnvelope.sequenceNr + logReplayRejected(logPrefix, originalEventEnvelope, fromSeqNr) + provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match { + case Some(querySource) => + querySource + .mapAsync(1) { envelope => + import R2dbcOffsetStore.Validation._ + offsetStore + .validate(envelope) + .map { + case Accepted => + if (isFilteredEvent(envelope) && settings.warnAboutFilteredEventsInFlow) { + log.info( + "atLeastOnceFlow doesn't support skipping envelopes. Envelope [{}] still emitted.", + envelope) + } + offsetStore.addInflight(envelope) + Some(envelope.asInstanceOf[Envelope]) + case Duplicate => + None + case RejectedSeqNr => + // this shouldn't happen + throw new RejectedEnvelope( + s"Replay due to rejected envelope was rejected. PersistenceId [$persistenceId] seqNr [${envelope.sequenceNr}].") + case RejectedBacktrackingSeqNr => + // this shouldn't happen + throw new RejectedEnvelope( + s"Replay due to rejected envelope was rejected. Should not be from backtracking. PersistenceId [$persistenceId] seqNr [${envelope.sequenceNr}].") + } + } + .collect { + case Some(env) => + // FIXME: should we supply a projection context? + // FIXME: add projection telemetry to all replays? (with a new envelope source?) 
+ env -> ProjectionContextImpl(sourceProvider.extractOffset(env), env, null) + } + .via(handler.asFlow) + .run() + .map(_ => true)(ExecutionContext.parasitic) + .recoverWith { exc => + logReplayException(logPrefix, originalEventEnvelope, fromSeqNr, exc) + Future.failed(exc) + } + case None => + Future.successful( + triggerReplayIfPossible( + sourceProvider, + persistenceId, + fromSeqNr, + originalEventEnvelope.sequenceNr)) + } + } + + case _ => + triggerReplayIfPossible(sourceProvider, offsetStore, originalEnvelope) + } + case _ => + FutureFalse // no replay support for non typed envelopes + } + } + FlowWithContext[Envelope, ProjectionContext] .mapAsync(1) { env => offsetStore @@ -434,9 +665,9 @@ private[projection] object R2dbcProjectionImpl { case Duplicate => Future.successful(None) case RejectedSeqNr => - triggerReplayIfPossible(sourceProvider, offsetStore, env).map(_ => None)(ExecutionContext.parasitic) + replayIfPossible(env).map(_ => None)(ExecutionContext.parasitic) case RejectedBacktrackingSeqNr => - triggerReplayIfPossible(sourceProvider, offsetStore, env).map { + replayIfPossible(env).map { case true => None case false => throwRejectedEnvelope(sourceProvider, env) } @@ -449,6 +680,64 @@ private[projection] object R2dbcProjectionImpl { .via(handler) } + private def logReplayRejected(logPrefix: String, originalEventEnvelope: EventEnvelope[Any], fromSeqNr: Long): Unit = { + if (EnvelopeOrigin.fromPubSub(originalEventEnvelope)) { + log.debug( + "{} Replaying events after rejected sequence number from {}. PersistenceId [{}], replaying from seqNr [{}] to [{}].", + logPrefix, + envelopeSourceName(originalEventEnvelope), + originalEventEnvelope.persistenceId, + fromSeqNr, + originalEventEnvelope.sequenceNr) + } else { + val c = replayRejectedCounter.incrementAndGet() + val logLevel = + if (c == 1 || c % 1000 == 0) Level.WARN else Level.DEBUG + log + .atLevel(logLevel) + .log( + "{} Replaying events after rejected sequence number from {}. 
PersistenceId [{}], replaying from seqNr [{}] to [{}]. Replay count [{}].", + logPrefix, + envelopeSourceName(originalEventEnvelope), + originalEventEnvelope.persistenceId, + fromSeqNr, + originalEventEnvelope.sequenceNr, + c) + } + } + + private def logReplayInvalidCount( + logPrefix: String, + originalEventEnvelope: EventEnvelope[Any], + fromSeqNr: Long, + count: Int, + expected: Long): Unit = { + log.warn( + "{} Replay due to rejected envelope from {}, found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", + logPrefix, + envelopeSourceName(originalEventEnvelope), + count, + expected, + originalEventEnvelope.persistenceId, + fromSeqNr, + originalEventEnvelope.sequenceNr) + } + + private def logReplayException( + logPrefix: String, + originalEventEnvelope: EventEnvelope[Any], + fromSeqNr: Long, + exc: Throwable): Unit = { + log.warn( + "{} Replay due to rejected envelope from {} failed. PersistenceId [{}] from seqNr [{}] to [{}].", + logPrefix, + envelopeSourceName(originalEventEnvelope), + originalEventEnvelope.persistenceId, + fromSeqNr, + originalEventEnvelope.sequenceNr, + exc) + } + /** * This replay mechanism is used by GrpcReadJournal */ @@ -459,11 +748,10 @@ private[projection] object R2dbcProjectionImpl { envelope match { case env: EventEnvelope[Any @unchecked] if env.sequenceNr > 1 => sourceProvider match { - case provider: CanTriggerReplay => + case _: CanTriggerReplay => offsetStore.storedSeqNr(env.persistenceId).map { storedSeqNr => val fromSeqNr = storedSeqNr + 1 - provider.triggerReplay(env.persistenceId, fromSeqNr, env.sequenceNr) - true + triggerReplayIfPossible(sourceProvider, env.persistenceId, fromSeqNr, env.sequenceNr) } case _ => FutureFalse // no replay support for other source providers @@ -473,6 +761,95 @@ private[projection] object R2dbcProjectionImpl { } } + /** + * This replay mechanism is used by GrpcReadJournal + */ + private def triggerReplayIfPossible[Offset, Envelope]( + sourceProvider: 
SourceProvider[Offset, Envelope], + persistenceId: String, + fromSeqNr: Long, + triggeredBySeqNr: Long): Boolean = { + sourceProvider match { + case provider: CanTriggerReplay => + provider.triggerReplay(persistenceId, fromSeqNr, triggeredBySeqNr) + true + case _ => + false // no replay support for other source providers + } + } + + private def replayIfPossible[Offset, Envelope]( + offsetStore: R2dbcOffsetStore, + sourceProvider: SourceProvider[Offset, Envelope], + originalEnvelope: Envelope)(accepted: EventEnvelope[Any] => Future[Done])( + implicit ec: ExecutionContext, + system: ActorSystem[_]): Future[Boolean] = { + val logPrefix = offsetStore.logPrefix + originalEnvelope match { + case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 => + val underlyingProvider = sourceProvider match { + case adapted: JavaToScalaBySliceSourceProviderAdapter[_, _] => adapted.delegate + case provider => provider + } + underlyingProvider match { + case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked] + if offsetStore.settings.replayOnRejectedSequenceNumbers => + val persistenceId = originalEventEnvelope.persistenceId + offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => + val fromSeqNr = storedSeqNr + 1 + val toSeqNr = originalEventEnvelope.sequenceNr + logReplayRejected(logPrefix, originalEventEnvelope, fromSeqNr) + provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match { + case Some(querySource) => + querySource + .mapAsync(1) { envelope => + import R2dbcOffsetStore.Validation._ + offsetStore + .validate(envelope) + .flatMap { + case Accepted => + accepted(envelope) + case Duplicate => + FutureDone + case RejectedSeqNr => + // this shouldn't happen + throw new RejectedEnvelope( + s"Replay due to rejected envelope was rejected. 
PersistenceId [$persistenceId] seqNr [${envelope.sequenceNr}].") + case RejectedBacktrackingSeqNr => + // this shouldn't happen + throw new RejectedEnvelope( + s"Replay due to rejected envelope was rejected. Should not be from backtracking. PersistenceId [$persistenceId] seqNr [${envelope.sequenceNr}].") + } + } + .runFold(0) { case (acc, _) => acc + 1 } + .map { count => + val expected = toSeqNr - fromSeqNr + 1 + if (count == expected) { + true + } else { + // it's expected to find all events, otherwise fail the replay attempt + logReplayInvalidCount(logPrefix, originalEventEnvelope, fromSeqNr, count, expected) + throwRejectedEnvelopeAfterFailedReplay(sourceProvider, originalEnvelope) + } + } + .recoverWith { exc => + logReplayException(logPrefix, originalEventEnvelope, fromSeqNr, exc) + Future.failed(exc) + } + case None => + Future.successful( + triggerReplayIfPossible(sourceProvider, persistenceId, fromSeqNr, originalEventEnvelope.sequenceNr)) + } + } + + case _ => + triggerReplayIfPossible(sourceProvider, offsetStore, originalEnvelope) + } + case _ => + FutureFalse // no replay support for non typed envelopes + } + } + private def throwRejectedEnvelope[Offset, Envelope]( sourceProvider: SourceProvider[Offset, Envelope], envelope: Envelope): Nothing = { @@ -485,6 +862,32 @@ private[projection] object R2dbcProjectionImpl { } } + private def throwRejectedEnvelopeAfterFailedReplay[Offset, Envelope]( + sourceProvider: SourceProvider[Offset, Envelope], + envelope: Envelope): Nothing = { + val source = envelopeSourceName(envelope) + extractOffsetPidSeqNr(sourceProvider, envelope) match { + case OffsetPidSeqNr(_, Some((pid, seqNr))) => + throw new RejectedEnvelope( + s"Replay failed, after rejected envelope from $source, persistenceId [$pid], seqNr [$seqNr], due to unexpected sequence number.") + case OffsetPidSeqNr(_, None) => + throw new RejectedEnvelope( + s"Replay failed, after rejected envelope from $source, due to unexpected sequence number.") + } + } + + 
private def envelopeSourceName[Envelope](envelope: Envelope): String = { + envelope match { + case env: EventEnvelope[Any @unchecked] => + if (EnvelopeOrigin.fromQuery(env)) "query" + else if (EnvelopeOrigin.fromPubSub(env)) "pubsub" + else if (EnvelopeOrigin.fromBacktracking(env)) "backtracking" + else if (EnvelopeOrigin.fromSnapshot(env)) "snapshot" + else env.source + case _ => "unknown" + } + } + @nowarn("msg=never used") abstract class AdaptedR2dbcHandler[E](val delegate: R2dbcHandler[E])( implicit