Skip to content

Commit

Permalink
fix: R2DBC replay rejected events (#1289)
Browse files Browse the repository at this point in the history
* triggerReplayIfPossible when CurrentEventsByPersistenceIdTypedQuery
  is not supported by underlying query
* env source in replay log messages
* remove misleading replay count in flow
* pubsub logging always at debug
  • Loading branch information
patriknw authored Jan 9, 2025
1 parent 3d8f3ec commit 445c4ea
Show file tree
Hide file tree
Showing 7 changed files with 1,365 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import akka.stream.scaladsl.FlowWithContext
import akka.stream.scaladsl.Source
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.slf4j.event.Level
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient

/**
Expand Down Expand Up @@ -508,7 +509,7 @@ private[projection] object DynamoDBProjectionImpl {
offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr =>
val fromSeqNr = storedSeqNr + 1
val toSeqNr = originalEventEnvelope.sequenceNr
logReplayRejected(logPrefix, persistenceId, fromSeqNr, toSeqNr)
logReplayRejected(logPrefix, originalEventEnvelope, fromSeqNr)
provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match {
case Some(querySource) =>
querySource
Expand Down Expand Up @@ -544,30 +545,19 @@ private[projection] object DynamoDBProjectionImpl {
env -> ProjectionContextImpl(sourceProvider.extractOffset(env), env, null)
}
.via(handler.asFlow)
.runFold(0) { case (acc, _) => acc + 1 }
.map { count =>
val expected = toSeqNr - fromSeqNr + 1
if (count == expected) {
true
} else {
// FIXME: filtered envelopes are not passed through, so we can't expect all to be replayed here
// and handler could also filter out envelopes
log.debug(
"{} Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].",
logPrefix,
count,
expected,
persistenceId,
fromSeqNr,
toSeqNr)
true
}
}
.run()
.map(_ => true)(ExecutionContext.parasitic)
.recoverWith { exc =>
logReplayException(logPrefix, persistenceId, fromSeqNr, toSeqNr, exc)
logReplayException(logPrefix, originalEventEnvelope, fromSeqNr, exc)
Future.failed(exc)
}
case None => FutureFalse
case None =>
Future.successful(
triggerReplayIfPossible(
sourceProvider,
persistenceId,
fromSeqNr,
originalEventEnvelope.sequenceNr))
}
}

Expand Down Expand Up @@ -610,45 +600,61 @@ private[projection] object DynamoDBProjectionImpl {
.via(handler)
}

private def logReplayRejected(logPrefix: String, persistenceId: String, fromSeqNr: Long, toSeqNr: Long): Unit = {
val msg =
"{} Replaying events after rejected sequence number. PersistenceId [{}], replaying from seqNr [{}] to [{}]. Replay count [{}]."
val c = replayRejectedCounter.incrementAndGet()
if (c == 1 || c % 1000 == 0)
log.warn(msg, logPrefix, persistenceId, fromSeqNr, toSeqNr, c)
else
log.debug(msg, logPrefix, persistenceId, fromSeqNr, toSeqNr, c)
private def logReplayRejected(logPrefix: String, originalEventEnvelope: EventEnvelope[Any], fromSeqNr: Long): Unit = {
if (EnvelopeOrigin.fromPubSub(originalEventEnvelope)) {
log.debug(
"{} Replaying events after rejected sequence number from {}. PersistenceId [{}], replaying from seqNr [{}] to [{}].",
logPrefix,
envelopeSourceName(originalEventEnvelope),
originalEventEnvelope.persistenceId,
fromSeqNr,
originalEventEnvelope.sequenceNr)
} else {
val c = replayRejectedCounter.incrementAndGet()
val logLevel =
if (c == 1 || c % 1000 == 0) Level.WARN else Level.DEBUG
log
.atLevel(logLevel)
.log(
"{} Replaying events after rejected sequence number from {}. PersistenceId [{}], replaying from seqNr [{}] to [{}]. Replay count [{}].",
logPrefix,
envelopeSourceName(originalEventEnvelope),
originalEventEnvelope.persistenceId,
fromSeqNr,
originalEventEnvelope.sequenceNr,
c)
}
}

private def logReplayInvalidCount(
logPrefix: String,
persistenceId: String,
originalEventEnvelope: EventEnvelope[Any],
fromSeqNr: Long,
toSeqNr: Long,
count: Int,
expected: Long): Unit = {
log.warn(
"{} Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].",
"{} Replay due to rejected envelope from {}, found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].",
logPrefix,
envelopeSourceName(originalEventEnvelope),
count,
expected,
persistenceId,
originalEventEnvelope.persistenceId,
fromSeqNr,
toSeqNr)
originalEventEnvelope.sequenceNr)
}

private def logReplayException(
logPrefix: String,
persistenceId: String,
originalEventEnvelope: EventEnvelope[Any],
fromSeqNr: Long,
toSeqNr: Long,
exc: Throwable): Unit = {
log.warn(
"{} Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].",
"{} Replay due to rejected envelope from {} failed. PersistenceId [{}] from seqNr [{}] to [{}].",
logPrefix,
persistenceId,
envelopeSourceName(originalEventEnvelope),
originalEventEnvelope.persistenceId,
fromSeqNr,
toSeqNr,
originalEventEnvelope.sequenceNr,
exc)
}

Expand All @@ -662,11 +668,10 @@ private[projection] object DynamoDBProjectionImpl {
envelope match {
case env: EventEnvelope[Any @unchecked] if env.sequenceNr > 1 =>
sourceProvider match {
case provider: CanTriggerReplay =>
case _: CanTriggerReplay =>
offsetStore.storedSeqNr(env.persistenceId).map { storedSeqNr =>
val fromSeqNr = storedSeqNr + 1
provider.triggerReplay(env.persistenceId, fromSeqNr, env.sequenceNr)
true
triggerReplayIfPossible(sourceProvider, env.persistenceId, fromSeqNr, env.sequenceNr)
}
case _ =>
FutureFalse // no replay support for other source providers
Expand All @@ -676,6 +681,23 @@ private[projection] object DynamoDBProjectionImpl {
}
}

/**
* This replay mechanism is used by GrpcReadJournal
*/
private def triggerReplayIfPossible[Offset, Envelope](
sourceProvider: SourceProvider[Offset, Envelope],
persistenceId: String,
fromSeqNr: Long,
triggeredBySeqNr: Long): Boolean = {
sourceProvider match {
case provider: CanTriggerReplay =>
provider.triggerReplay(persistenceId, fromSeqNr, triggeredBySeqNr)
true
case _ =>
false // no replay support for other source providers
}
}

private def replayIfPossible[Offset, Envelope](
offsetStore: DynamoDBOffsetStore,
sourceProvider: SourceProvider[Offset, Envelope],
Expand All @@ -696,7 +718,7 @@ private[projection] object DynamoDBProjectionImpl {
offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr =>
val fromSeqNr = storedSeqNr + 1
val toSeqNr = originalEventEnvelope.sequenceNr
logReplayRejected(logPrefix, persistenceId, fromSeqNr, toSeqNr)
logReplayRejected(logPrefix, originalEventEnvelope, fromSeqNr)
provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match {
case Some(querySource) =>
querySource
Expand Down Expand Up @@ -726,15 +748,17 @@ private[projection] object DynamoDBProjectionImpl {
true
} else {
// it's expected to find all events, otherwise fail the replay attempt
logReplayInvalidCount(logPrefix, persistenceId, fromSeqNr, toSeqNr, count, expected)
logReplayInvalidCount(logPrefix, originalEventEnvelope, fromSeqNr, count, expected)
throwRejectedEnvelopeAfterFailedReplay(sourceProvider, originalEnvelope)
}
}
.recoverWith { exc =>
logReplayException(logPrefix, persistenceId, fromSeqNr, toSeqNr, exc)
logReplayException(logPrefix, originalEventEnvelope, fromSeqNr, exc)
Future.failed(exc)
}
case None => FutureFalse
case None =>
Future.successful(
triggerReplayIfPossible(sourceProvider, persistenceId, fromSeqNr, originalEventEnvelope.sequenceNr))
}
}

Expand Down
Loading

0 comments on commit 445c4ea

Please sign in to comment.