From a882e015fdd43434c784b4ee3404ba9d23b3ced3 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 28 Nov 2024 18:11:29 +0100 Subject: [PATCH] verify that it finds all events --- .../internal/DynamoDBProjectionImpl.scala | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index 1581ef316..a7cc52f36 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -435,7 +435,8 @@ private[projection] object DynamoDBProjectionImpl { val persistenceId = originalEventEnvelope.persistenceId offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => val fromSeqNr = storedSeqNr + 1 - provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, originalEventEnvelope.sequenceNr) match { + val toSeqNr = originalEventEnvelope.sequenceNr + provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match { case Some(querySource) => querySource .mapAsync(1) { envelope => @@ -467,8 +468,23 @@ private[projection] object DynamoDBProjectionImpl { s"Replay due to rejected envelope was rejected. Should not be from backtracking. PersistenceId [$persistenceId] seqNr [${envelope.sequenceNr}].") } } - .run() - .map(_ => true) + .runFold(0) { case (acc, _) => acc + 1 } + .map { count => + val expected = toSeqNr - fromSeqNr + if (count == expected) { + true + } else { + // it's expected to find all events, otherwise fail the replay attempt + log.warn( + "Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", + count, + expected, + persistenceId, + fromSeqNr, + toSeqNr) + false + } + } .recoverWith { exc => log.warn( "Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].",