Skip to content

Commit

Permalink
log warning every 1000
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Nov 29, 2024
1 parent 673e606 commit 2f237b9
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,6 @@ private[projection] class DynamoDBOffsetStore(
// always accept starting from snapshots when there was no previous event seen
FutureAccepted
} else {
println(s"# validateEventTimestamp $recordWithOffset") // FIXME
validateEventTimestamp(currentState, recordWithOffset)
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ private[projection] object DynamoDBProjectionImpl {
}

private val loadEnvelopeCounter = new AtomicLong
private val replayRejectedCounter = new AtomicLong

def loadEnvelope[Envelope](env: Envelope, sourceProvider: SourceProvider[_, Envelope])(
implicit
Expand Down Expand Up @@ -223,12 +224,7 @@ private[projection] object DynamoDBProjectionImpl {
offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr =>
val fromSeqNr = storedSeqNr + 1
val toSeqNr = originalEventEnvelope.sequenceNr
log.debug(
s"{} Replaying events after rejected sequence number. PersistenceId [{}], replaying from seqNr [{}] to [{}].",
offsetStore.logPrefix,
persistenceId,
fromSeqNr,
toSeqNr)
logReplayRejected(offsetStore, persistenceId, fromSeqNr, toSeqNr)
provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match {
case Some(querySource) =>
querySource
Expand Down Expand Up @@ -485,12 +481,7 @@ private[projection] object DynamoDBProjectionImpl {
offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr =>
val fromSeqNr = storedSeqNr + 1
val toSeqNr = originalEventEnvelope.sequenceNr
log.debug(
s"{} Replaying events after rejected sequence number. PersistenceId [{}], replaying from seqNr [{}] to [{}].",
offsetStore.logPrefix,
persistenceId,
fromSeqNr,
toSeqNr)
logReplayRejected(offsetStore, persistenceId, fromSeqNr, toSeqNr)
provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match {
case Some(querySource) =>
querySource
Expand Down Expand Up @@ -585,12 +576,7 @@ private[projection] object DynamoDBProjectionImpl {
offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr =>
val fromSeqNr = storedSeqNr + 1
val toSeqNr = originalEventEnvelope.sequenceNr
log.debug(
s"{} Replaying events after rejected sequence number. PersistenceId [{}], replaying from seqNr [{}] to [{}].",
offsetStore.logPrefix,
persistenceId,
fromSeqNr,
toSeqNr)
logReplayRejected(offsetStore, persistenceId, fromSeqNr, toSeqNr)
provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match {
case Some(querySource) =>
querySource
Expand Down Expand Up @@ -699,6 +685,20 @@ private[projection] object DynamoDBProjectionImpl {
.via(handler)
}

private def logReplayRejected(
offsetStore: DynamoDBOffsetStore,
persistenceId: String,
fromSeqNr: Long,
toSeqNr: Long): Unit = {
val msg =
"{} Replaying events after rejected sequence number. PersistenceId [{}], replaying from seqNr [{}] to [{}]. Replay count [{}]."
val c = replayRejectedCounter.incrementAndGet()
if (c == 1 || c % 1000 == 0)
log.warn(msg, offsetStore.logPrefix, persistenceId, fromSeqNr, toSeqNr, c)
else
log.debug(msg, offsetStore.logPrefix, persistenceId, fromSeqNr, toSeqNr, c)
}

private def triggerReplayIfPossible[Offset, Envelope](
sourceProvider: SourceProvider[Offset, Envelope],
offsetStore: DynamoDBOffsetStore,
Expand Down

0 comments on commit 2f237b9

Please sign in to comment.