Skip to content

Commit

Permalink
verify that it finds all events
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Nov 28, 2024
1 parent cb9063c commit a882e01
Showing 1 changed file with 19 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,8 @@ private[projection] object DynamoDBProjectionImpl {
val persistenceId = originalEventEnvelope.persistenceId
offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr =>
val fromSeqNr = storedSeqNr + 1
provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, originalEventEnvelope.sequenceNr) match {
val toSeqNr = originalEventEnvelope.sequenceNr
provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match {
case Some(querySource) =>
querySource
.mapAsync(1) { envelope =>
Expand Down Expand Up @@ -467,8 +468,23 @@ private[projection] object DynamoDBProjectionImpl {
s"Replay due to rejected envelope was rejected. Should not be from backtracking. PersistenceId [$persistenceId] seqNr [${envelope.sequenceNr}].")
}
}
.run()
.map(_ => true)
.runFold(0) { case (acc, _) => acc + 1 }
.map { count =>
val expected = toSeqNr - fromSeqNr
if (count == expected) {
true
} else {
// it's expected to find all events, otherwise fail the replay attempt
log.warn(
"Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].",
count,
expected,
persistenceId,
fromSeqNr,
toSeqNr)
false
}
}
.recoverWith { exc =>
log.warn(
"Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].",
Expand Down

0 comments on commit a882e01

Please sign in to comment.