Skip to content

Commit

Permalink
env source in replay log messages
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Jan 9, 2025
1 parent 38dcb29 commit d6ab414
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import akka.stream.scaladsl.FlowWithContext
import akka.stream.scaladsl.Source
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.slf4j.event.Level
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient

/**
Expand Down Expand Up @@ -508,7 +509,7 @@ private[projection] object DynamoDBProjectionImpl {
offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr =>
val fromSeqNr = storedSeqNr + 1
val toSeqNr = originalEventEnvelope.sequenceNr
logReplayRejected(logPrefix, persistenceId, fromSeqNr, toSeqNr)
logReplayRejected(logPrefix, originalEventEnvelope, fromSeqNr)
provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match {
case Some(querySource) =>
querySource
Expand Down Expand Up @@ -564,7 +565,7 @@ private[projection] object DynamoDBProjectionImpl {
}
}
.recoverWith { exc =>
logReplayException(logPrefix, persistenceId, fromSeqNr, toSeqNr, exc)
logReplayException(logPrefix, originalEventEnvelope, fromSeqNr, exc)
Future.failed(exc)
}
case None =>
Expand Down Expand Up @@ -616,45 +617,58 @@ private[projection] object DynamoDBProjectionImpl {
.via(handler)
}

private def logReplayRejected(logPrefix: String, persistenceId: String, fromSeqNr: Long, toSeqNr: Long): Unit = {
private def logReplayRejected(logPrefix: String, originalEventEnvelope: EventEnvelope[Any], fromSeqNr: Long): Unit = {
val msg =
"{} Replaying events after rejected sequence number. PersistenceId [{}], replaying from seqNr [{}] to [{}]. Replay count [{}]."
val c = replayRejectedCounter.incrementAndGet()
if (c == 1 || c % 1000 == 0)
log.warn(msg, logPrefix, persistenceId, fromSeqNr, toSeqNr, c)
else
log.debug(msg, logPrefix, persistenceId, fromSeqNr, toSeqNr, c)
"{} Replaying events after rejected sequence number from {}. PersistenceId [{}], replaying from seqNr [{}] to [{}]. Replay count [{}]."
val c =
if (EnvelopeOrigin.fromPubSub(originalEventEnvelope))
replayRejectedCounter.get // don't increment counter for pubsub
else
replayRejectedCounter.incrementAndGet()

val logLevel =
if (c == 1 || c % 1000 == 0) Level.WARN else Level.DEBUG
log
.atLevel(logLevel)
.log(
msg,
logPrefix,
envelopeSourceName(originalEventEnvelope),
originalEventEnvelope.persistenceId,
fromSeqNr,
originalEventEnvelope.sequenceNr,
c)
}

private def logReplayInvalidCount(
logPrefix: String,
persistenceId: String,
originalEventEnvelope: EventEnvelope[Any],
fromSeqNr: Long,
toSeqNr: Long,
count: Int,
expected: Long): Unit = {
log.warn(
"{} Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].",
"{} Replay due to rejected envelope from {}, found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].",
logPrefix,
envelopeSourceName(originalEventEnvelope),
count,
expected,
persistenceId,
originalEventEnvelope.persistenceId,
fromSeqNr,
toSeqNr)
originalEventEnvelope.sequenceNr)
}

private def logReplayException(
logPrefix: String,
persistenceId: String,
originalEventEnvelope: EventEnvelope[Any],
fromSeqNr: Long,
toSeqNr: Long,
exc: Throwable): Unit = {
log.warn(
"{} Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].",
"{} Replay due to rejected envelope from {} failed. PersistenceId [{}] from seqNr [{}] to [{}].",
logPrefix,
persistenceId,
envelopeSourceName(originalEventEnvelope),
originalEventEnvelope.persistenceId,
fromSeqNr,
toSeqNr,
originalEventEnvelope.sequenceNr,
exc)
}

Expand Down Expand Up @@ -718,7 +732,7 @@ private[projection] object DynamoDBProjectionImpl {
offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr =>
val fromSeqNr = storedSeqNr + 1
val toSeqNr = originalEventEnvelope.sequenceNr
logReplayRejected(logPrefix, persistenceId, fromSeqNr, toSeqNr)
logReplayRejected(logPrefix, originalEventEnvelope, fromSeqNr)
provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match {
case Some(querySource) =>
querySource
Expand Down Expand Up @@ -748,12 +762,12 @@ private[projection] object DynamoDBProjectionImpl {
true
} else {
// it's expected to find all events, otherwise fail the replay attempt
logReplayInvalidCount(logPrefix, persistenceId, fromSeqNr, toSeqNr, count, expected)
logReplayInvalidCount(logPrefix, originalEventEnvelope, fromSeqNr, count, expected)
throwRejectedEnvelopeAfterFailedReplay(sourceProvider, originalEnvelope)
}
}
.recoverWith { exc =>
logReplayException(logPrefix, persistenceId, fromSeqNr, toSeqNr, exc)
logReplayException(logPrefix, originalEventEnvelope, fromSeqNr, exc)
Future.failed(exc)
}
case None =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import akka.stream.scaladsl.FlowWithContext
import akka.stream.scaladsl.Source
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.slf4j.event.Level

/**
* INTERNAL API
Expand Down Expand Up @@ -588,7 +589,7 @@ private[projection] object R2dbcProjectionImpl {
offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr =>
val fromSeqNr = storedSeqNr + 1
val toSeqNr = originalEventEnvelope.sequenceNr
logReplayRejected(logPrefix, persistenceId, fromSeqNr, toSeqNr)
logReplayRejected(logPrefix, originalEventEnvelope, fromSeqNr)
provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match {
case Some(querySource) =>
querySource
Expand Down Expand Up @@ -644,7 +645,7 @@ private[projection] object R2dbcProjectionImpl {
}
}
.recoverWith { exc =>
logReplayException(logPrefix, persistenceId, fromSeqNr, toSeqNr, exc)
logReplayException(logPrefix, originalEventEnvelope, fromSeqNr, exc)
Future.failed(exc)
}
case None =>
Expand Down Expand Up @@ -696,45 +697,58 @@ private[projection] object R2dbcProjectionImpl {
.via(handler)
}

private def logReplayRejected(logPrefix: String, persistenceId: String, fromSeqNr: Long, toSeqNr: Long): Unit = {
private def logReplayRejected(logPrefix: String, originalEventEnvelope: EventEnvelope[Any], fromSeqNr: Long): Unit = {
val msg =
"{} Replaying events after rejected sequence number. PersistenceId [{}], replaying from seqNr [{}] to [{}]. Replay count [{}]."
val c = replayRejectedCounter.incrementAndGet()
if (c == 1 || c % 1000 == 0)
log.warn(msg, logPrefix, persistenceId, fromSeqNr, toSeqNr, c)
else
log.debug(msg, logPrefix, persistenceId, fromSeqNr, toSeqNr, c)
"{} Replaying events after rejected sequence number from {}. PersistenceId [{}], replaying from seqNr [{}] to [{}]. Replay count [{}]."
val c =
if (EnvelopeOrigin.fromPubSub(originalEventEnvelope))
replayRejectedCounter.get // don't increment counter for pubsub
else
replayRejectedCounter.incrementAndGet()

val logLevel =
if (c == 1 || c % 1000 == 0) Level.WARN else Level.DEBUG
log
.atLevel(logLevel)
.log(
msg,
logPrefix,
envelopeSourceName(originalEventEnvelope),
originalEventEnvelope.persistenceId,
fromSeqNr,
originalEventEnvelope.sequenceNr,
c)
}

private def logReplayInvalidCount(
logPrefix: String,
persistenceId: String,
originalEventEnvelope: EventEnvelope[Any],
fromSeqNr: Long,
toSeqNr: Long,
count: Int,
expected: Long): Unit = {
log.warn(
"{} Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].",
"{} Replay due to rejected envelope from {}, found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].",
logPrefix,
envelopeSourceName(originalEventEnvelope),
count,
expected,
persistenceId,
originalEventEnvelope.persistenceId,
fromSeqNr,
toSeqNr)
originalEventEnvelope.sequenceNr)
}

private def logReplayException(
logPrefix: String,
persistenceId: String,
originalEventEnvelope: EventEnvelope[Any],
fromSeqNr: Long,
toSeqNr: Long,
exc: Throwable): Unit = {
log.warn(
"{} Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].",
"{} Replay due to rejected envelope from {} failed. PersistenceId [{}] from seqNr [{}] to [{}].",
logPrefix,
persistenceId,
envelopeSourceName(originalEventEnvelope),
originalEventEnvelope.persistenceId,
fromSeqNr,
toSeqNr,
originalEventEnvelope.sequenceNr,
exc)
}

Expand Down Expand Up @@ -798,7 +812,7 @@ private[projection] object R2dbcProjectionImpl {
offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr =>
val fromSeqNr = storedSeqNr + 1
val toSeqNr = originalEventEnvelope.sequenceNr
logReplayRejected(logPrefix, persistenceId, fromSeqNr, toSeqNr)
logReplayRejected(logPrefix, originalEventEnvelope, fromSeqNr)
provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match {
case Some(querySource) =>
querySource
Expand Down Expand Up @@ -828,12 +842,12 @@ private[projection] object R2dbcProjectionImpl {
true
} else {
// it's expected to find all events, otherwise fail the replay attempt
logReplayInvalidCount(logPrefix, persistenceId, fromSeqNr, toSeqNr, count, expected)
logReplayInvalidCount(logPrefix, originalEventEnvelope, fromSeqNr, count, expected)
throwRejectedEnvelopeAfterFailedReplay(sourceProvider, originalEnvelope)
}
}
.recoverWith { exc =>
logReplayException(logPrefix, persistenceId, fromSeqNr, toSeqNr, exc)
logReplayException(logPrefix, originalEventEnvelope, fromSeqNr, exc)
Future.failed(exc)
}
case None =>
Expand Down

0 comments on commit d6ab414

Please sign in to comment.