From b366cc6a9aff6c7042b6030ba2ec1b4b16f65a0d Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Tue, 14 Jan 2025 11:42:40 +1300 Subject: [PATCH] fix(r2dbc): avoid unnecessary rejection given deleted offsets (#1293) * use deletion window for timestamp validation * remove startTimestamp from offset store state --- .../internal/DynamoDBOffsetStore.scala | 2 +- .../r2dbc/EventSourcedEndToEndSpec.scala | 12 +-- .../r2dbc/R2dbcTimestampOffsetStoreSpec.scala | 102 +++++++++++------- .../r2dbc/internal/R2dbcOffsetStore.scala | 72 ++++++------- 4 files changed, 104 insertions(+), 84 deletions(-) diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala index b09aa3dca..a44a3ac9e 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala @@ -650,7 +650,7 @@ private[projection] class DynamoDBOffsetStore( val seqNr = recordWithOffset.record.seqNr val slice = recordWithOffset.record.slice - // Haven't see seen this pid within the time window. Since events can be missed + // Haven't seen this pid within the time window. Since events can be missed // when read at the tail we will only accept it if the event with previous seqNr has timestamp // before the startTimestamp minus backtracking window timestampOf(pid, seqNr - 1).map { diff --git a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/EventSourcedEndToEndSpec.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/EventSourcedEndToEndSpec.scala index be0545893..41d6930a9 100644 --- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/EventSourcedEndToEndSpec.scala +++ b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/EventSourcedEndToEndSpec.scala @@ -288,13 +288,13 @@ class EventSourcedEndToEndSpec } "accept unknown sequence number if previous is old" in { - val entityType = nextEntityType() - val pid1 = nextPid(entityType) - val pid2 = nextPid(entityType) - val pid3 = nextPid(entityType) + val entityType = "test-002" + val pid1 = s"$entityType|p-08071" // slice 992 + val pid2 = s"$entityType|p-08192" // slice 992 + val pid3 = s"$entityType|p-09160" // slice 992 val startTime = TestClock.nowMicros().instant() - val oldTime = startTime.minus(projectionSettings.timeWindow).minusSeconds(60) + val oldTime = startTime.minus(projectionSettings.deleteAfter).minusSeconds(60) writeEvent(pid1, 1L, startTime, "e1-1") val projectionName = UUID.randomUUID().toString @@ -306,7 +306,7 @@ class EventSourcedEndToEndSpec // old event for pid2, seqN3. will not be picked up by backtracking because outside time window writeEvent(pid2, 3L, oldTime, "e2-3") // pid2, seqNr 3 is unknown when receiving 4 so will lookup timestamp of 3 - // and accept 4 because 3 was older than time window + // and accept 4 because 3 was older than the deletion window (for tracked slice) writeEvent(pid2, 4L, startTime.plusMillis(1), "e2-4") processedProbe.receiveMessage().envelope.event shouldBe "e2-4" diff --git a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala index dd1b6927e..64acdcc40 100644 --- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala +++ b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala @@ -453,33 +453,35 @@ class R2dbcTimestampOffsetStoreSpec val eventTimestampQueryClock = TestClock.nowMicros() val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock) - // some validation require the startTimestamp, which is set from readOffset - offsetStore.getState().startTimestamp shouldBe Instant.EPOCH - offsetStore.readOffset().futureValue - offsetStore.getState().startTimestamp shouldBe clock.instant() + val p1 = "p-08071" // slice 101 + val p2 = "p-08072" // slice 102 + val p3 = "p-08073" // slice 103 + val p4 = "p-08074" // slice 104 + val p5 = "p-08192" // slice 101 (same as p1) + val p6 = "p-08076" // slice 106 val startTime = TestClock.nowMicros().instant() - val offset1 = TimestampOffset(startTime, Map("p1" -> 3L, "p2" -> 1L, "p3" -> 5L)) - offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue - offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p2", 1L)).futureValue - offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p3", 5L)).futureValue + val offset1 = TimestampOffset(startTime, Map(p1 -> 3L, p2 -> 1L, p3 -> 5L)) + offsetStore.saveOffset(OffsetPidSeqNr(offset1, p1, 3L)).futureValue + offsetStore.saveOffset(OffsetPidSeqNr(offset1, p2, 1L)).futureValue + offsetStore.saveOffset(OffsetPidSeqNr(offset1, p3, 5L)).futureValue // seqNr 1 is always accepted - val env1 = createEnvelope("p4", 1L, startTime.plusMillis(1), "e4-1") + val env1 = createEnvelope(p4, 1L, startTime.plusMillis(1), "e4-1") offsetStore.validate(env1).futureValue shouldBe Accepted offsetStore.validate(backtrackingEnvelope(env1)).futureValue shouldBe Accepted // but not if already inflight, seqNr 1 was accepted offsetStore.addInflight(env1) - val env1Later = createEnvelope("p4", 1L, startTime.plusMillis(1), "e4-1") + val env1Later = createEnvelope(p4, 1L, startTime.plusMillis(1), "e4-1") offsetStore.validate(env1Later).futureValue shouldBe Duplicate offsetStore.validate(backtrackingEnvelope(env1Later)).futureValue shouldBe Duplicate // subsequent seqNr is accepted - val env2 = createEnvelope("p4", 2L, startTime.plusMillis(2), "e4-2") + val env2 = createEnvelope(p4, 2L, startTime.plusMillis(2), "e4-2") offsetStore.validate(env2).futureValue shouldBe Accepted offsetStore.validate(backtrackingEnvelope(env2)).futureValue shouldBe Accepted offsetStore.addInflight(env2) // but not when gap - val envP4SeqNr4 = createEnvelope("p4", 4L, startTime.plusMillis(3), "e4-4") + val envP4SeqNr4 = createEnvelope(p4, 4L, startTime.plusMillis(3), "e4-4") offsetStore.validate(envP4SeqNr4).futureValue shouldBe RejectedSeqNr // hard reject when gap from backtracking offsetStore.validate(backtrackingEnvelope(envP4SeqNr4)).futureValue shouldBe RejectedBacktrackingSeqNr @@ -490,17 +492,17 @@ class R2dbcTimestampOffsetStoreSpec .validate(backtrackingEnvelope(filteredEnvelope(envP4SeqNr4))) .futureValue shouldBe RejectedBacktrackingSeqNr // and not if later already inflight, seqNr 2 was accepted - offsetStore.validate(createEnvelope("p4", 1L, startTime.plusMillis(1), "e4-1")).futureValue shouldBe Duplicate + offsetStore.validate(createEnvelope(p4, 1L, startTime.plusMillis(1), "e4-1")).futureValue shouldBe Duplicate // +1 to known is accepted - val env3 = createEnvelope("p1", 4L, startTime.plusMillis(4), "e1-4") + val env3 = createEnvelope(p1, 4L, startTime.plusMillis(4), "e1-4") offsetStore.validate(env3).futureValue shouldBe Accepted // but not same - offsetStore.validate(createEnvelope("p3", 5L, startTime, "e3-5")).futureValue shouldBe Duplicate + offsetStore.validate(createEnvelope(p3, 5L, startTime, "e3-5")).futureValue shouldBe Duplicate // but not same, even if it's 1 - offsetStore.validate(createEnvelope("p2", 1L, startTime, "e2-1")).futureValue shouldBe Duplicate + offsetStore.validate(createEnvelope(p2, 1L, startTime, "e2-1")).futureValue shouldBe Duplicate // and not less - offsetStore.validate(createEnvelope("p3", 4L, startTime, "e3-4")).futureValue shouldBe Duplicate + offsetStore.validate(createEnvelope(p3, 4L, startTime, "e3-4")).futureValue shouldBe Duplicate offsetStore.addInflight(env3) // and then it's not accepted again offsetStore.validate(env3).futureValue shouldBe Duplicate @@ -510,33 +512,33 @@ class R2dbcTimestampOffsetStoreSpec offsetStore.validate(backtrackingEnvelope(env2)).futureValue shouldBe Duplicate // +1 to known, and then also subsequent are accepted (needed for grouped) - val env4 = createEnvelope("p3", 6L, startTime.plusMillis(5), "e3-6") + val env4 = createEnvelope(p3, 6L, startTime.plusMillis(5), "e3-6") offsetStore.validate(env4).futureValue shouldBe Accepted offsetStore.addInflight(env4) - val env5 = createEnvelope("p3", 7L, startTime.plusMillis(6), "e3-7") + val env5 = createEnvelope(p3, 7L, startTime.plusMillis(6), "e3-7") offsetStore.validate(env5).futureValue shouldBe Accepted offsetStore.addInflight(env5) - val env6 = createEnvelope("p3", 8L, startTime.plusMillis(7), "e3-8") + val env6 = createEnvelope(p3, 8L, startTime.plusMillis(7), "e3-8") offsetStore.validate(env6).futureValue shouldBe Accepted offsetStore.addInflight(env6) // reject unknown - val env7 = createEnvelope("p5", 7L, startTime.plusMillis(8), "e5-7") + val env7 = createEnvelope(p5, 7L, startTime.plusMillis(8), "e5-7") offsetStore.validate(env7).futureValue shouldBe RejectedSeqNr offsetStore.validate(backtrackingEnvelope(env7)).futureValue shouldBe RejectedBacktrackingSeqNr - // but ok when previous is old - eventTimestampQueryClock.setInstant(startTime.minusSeconds(3600)) - val env8 = createEnvelope("p5", 7L, startTime.plusMillis(5), "e5-7") + // but ok when previous is old (offset has been deleted but slice is known) + eventTimestampQueryClock.setInstant(startTime.minus(settings.deleteAfter.plusSeconds(1))) + val env8 = createEnvelope(p5, 7L, startTime.plusMillis(5), "e5-7") offsetStore.validate(env8).futureValue shouldBe Accepted eventTimestampQueryClock.setInstant(startTime) offsetStore.addInflight(env8) // and subsequent seqNr is accepted - val env9 = createEnvelope("p5", 8L, startTime.plusMillis(9), "e5-8") + val env9 = createEnvelope(p5, 8L, startTime.plusMillis(9), "e5-8") offsetStore.validate(env9).futureValue shouldBe Accepted offsetStore.addInflight(env9) // reject unknown filtered - val env10 = filteredEnvelope(createEnvelope("p6", 7L, startTime.plusMillis(10), "e6-7")) + val env10 = filteredEnvelope(createEnvelope(p6, 7L, startTime.plusMillis(10), "e6-7")) offsetStore.validate(env10).futureValue shouldBe RejectedSeqNr // hard reject when unknown from backtracking offsetStore.validate(backtrackingEnvelope(env10)).futureValue shouldBe RejectedBacktrackingSeqNr @@ -546,15 +548,15 @@ class R2dbcTimestampOffsetStoreSpec .futureValue shouldBe RejectedBacktrackingSeqNr // it's keeping the inflight that are not in the "stored" state - offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8L, "p4" -> 2L, "p5" -> 8L) + offsetStore.getInflight() shouldBe Map(p1 -> 4L, p3 -> 8L, p4 -> 2L, p5 -> 8L) // and they are removed from inflight once they have been stored offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(2), Map("p4" -> 2L)), "p4", 2L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(2), Map(p4 -> 2L)), p4, 2L)) .futureValue offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(9), Map("p5" -> 8L)), "p5", 8L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(9), Map(p5 -> 8L)), p5, 8L)) .futureValue - offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8L) + offsetStore.getInflight() shouldBe Map(p1 -> 4L, p3 -> 8L) } "accept via loading of previous seqNr" in { @@ -1569,35 +1571,57 @@ class R2dbcTimestampOffsetStoreSpec val p1 = "p-0960" // slice 576 val p2 = "p-6009" // slice 640 - val p3 = "p-3039" // slice 832 + val p3 = "p-7219" // slice 640 + val p4 = "p-3039" // slice 832 val t0 = clock.instant().minusSeconds(100) def time(step: Int) = t0.plusSeconds(step) // starting with 2 projections, testing 512-1023 offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time(2), Map(p1 -> 1L)), p1, 1L)).futureValue - offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time(100), Map(p3 -> 1L)), p3, 1L)).futureValue + offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time(100), Map(p4 -> 1L)), p4, 1L)).futureValue // scaled up to 4 projections, testing 512-767 val startOffset2 = TimestampOffset.toTimestampOffset(offsetStore2.readOffset().futureValue.get) startOffset2.timestamp shouldBe time(2) - offsetStore2.getState().startTimestamp shouldBe time(2) val latestTime = time(10) offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(latestTime, Map(p1 -> 2L)), p1, 2L)).futureValue offsetStore2.getState().latestTimestamp shouldBe latestTime - // clock is used by TestTimestampSourceProvider.timestampOf for timestamp of previous seqNr. - // rejected if timestamp of previous seqNr is after start timestamp minus backtracking window - clock.setInstant(startOffset2.timestamp.minus(settings.backtrackingWindow.minusSeconds(1))) + // note: clock is used by TestTimestampSourceProvider.timestampOf for timestamp of previous seqNr + + // when slice is not tracked: then always reject + + // rejected if timestamp of previous seqNr is after delete-until timestamp for latest + clock.setInstant(latestTime.minus(settings.deleteAfter.minusSeconds(1))) offsetStore2 .validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4"))) .futureValue shouldBe RejectedBacktrackingSeqNr - // accepted if timestamp of previous seqNr is before start timestamp minus backtracking window - clock.setInstant(startOffset2.timestamp.minus(settings.deleteAfter.plusSeconds(1))) + + // still rejected if timestamp of previous seqNr is before delete-until timestamp for latest (different slice) + clock.setInstant(latestTime.minus(settings.deleteAfter.plusSeconds(1))) offsetStore2 .validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4"))) - .futureValue shouldBe Accepted + .futureValue shouldBe RejectedBacktrackingSeqNr + + // add an offset for the slice under test + val latestTime2 = time(20) + offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(latestTime2, Map(p3 -> 1L)), p3, 1L)).futureValue + offsetStore2.getState().latestTimestamp shouldBe latestTime2 + // when slice is tracked: use deletion window for this slice + + // rejected if timestamp of previous seqNr is after delete-until timestamp for this slice + clock.setInstant(latestTime2.minus(settings.deleteAfter.minusSeconds(1))) + offsetStore2 + .validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4"))) + .futureValue shouldBe RejectedBacktrackingSeqNr + + // accepted if timestamp of previous seqNr is before delete-until timestamp for this slice + clock.setInstant(latestTime2.minus(settings.deleteAfter.plusSeconds(1))) + offsetStore2 + .validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4"))) + .futureValue shouldBe Accepted } } diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala index fb568af06..294cb66be 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala @@ -4,7 +4,23 @@ package akka.projection.r2dbc.internal +import java.lang.Boolean.FALSE +import java.lang.Boolean.TRUE +import java.time.Clock +import java.time.Instant +import java.time.{ Duration => JDuration } +import java.util.UUID +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicReference + +import scala.annotation.tailrec +import scala.collection.immutable +import scala.collection.immutable.TreeSet +import scala.concurrent.ExecutionContext +import scala.concurrent.Future + import akka.Done +import akka.actor.Cancellable import akka.actor.typed.ActorSystem import akka.annotation.InternalApi import akka.persistence.Persistence @@ -24,26 +40,10 @@ import akka.projection.internal.ManagementState import akka.projection.internal.OffsetSerialization import akka.projection.internal.OffsetSerialization.MultipleOffsets import akka.projection.r2dbc.R2dbcProjectionSettings -import io.r2dbc.spi.Connection -import org.slf4j.LoggerFactory -import java.time.Clock -import java.time.Instant -import java.time.{ Duration => JDuration } -import java.util.UUID -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.atomic.AtomicReference -import java.lang.Boolean.FALSE -import java.lang.Boolean.TRUE - -import scala.annotation.tailrec -import scala.collection.immutable -import scala.collection.immutable.TreeSet -import scala.concurrent.ExecutionContext -import scala.concurrent.Future - -import akka.actor.Cancellable import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source +import io.r2dbc.spi.Connection +import org.slf4j.LoggerFactory /** * INTERNAL API @@ -79,7 +79,7 @@ private[projection] object R2dbcOffsetStore { final case class RecordWithProjectionKey(record: Record, projectionKey: String) object State { - val empty: State = State(Map.empty, Map.empty, Instant.EPOCH) + val empty: State = State(Map.empty, Map.empty) def apply(records: immutable.IndexedSeq[Record]): State = { if (records.isEmpty) empty @@ -87,7 +87,7 @@ private[projection] object R2dbcOffsetStore { } } - final case class State(byPid: Map[Pid, Record], bySliceSorted: Map[Int, TreeSet[Record]], startTimestamp: Instant) { + final case class State(byPid: Map[Pid, Record], bySliceSorted: Map[Int, TreeSet[Record]]) { def size: Int = byPid.size @@ -440,13 +440,7 @@ private[projection] class R2dbcOffsetStore( newState.latestTimestamp, startOffset) - val startTimestamp = startOffset match { - case None => clock.instant() - case Some(offset) => offset.timestamp - } - val newStateWithStartOffset = newState.copy(startTimestamp = startTimestamp) - - if (!state.compareAndSet(oldState, newStateWithStartOffset)) + if (!state.compareAndSet(oldState, newState)) throw new IllegalStateException("Unexpected concurrent modification of state from readOffset.") startOffset @@ -848,24 +842,26 @@ private[projection] class R2dbcOffsetStore( import Validation._ val pid = recordWithOffset.record.pid val seqNr = recordWithOffset.record.seqNr + val slice = recordWithOffset.record.slice - // Haven't see seen this pid within the time window. Since events can be missed - // when read at the tail we will only accept it if the event with previous seqNr has timestamp - // before the startTimestamp minus backtracking window + // Haven't seen this pid in the time window (or lazy loaded from the database). + // Only accept it if the event with previous seqNr is outside the deletion window (for tracked slices). timestampOf(pid, seqNr - 1).map { case Some(previousTimestamp) => - val acceptBefore = currentState.startTimestamp.minus(settings.backtrackingWindow) + val acceptBefore = currentState.bySliceSorted.get(slice).map { bySliceSorted => + bySliceSorted.last.timestamp.minus(settings.deleteAfter) + } - if (previousTimestamp.isBefore(acceptBefore)) { + if (acceptBefore.exists(timestamp => previousTimestamp.isBefore(timestamp))) { logger.debug( "{} Accepting envelope with pid [{}], seqNr [{}], where previous event timestamp [{}] " + - "is before start timestamp [{}] minus backtracking window [{}].", + "is before deletion window timestamp [{}] for slice [{}].", logPrefix, pid, seqNr, previousTimestamp, - currentState.startTimestamp, - settings.backtrackingWindow) + acceptBefore.fold("none")(_.toString), + slice) Accepted } else if (recordWithOffset.fromPubSub) { // Rejected will trigger replay of missed events, if replay-on-rejected-sequence-numbers is enabled @@ -882,14 +878,14 @@ private[projection] class R2dbcOffsetStore( // and SourceProvider supports it. logger.warn( "{} Rejecting unknown sequence number [{}] for pid [{}]. Offset: {}, where previous event timestamp [{}] " + - "is after start timestamp [{}] minus backtracking window [{}].", + "is after deletion window timestamp [{}] for slice [{}].", logPrefix, seqNr, pid, recordWithOffset.offset, previousTimestamp, - currentState.startTimestamp, - settings.backtrackingWindow) + acceptBefore.fold("none")(_.toString), + slice) RejectedBacktrackingSeqNr } else { // This may happen rather frequently when using `publish-events`, after reconnecting and such.