Skip to content

Commit

Permalink
fix(r2dbc): avoid unnecessary rejection given deleted offsets (#1293)
Browse files Browse the repository at this point in the history
* use deletion window for timestamp validation
* remove startTimestamp from offset store state
  • Loading branch information
pvlugter authored Jan 13, 2025
1 parent d574bd7 commit b366cc6
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ private[projection] class DynamoDBOffsetStore(
val seqNr = recordWithOffset.record.seqNr
val slice = recordWithOffset.record.slice

// Haven't see seen this pid within the time window. Since events can be missed
// Haven't seen this pid within the time window. Since events can be missed
// when read at the tail we will only accept it if the event with previous seqNr has timestamp
// before the startTimestamp minus backtracking window
timestampOf(pid, seqNr - 1).map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,13 @@ class EventSourcedEndToEndSpec
}

"accept unknown sequence number if previous is old" in {
val entityType = nextEntityType()
val pid1 = nextPid(entityType)
val pid2 = nextPid(entityType)
val pid3 = nextPid(entityType)
val entityType = "test-002"
val pid1 = s"$entityType|p-08071" // slice 992
val pid2 = s"$entityType|p-08192" // slice 992
val pid3 = s"$entityType|p-09160" // slice 992

val startTime = TestClock.nowMicros().instant()
val oldTime = startTime.minus(projectionSettings.timeWindow).minusSeconds(60)
val oldTime = startTime.minus(projectionSettings.deleteAfter).minusSeconds(60)
writeEvent(pid1, 1L, startTime, "e1-1")

val projectionName = UUID.randomUUID().toString
Expand All @@ -306,7 +306,7 @@ class EventSourcedEndToEndSpec
// old event for pid2, seqNr 3. Will not be picked up by backtracking because it is outside the time window
writeEvent(pid2, 3L, oldTime, "e2-3")
// pid2, seqNr 3 is unknown when receiving 4 so will lookup timestamp of 3
// and accept 4 because 3 was older than time window
// and accept 4 because 3 was older than the deletion window (for tracked slice)
writeEvent(pid2, 4L, startTime.plusMillis(1), "e2-4")
processedProbe.receiveMessage().envelope.event shouldBe "e2-4"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,33 +453,35 @@ class R2dbcTimestampOffsetStoreSpec
val eventTimestampQueryClock = TestClock.nowMicros()
val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock)

// some validation require the startTimestamp, which is set from readOffset
offsetStore.getState().startTimestamp shouldBe Instant.EPOCH
offsetStore.readOffset().futureValue
offsetStore.getState().startTimestamp shouldBe clock.instant()
val p1 = "p-08071" // slice 101
val p2 = "p-08072" // slice 102
val p3 = "p-08073" // slice 103
val p4 = "p-08074" // slice 104
val p5 = "p-08192" // slice 101 (same as p1)
val p6 = "p-08076" // slice 106

val startTime = TestClock.nowMicros().instant()
val offset1 = TimestampOffset(startTime, Map("p1" -> 3L, "p2" -> 1L, "p3" -> 5L))
offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p2", 1L)).futureValue
offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p3", 5L)).futureValue
val offset1 = TimestampOffset(startTime, Map(p1 -> 3L, p2 -> 1L, p3 -> 5L))
offsetStore.saveOffset(OffsetPidSeqNr(offset1, p1, 3L)).futureValue
offsetStore.saveOffset(OffsetPidSeqNr(offset1, p2, 1L)).futureValue
offsetStore.saveOffset(OffsetPidSeqNr(offset1, p3, 5L)).futureValue

// seqNr 1 is always accepted
val env1 = createEnvelope("p4", 1L, startTime.plusMillis(1), "e4-1")
val env1 = createEnvelope(p4, 1L, startTime.plusMillis(1), "e4-1")
offsetStore.validate(env1).futureValue shouldBe Accepted
offsetStore.validate(backtrackingEnvelope(env1)).futureValue shouldBe Accepted
// but not if already inflight, seqNr 1 was accepted
offsetStore.addInflight(env1)
val env1Later = createEnvelope("p4", 1L, startTime.plusMillis(1), "e4-1")
val env1Later = createEnvelope(p4, 1L, startTime.plusMillis(1), "e4-1")
offsetStore.validate(env1Later).futureValue shouldBe Duplicate
offsetStore.validate(backtrackingEnvelope(env1Later)).futureValue shouldBe Duplicate
// subsequent seqNr is accepted
val env2 = createEnvelope("p4", 2L, startTime.plusMillis(2), "e4-2")
val env2 = createEnvelope(p4, 2L, startTime.plusMillis(2), "e4-2")
offsetStore.validate(env2).futureValue shouldBe Accepted
offsetStore.validate(backtrackingEnvelope(env2)).futureValue shouldBe Accepted
offsetStore.addInflight(env2)
// but not when gap
val envP4SeqNr4 = createEnvelope("p4", 4L, startTime.plusMillis(3), "e4-4")
val envP4SeqNr4 = createEnvelope(p4, 4L, startTime.plusMillis(3), "e4-4")
offsetStore.validate(envP4SeqNr4).futureValue shouldBe RejectedSeqNr
// hard reject when gap from backtracking
offsetStore.validate(backtrackingEnvelope(envP4SeqNr4)).futureValue shouldBe RejectedBacktrackingSeqNr
Expand All @@ -490,17 +492,17 @@ class R2dbcTimestampOffsetStoreSpec
.validate(backtrackingEnvelope(filteredEnvelope(envP4SeqNr4)))
.futureValue shouldBe RejectedBacktrackingSeqNr
// and not if later already inflight, seqNr 2 was accepted
offsetStore.validate(createEnvelope("p4", 1L, startTime.plusMillis(1), "e4-1")).futureValue shouldBe Duplicate
offsetStore.validate(createEnvelope(p4, 1L, startTime.plusMillis(1), "e4-1")).futureValue shouldBe Duplicate

// +1 to known is accepted
val env3 = createEnvelope("p1", 4L, startTime.plusMillis(4), "e1-4")
val env3 = createEnvelope(p1, 4L, startTime.plusMillis(4), "e1-4")
offsetStore.validate(env3).futureValue shouldBe Accepted
// but not same
offsetStore.validate(createEnvelope("p3", 5L, startTime, "e3-5")).futureValue shouldBe Duplicate
offsetStore.validate(createEnvelope(p3, 5L, startTime, "e3-5")).futureValue shouldBe Duplicate
// but not same, even if it's 1
offsetStore.validate(createEnvelope("p2", 1L, startTime, "e2-1")).futureValue shouldBe Duplicate
offsetStore.validate(createEnvelope(p2, 1L, startTime, "e2-1")).futureValue shouldBe Duplicate
// and not less
offsetStore.validate(createEnvelope("p3", 4L, startTime, "e3-4")).futureValue shouldBe Duplicate
offsetStore.validate(createEnvelope(p3, 4L, startTime, "e3-4")).futureValue shouldBe Duplicate
offsetStore.addInflight(env3)
// and then it's not accepted again
offsetStore.validate(env3).futureValue shouldBe Duplicate
Expand All @@ -510,33 +512,33 @@ class R2dbcTimestampOffsetStoreSpec
offsetStore.validate(backtrackingEnvelope(env2)).futureValue shouldBe Duplicate

// +1 to known, and then also subsequent are accepted (needed for grouped)
val env4 = createEnvelope("p3", 6L, startTime.plusMillis(5), "e3-6")
val env4 = createEnvelope(p3, 6L, startTime.plusMillis(5), "e3-6")
offsetStore.validate(env4).futureValue shouldBe Accepted
offsetStore.addInflight(env4)
val env5 = createEnvelope("p3", 7L, startTime.plusMillis(6), "e3-7")
val env5 = createEnvelope(p3, 7L, startTime.plusMillis(6), "e3-7")
offsetStore.validate(env5).futureValue shouldBe Accepted
offsetStore.addInflight(env5)
val env6 = createEnvelope("p3", 8L, startTime.plusMillis(7), "e3-8")
val env6 = createEnvelope(p3, 8L, startTime.plusMillis(7), "e3-8")
offsetStore.validate(env6).futureValue shouldBe Accepted
offsetStore.addInflight(env6)

// reject unknown
val env7 = createEnvelope("p5", 7L, startTime.plusMillis(8), "e5-7")
val env7 = createEnvelope(p5, 7L, startTime.plusMillis(8), "e5-7")
offsetStore.validate(env7).futureValue shouldBe RejectedSeqNr
offsetStore.validate(backtrackingEnvelope(env7)).futureValue shouldBe RejectedBacktrackingSeqNr
// but ok when previous is old
eventTimestampQueryClock.setInstant(startTime.minusSeconds(3600))
val env8 = createEnvelope("p5", 7L, startTime.plusMillis(5), "e5-7")
// but ok when previous is old (offset has been deleted but slice is known)
eventTimestampQueryClock.setInstant(startTime.minus(settings.deleteAfter.plusSeconds(1)))
val env8 = createEnvelope(p5, 7L, startTime.plusMillis(5), "e5-7")
offsetStore.validate(env8).futureValue shouldBe Accepted
eventTimestampQueryClock.setInstant(startTime)
offsetStore.addInflight(env8)
// and subsequent seqNr is accepted
val env9 = createEnvelope("p5", 8L, startTime.plusMillis(9), "e5-8")
val env9 = createEnvelope(p5, 8L, startTime.plusMillis(9), "e5-8")
offsetStore.validate(env9).futureValue shouldBe Accepted
offsetStore.addInflight(env9)

// reject unknown filtered
val env10 = filteredEnvelope(createEnvelope("p6", 7L, startTime.plusMillis(10), "e6-7"))
val env10 = filteredEnvelope(createEnvelope(p6, 7L, startTime.plusMillis(10), "e6-7"))
offsetStore.validate(env10).futureValue shouldBe RejectedSeqNr
// hard reject when unknown from backtracking
offsetStore.validate(backtrackingEnvelope(env10)).futureValue shouldBe RejectedBacktrackingSeqNr
Expand All @@ -546,15 +548,15 @@ class R2dbcTimestampOffsetStoreSpec
.futureValue shouldBe RejectedBacktrackingSeqNr

// it's keeping the inflight that are not in the "stored" state
offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8L, "p4" -> 2L, "p5" -> 8L)
offsetStore.getInflight() shouldBe Map(p1 -> 4L, p3 -> 8L, p4 -> 2L, p5 -> 8L)
// and they are removed from inflight once they have been stored
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(2), Map("p4" -> 2L)), "p4", 2L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(2), Map(p4 -> 2L)), p4, 2L))
.futureValue
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(9), Map("p5" -> 8L)), "p5", 8L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(9), Map(p5 -> 8L)), p5, 8L))
.futureValue
offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8L)
offsetStore.getInflight() shouldBe Map(p1 -> 4L, p3 -> 8L)
}

"accept via loading of previous seqNr" in {
Expand Down Expand Up @@ -1569,35 +1571,57 @@ class R2dbcTimestampOffsetStoreSpec

val p1 = "p-0960" // slice 576
val p2 = "p-6009" // slice 640
val p3 = "p-3039" // slice 832
val p3 = "p-7219" // slice 640
val p4 = "p-3039" // slice 832

val t0 = clock.instant().minusSeconds(100)
def time(step: Int) = t0.plusSeconds(step)

// starting with 2 projections, testing 512-1023
offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time(2), Map(p1 -> 1L)), p1, 1L)).futureValue
offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time(100), Map(p3 -> 1L)), p3, 1L)).futureValue
offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time(100), Map(p4 -> 1L)), p4, 1L)).futureValue

// scaled up to 4 projections, testing 512-767
val startOffset2 = TimestampOffset.toTimestampOffset(offsetStore2.readOffset().futureValue.get)
startOffset2.timestamp shouldBe time(2)
offsetStore2.getState().startTimestamp shouldBe time(2)
val latestTime = time(10)
offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(latestTime, Map(p1 -> 2L)), p1, 2L)).futureValue
offsetStore2.getState().latestTimestamp shouldBe latestTime

// clock is used by TestTimestampSourceProvider.timestampOf for timestamp of previous seqNr.
// rejected if timestamp of previous seqNr is after start timestamp minus backtracking window
clock.setInstant(startOffset2.timestamp.minus(settings.backtrackingWindow.minusSeconds(1)))
// note: clock is used by TestTimestampSourceProvider.timestampOf for timestamp of previous seqNr

// when slice is not tracked: then always reject

// rejected if timestamp of previous seqNr is after delete-until timestamp for latest
clock.setInstant(latestTime.minus(settings.deleteAfter.minusSeconds(1)))
offsetStore2
.validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
.futureValue shouldBe RejectedBacktrackingSeqNr
// accepted if timestamp of previous seqNr is before start timestamp minus backtracking window
clock.setInstant(startOffset2.timestamp.minus(settings.deleteAfter.plusSeconds(1)))

// still rejected if timestamp of previous seqNr is before delete-until timestamp for latest (different slice)
clock.setInstant(latestTime.minus(settings.deleteAfter.plusSeconds(1)))
offsetStore2
.validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
.futureValue shouldBe Accepted
.futureValue shouldBe RejectedBacktrackingSeqNr

// add an offset for the slice under test
val latestTime2 = time(20)
offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(latestTime2, Map(p3 -> 1L)), p3, 1L)).futureValue
offsetStore2.getState().latestTimestamp shouldBe latestTime2

// when slice is tracked: use deletion window for this slice

// rejected if timestamp of previous seqNr is after delete-until timestamp for this slice
clock.setInstant(latestTime2.minus(settings.deleteAfter.minusSeconds(1)))
offsetStore2
.validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
.futureValue shouldBe RejectedBacktrackingSeqNr

// accepted if timestamp of previous seqNr is before delete-until timestamp for this slice
clock.setInstant(latestTime2.minus(settings.deleteAfter.plusSeconds(1)))
offsetStore2
.validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
.futureValue shouldBe Accepted
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,23 @@

package akka.projection.r2dbc.internal

import java.lang.Boolean.FALSE
import java.lang.Boolean.TRUE
import java.time.Clock
import java.time.Instant
import java.time.{ Duration => JDuration }
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReference

import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.immutable.TreeSet
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import akka.Done
import akka.actor.Cancellable
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.persistence.Persistence
Expand All @@ -24,26 +40,10 @@ import akka.projection.internal.ManagementState
import akka.projection.internal.OffsetSerialization
import akka.projection.internal.OffsetSerialization.MultipleOffsets
import akka.projection.r2dbc.R2dbcProjectionSettings
import io.r2dbc.spi.Connection
import org.slf4j.LoggerFactory
import java.time.Clock
import java.time.Instant
import java.time.{ Duration => JDuration }
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReference
import java.lang.Boolean.FALSE
import java.lang.Boolean.TRUE

import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.immutable.TreeSet
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import akka.actor.Cancellable
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import io.r2dbc.spi.Connection
import org.slf4j.LoggerFactory

/**
* INTERNAL API
Expand Down Expand Up @@ -79,15 +79,15 @@ private[projection] object R2dbcOffsetStore {
final case class RecordWithProjectionKey(record: Record, projectionKey: String)

object State {
val empty: State = State(Map.empty, Map.empty, Instant.EPOCH)
val empty: State = State(Map.empty, Map.empty)

def apply(records: immutable.IndexedSeq[Record]): State = {
if (records.isEmpty) empty
else empty.add(records)
}
}

final case class State(byPid: Map[Pid, Record], bySliceSorted: Map[Int, TreeSet[Record]], startTimestamp: Instant) {
final case class State(byPid: Map[Pid, Record], bySliceSorted: Map[Int, TreeSet[Record]]) {

def size: Int = byPid.size

Expand Down Expand Up @@ -440,13 +440,7 @@ private[projection] class R2dbcOffsetStore(
newState.latestTimestamp,
startOffset)

val startTimestamp = startOffset match {
case None => clock.instant()
case Some(offset) => offset.timestamp
}
val newStateWithStartOffset = newState.copy(startTimestamp = startTimestamp)

if (!state.compareAndSet(oldState, newStateWithStartOffset))
if (!state.compareAndSet(oldState, newState))
throw new IllegalStateException("Unexpected concurrent modification of state from readOffset.")

startOffset
Expand Down Expand Up @@ -848,24 +842,26 @@ private[projection] class R2dbcOffsetStore(
import Validation._
val pid = recordWithOffset.record.pid
val seqNr = recordWithOffset.record.seqNr
val slice = recordWithOffset.record.slice

// Haven't see seen this pid within the time window. Since events can be missed
// when read at the tail we will only accept it if the event with previous seqNr has timestamp
// before the startTimestamp minus backtracking window
// Haven't seen this pid in the time window (or lazy loaded from the database).
// Only accept it if the event with previous seqNr is outside the deletion window (for tracked slices).
timestampOf(pid, seqNr - 1).map {
case Some(previousTimestamp) =>
val acceptBefore = currentState.startTimestamp.minus(settings.backtrackingWindow)
val acceptBefore = currentState.bySliceSorted.get(slice).map { bySliceSorted =>
bySliceSorted.last.timestamp.minus(settings.deleteAfter)
}

if (previousTimestamp.isBefore(acceptBefore)) {
if (acceptBefore.exists(timestamp => previousTimestamp.isBefore(timestamp))) {
logger.debug(
"{} Accepting envelope with pid [{}], seqNr [{}], where previous event timestamp [{}] " +
"is before start timestamp [{}] minus backtracking window [{}].",
"is before deletion window timestamp [{}] for slice [{}].",
logPrefix,
pid,
seqNr,
previousTimestamp,
currentState.startTimestamp,
settings.backtrackingWindow)
acceptBefore.fold("none")(_.toString),
slice)
Accepted
} else if (recordWithOffset.fromPubSub) {
// Rejected will trigger replay of missed events, if replay-on-rejected-sequence-numbers is enabled
Expand All @@ -882,14 +878,14 @@ private[projection] class R2dbcOffsetStore(
// and SourceProvider supports it.
logger.warn(
"{} Rejecting unknown sequence number [{}] for pid [{}]. Offset: {}, where previous event timestamp [{}] " +
"is after start timestamp [{}] minus backtracking window [{}].",
"is after deletion window timestamp [{}] for slice [{}].",
logPrefix,
seqNr,
pid,
recordWithOffset.offset,
previousTimestamp,
currentState.startTimestamp,
settings.backtrackingWindow)
acceptBefore.fold("none")(_.toString),
slice)
RejectedBacktrackingSeqNr
} else {
// This may happen rather frequently when using `publish-events`, after reconnecting and such.
Expand Down

0 comments on commit b366cc6

Please sign in to comment.