From d6ab41486fe95069adf203bee834d544b998d036 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 9 Jan 2025 09:25:48 +0100 Subject: [PATCH] env source in replay log messages --- .../internal/DynamoDBProjectionImpl.scala | 58 ++++++++++++------- .../r2dbc/internal/R2dbcProjectionImpl.scala | 58 ++++++++++++------- 2 files changed, 72 insertions(+), 44 deletions(-) diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index ef79129b1..d43625d21 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -64,6 +64,7 @@ import akka.stream.scaladsl.FlowWithContext import akka.stream.scaladsl.Source import org.slf4j.Logger import org.slf4j.LoggerFactory +import org.slf4j.event.Level import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient /** @@ -508,7 +509,7 @@ private[projection] object DynamoDBProjectionImpl { offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => val fromSeqNr = storedSeqNr + 1 val toSeqNr = originalEventEnvelope.sequenceNr - logReplayRejected(logPrefix, persistenceId, fromSeqNr, toSeqNr) + logReplayRejected(logPrefix, originalEventEnvelope, fromSeqNr) provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match { case Some(querySource) => querySource @@ -564,7 +565,7 @@ private[projection] object DynamoDBProjectionImpl { } } .recoverWith { exc => - logReplayException(logPrefix, persistenceId, fromSeqNr, toSeqNr, exc) + logReplayException(logPrefix, originalEventEnvelope, fromSeqNr, exc) Future.failed(exc) } case None => @@ -616,45 +617,58 @@ private[projection] object DynamoDBProjectionImpl { .via(handler) } - private def logReplayRejected(logPrefix: String, persistenceId: String, fromSeqNr: Long, toSeqNr: Long): Unit = { + private def logReplayRejected(logPrefix: String, originalEventEnvelope: EventEnvelope[Any], fromSeqNr: Long): Unit = { val msg = - "{} Replaying events after rejected sequence number. PersistenceId [{}], replaying from seqNr [{}] to [{}]. Replay count [{}]." - val c = replayRejectedCounter.incrementAndGet() - if (c == 1 || c % 1000 == 0) - log.warn(msg, logPrefix, persistenceId, fromSeqNr, toSeqNr, c) - else - log.debug(msg, logPrefix, persistenceId, fromSeqNr, toSeqNr, c) + "{} Replaying events after rejected sequence number from {}. PersistenceId [{}], replaying from seqNr [{}] to [{}]. Replay count [{}]." + val c = + if (EnvelopeOrigin.fromPubSub(originalEventEnvelope)) + replayRejectedCounter.get // don't increment counter for pubsub + else + replayRejectedCounter.incrementAndGet() + + val logLevel = + if (c == 1 || c % 1000 == 0) Level.WARN else Level.DEBUG + log + .atLevel(logLevel) + .log( + msg, + logPrefix, + envelopeSourceName(originalEventEnvelope), + originalEventEnvelope.persistenceId, + fromSeqNr, + originalEventEnvelope.sequenceNr, + c) } private def logReplayInvalidCount( logPrefix: String, - persistenceId: String, + originalEventEnvelope: EventEnvelope[Any], fromSeqNr: Long, - toSeqNr: Long, count: Int, expected: Long): Unit = { log.warn( - "{} Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", + "{} Replay due to rejected envelope from {}, found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", logPrefix, + envelopeSourceName(originalEventEnvelope), count, expected, - persistenceId, + originalEventEnvelope.persistenceId, fromSeqNr, - toSeqNr) + originalEventEnvelope.sequenceNr) } private def logReplayException( logPrefix: String, - persistenceId: String, + originalEventEnvelope: EventEnvelope[Any], fromSeqNr: Long, - toSeqNr: Long, exc: Throwable): Unit = { log.warn( - "{} Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].", + "{} Replay due to rejected envelope from {} failed. PersistenceId [{}] from seqNr [{}] to [{}].", logPrefix, - persistenceId, + envelopeSourceName(originalEventEnvelope), + originalEventEnvelope.persistenceId, fromSeqNr, - toSeqNr, + originalEventEnvelope.sequenceNr, exc) } @@ -718,7 +732,7 @@ private[projection] object DynamoDBProjectionImpl { offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => val fromSeqNr = storedSeqNr + 1 val toSeqNr = originalEventEnvelope.sequenceNr - logReplayRejected(logPrefix, persistenceId, fromSeqNr, toSeqNr) + logReplayRejected(logPrefix, originalEventEnvelope, fromSeqNr) provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match { case Some(querySource) => querySource @@ -748,12 +762,12 @@ private[projection] object DynamoDBProjectionImpl { true } else { // it's expected to find all events, otherwise fail the replay attempt - logReplayInvalidCount(logPrefix, persistenceId, fromSeqNr, toSeqNr, count, expected) + logReplayInvalidCount(logPrefix, originalEventEnvelope, fromSeqNr, count, expected) throwRejectedEnvelopeAfterFailedReplay(sourceProvider, originalEnvelope) } } .recoverWith { exc => - logReplayException(logPrefix, persistenceId, fromSeqNr, toSeqNr, exc) + logReplayException(logPrefix, originalEventEnvelope, fromSeqNr, exc) Future.failed(exc) } case None => diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala index c249b9233..4caaf99da 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala @@ -68,6 +68,7 @@ import akka.stream.scaladsl.FlowWithContext import akka.stream.scaladsl.Source import org.slf4j.Logger import org.slf4j.LoggerFactory +import org.slf4j.event.Level /** * INTERNAL API @@ -588,7 +589,7 @@ private[projection] object R2dbcProjectionImpl { offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => val fromSeqNr = storedSeqNr + 1 val toSeqNr = originalEventEnvelope.sequenceNr - logReplayRejected(logPrefix, persistenceId, fromSeqNr, toSeqNr) + logReplayRejected(logPrefix, originalEventEnvelope, fromSeqNr) provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match { case Some(querySource) => querySource @@ -644,7 +645,7 @@ private[projection] object R2dbcProjectionImpl { } } .recoverWith { exc => - logReplayException(logPrefix, persistenceId, fromSeqNr, toSeqNr, exc) + logReplayException(logPrefix, originalEventEnvelope, fromSeqNr, exc) Future.failed(exc) } case None => @@ -696,45 +697,58 @@ private[projection] object R2dbcProjectionImpl { .via(handler) } - private def logReplayRejected(logPrefix: String, persistenceId: String, fromSeqNr: Long, toSeqNr: Long): Unit = { + private def logReplayRejected(logPrefix: String, originalEventEnvelope: EventEnvelope[Any], fromSeqNr: Long): Unit = { val msg = - "{} Replaying events after rejected sequence number. PersistenceId [{}], replaying from seqNr [{}] to [{}]. Replay count [{}]." - val c = replayRejectedCounter.incrementAndGet() - if (c == 1 || c % 1000 == 0) - log.warn(msg, logPrefix, persistenceId, fromSeqNr, toSeqNr, c) - else - log.debug(msg, logPrefix, persistenceId, fromSeqNr, toSeqNr, c) + "{} Replaying events after rejected sequence number from {}. PersistenceId [{}], replaying from seqNr [{}] to [{}]. Replay count [{}]." + val c = + if (EnvelopeOrigin.fromPubSub(originalEventEnvelope)) + replayRejectedCounter.get // don't increment counter for pubsub + else + replayRejectedCounter.incrementAndGet() + + val logLevel = + if (c == 1 || c % 1000 == 0) Level.WARN else Level.DEBUG + log + .atLevel(logLevel) + .log( + msg, + logPrefix, + envelopeSourceName(originalEventEnvelope), + originalEventEnvelope.persistenceId, + fromSeqNr, + originalEventEnvelope.sequenceNr, + c) } private def logReplayInvalidCount( logPrefix: String, - persistenceId: String, + originalEventEnvelope: EventEnvelope[Any], fromSeqNr: Long, - toSeqNr: Long, count: Int, expected: Long): Unit = { log.warn( - "{} Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", + "{} Replay due to rejected envelope from {}, found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].", logPrefix, + envelopeSourceName(originalEventEnvelope), count, expected, - persistenceId, + originalEventEnvelope.persistenceId, fromSeqNr, - toSeqNr) + originalEventEnvelope.sequenceNr) } private def logReplayException( logPrefix: String, - persistenceId: String, + originalEventEnvelope: EventEnvelope[Any], fromSeqNr: Long, - toSeqNr: Long, exc: Throwable): Unit = { log.warn( - "{} Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].", + "{} Replay due to rejected envelope from {} failed. PersistenceId [{}] from seqNr [{}] to [{}].", logPrefix, - persistenceId, + envelopeSourceName(originalEventEnvelope), + originalEventEnvelope.persistenceId, fromSeqNr, - toSeqNr, + originalEventEnvelope.sequenceNr, exc) } @@ -798,7 +812,7 @@ private[projection] object R2dbcProjectionImpl { offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr => val fromSeqNr = storedSeqNr + 1 val toSeqNr = originalEventEnvelope.sequenceNr - logReplayRejected(logPrefix, persistenceId, fromSeqNr, toSeqNr) + logReplayRejected(logPrefix, originalEventEnvelope, fromSeqNr) provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match { case Some(querySource) => querySource @@ -828,12 +842,12 @@ private[projection] object R2dbcProjectionImpl { true } else { // it's expected to find all events, otherwise fail the replay attempt - logReplayInvalidCount(logPrefix, persistenceId, fromSeqNr, toSeqNr, count, expected) + logReplayInvalidCount(logPrefix, originalEventEnvelope, fromSeqNr, count, expected) throwRejectedEnvelopeAfterFailedReplay(sourceProvider, originalEnvelope) } } .recoverWith { exc => - logReplayException(logPrefix, persistenceId, fromSeqNr, toSeqNr, exc) + logReplayException(logPrefix, originalEventEnvelope, fromSeqNr, exc) Future.failed(exc) } case None =>