Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dynamodb): avoid unnecessary rejection given expired offsets #1295

Merged
merged 1 commit into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import java.util.UUID
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.jdk.DurationConverters._

import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
Expand Down Expand Up @@ -191,8 +192,7 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)

if (usingOffsetTTL) {
val expected = System.currentTimeMillis / 1000 + 1.hour.toSeconds
val timestampOffsetExpiry = timestampOffsetItem.get(OffsetStoreAttributes.Expiry).value.n.toLong
timestampOffsetExpiry should (be <= expected and be > expected - 10) // within 10 seconds
timestampOffsetItem.get(OffsetStoreAttributes.Expiry) shouldBe None // no expiry set on latest-by-slice
val seqNrOffsetExpiry = seqNrOffsetItem.get(OffsetStoreAttributes.Expiry).value.n.toLong
seqNrOffsetExpiry should (be <= expected and be > expected - 10) // within 10 seconds
} else {
Expand Down Expand Up @@ -352,8 +352,7 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)

if (usingOffsetTTL) {
val expected = System.currentTimeMillis / 1000 + 1.hour.toSeconds
val timestampOffsetExpiry = timestampOffsetItem.get(OffsetStoreAttributes.Expiry).value.n.toLong
timestampOffsetExpiry should (be <= expected and be > expected - 10) // within 10 seconds
timestampOffsetItem.get(OffsetStoreAttributes.Expiry) shouldBe None // no expiry set on latest-by-slice
val seqNrOffsetExpiry = seqNrOffsetItem.get(OffsetStoreAttributes.Expiry).value.n.toLong
seqNrOffsetExpiry should (be <= expected and be > expected - 10) // within 10 seconds
} else {
Expand Down Expand Up @@ -558,15 +557,14 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)

"accept known sequence numbers and reject unknown" in {
val projectionId = genRandomProjectionId()
val offsetStoreClock = TestClock.nowMicros()
val eventTimestampQueryClock = TestClock.nowMicros()
val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock)
val offsetStore = createOffsetStore(
projectionId,
offsetStoreClock = offsetStoreClock,
eventTimestampQueryClock = eventTimestampQueryClock)

// some validation require the startTimestamp, which is set from readOffset
offsetStore.getState().startTimestampBySlice.size shouldBe 0
offsetStore.readOffset().futureValue
offsetStore.getState().startTimestampBySlice.values.toSet shouldBe Set(clock.instant())

val startTime = TestClock.nowMicros().instant()
val startTime = offsetStoreClock.instant()
val offset1 = TimestampOffset(startTime, Map("p1" -> 3L, "p2" -> 1L, "p3" -> 5L))
offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p2", 1L)).futureValue
Expand Down Expand Up @@ -632,16 +630,20 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
val env7 = createEnvelope("p5", 7L, startTime.plusMillis(8), "e5-7")
offsetStore.validate(env7).futureValue shouldBe RejectedSeqNr
offsetStore.validate(backtrackingEnvelope(env7)).futureValue shouldBe RejectedBacktrackingSeqNr
// but ok when previous is old
eventTimestampQueryClock.setInstant(startTime.minusSeconds(3600))
val env8 = createEnvelope("p5", 7L, startTime.plusMillis(5), "e5-7")
offsetStore.validate(env8).futureValue shouldBe Accepted
eventTimestampQueryClock.setInstant(startTime)
offsetStore.addInflight(env8)
// and subsequent seqNr is accepted
val env9 = createEnvelope("p5", 8L, startTime.plusMillis(9), "e5-8")
offsetStore.validate(env9).futureValue shouldBe Accepted
offsetStore.addInflight(env9)
if (usingOffsetTTL) {
// but ok when previous is older than expiry window
val now = offsetStoreClock.tick(JDuration.ofSeconds(10))
val offsetExpiry = settings.timeToLiveSettings.projections.get(projectionId.name).offsetTimeToLive.value
eventTimestampQueryClock.withInstant(now.minusSeconds(offsetExpiry.toSeconds + 1)) {
val env8 = createEnvelope("p5", 7L, startTime.plusMillis(5), "e5-7")
offsetStore.validate(env8).futureValue shouldBe Accepted
offsetStore.addInflight(env8)
}
// and subsequent seqNr is accepted
val env9 = createEnvelope("p5", 8L, startTime.plusMillis(9), "e5-8")
offsetStore.validate(env9).futureValue shouldBe Accepted
offsetStore.addInflight(env9)
}

// reject unknown filtered
val env10 = filteredEnvelope(createEnvelope("p6", 7L, startTime.plusMillis(10), "e6-7"))
Expand All @@ -654,14 +656,20 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
.futureValue shouldBe RejectedBacktrackingSeqNr

// it's keeping the inflight that are not in the "stored" state
offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8L, "p4" -> 2L, "p5" -> 8L)
if (usingOffsetTTL) {
offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8L, "p4" -> 2L, "p5" -> 8L)
} else {
offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8L, "p4" -> 2L)
}
// and they are removed from inflight once they have been stored
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(2), Map("p4" -> 2L)), "p4", 2L))
.futureValue
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(9), Map("p5" -> 8L)), "p5", 8L))
.futureValue
if (usingOffsetTTL) {
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(9), Map("p5" -> 8L)), "p5", 8L))
.futureValue
}
offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8L)
}

Expand Down Expand Up @@ -1045,14 +1053,17 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)

val projectionName = UUID.randomUUID().toString

val offsetStoreClock = TestClock.nowMicros()
val eventTimestampQueryClock = TestClock.nowMicros()

def offsetStore(minSlice: Int, maxSlice: Int) =
new DynamoDBOffsetStore(
ProjectionId(projectionName, s"$minSlice-$maxSlice"),
Some(new TestTimestampSourceProvider(minSlice, maxSlice, clock)),
Some(new TestTimestampSourceProvider(minSlice, maxSlice, eventTimestampQueryClock)),
system,
settings,
client,
clock)
offsetStoreClock)

// one projection at lower scale
val offsetStore1 = offsetStore(512, 1023)
Expand All @@ -1064,34 +1075,59 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
val p2 = "p-6009" // slice 640
val p3 = "p-3039" // slice 832

val t0 = clock.instant().minusSeconds(100)
val t0 = offsetStoreClock.instant().minusSeconds(100)
def time(step: Int) = t0.plusSeconds(step)

// starting with 2 projections, testing 512-1023
offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time(2), Map(p1 -> 1L)), p1, 1L)).futureValue
offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time(100), Map(p3 -> 1L)), p3, 1L)).futureValue

// scaled up to 4 projections, testing 512-767
offsetStore2.readOffset().futureValue
offsetStore2.getState().startTimestampBySlice(576) shouldBe time(2)
val slice640StartTimestamp = offsetStore2.getState().startTimestampBySlice(640)
slice640StartTimestamp shouldBe clock.instant()
val latestTime = time(10)
offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(latestTime, Map(p1 -> 2L)), p1, 2L)).futureValue
offsetStore2.getState().latestTimestamp shouldBe latestTime

// clock is used by TestTimestampSourceProvider.timestampOf for timestamp of previous seqNr.
// rejected if timestamp of previous seqNr is after start timestamp minus backtracking window
clock.setInstant(slice640StartTimestamp.minus(settings.backtrackingWindow.minusSeconds(1)))
offsetStore2
.validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
.futureValue shouldBe RejectedBacktrackingSeqNr
// accepted if timestamp of previous seqNr is before start timestamp minus backtracking window
clock.setInstant(slice640StartTimestamp.minus(settings.timeWindow.plusSeconds(1)))
offsetStore2
.validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
.futureValue shouldBe Accepted
// note: eventTimestampQueryClock is used by TestTimestampSourceProvider.timestampOf for timestamp of previous seqNr

if (usingOffsetTTL) {
// if offset TTL is configured, use expiry window (from now) to validate old timestamps

val now = offsetStoreClock.tick(JDuration.ofSeconds(10))
val offsetExpiry = settings.timeToLiveSettings.projections.get(projectionName).offsetTimeToLive.get.toJava

// rejected if timestamp of previous seqNr is after expiry timestamp for this slice
eventTimestampQueryClock.withInstant(now.minus(offsetExpiry.minusSeconds(1))) {
offsetStore2
.validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
.futureValue shouldBe RejectedBacktrackingSeqNr
}

// still rejected if timestamp of previous seqNr is before expiry timestamp for latest (slice not tracked)
eventTimestampQueryClock.withInstant(now.minus(offsetExpiry.plusSeconds(1))) {
offsetStore2
.validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
.futureValue shouldBe Accepted
}
} else {
// when no offset TTL expiry is configured, then always reject

val now = offsetStoreClock.tick(JDuration.ofSeconds(10))
val testOffsetExpiry = JDuration.ofHours(1)

// rejected if timestamp of previous seqNr is after possible expiry timestamp (but no TTL configured)
eventTimestampQueryClock.withInstant(now.minus(testOffsetExpiry.minusSeconds(1))) {
offsetStore2
.validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
.futureValue shouldBe RejectedBacktrackingSeqNr
}

// still rejected if timestamp of previous seqNr is before possible expiry timestamp (but no TTL configured)
eventTimestampQueryClock.withInstant(now.minus(testOffsetExpiry.plusSeconds(1))) {
offsetStore2
.validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
.futureValue shouldBe RejectedBacktrackingSeqNr
}
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ object TestClock {
def setInstant(newInstant: Instant): Unit =
_instant = newInstant.truncatedTo(resolution)

def withInstant[T](instant: Instant)(block: => T): T = {
val restore = _instant
try {
setInstant(instant)
block
} finally setInstant(restore)
}

/**
* Increase the clock with this duration (truncated to the resolution)
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# internal
ProblemFilters.exclude[Problem]("akka.projection.dynamodb.internal.DynamoDBOffsetStore#State*")
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import scala.annotation.tailrec
import scala.collection.immutable.TreeSet
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.jdk.DurationConverters._

import akka.Done
import akka.actor.typed.ActorSystem
Expand Down Expand Up @@ -70,21 +71,17 @@ private[projection] object DynamoDBOffsetStore {
fromSnapshot: Boolean)

object State {
val empty: State = State(Map.empty, Map.empty, Map.empty, Map.empty)

def apply(offsetBySlice: Map[Int, TimestampOffset], startTimestampBySlice: Map[Int, Instant]): State =
if (offsetBySlice.isEmpty && startTimestampBySlice.isEmpty)
empty
else
new State(Map.empty, Map.empty, offsetBySlice, startTimestampBySlice)
val empty: State = State(Map.empty, Map.empty, Map.empty)

def apply(offsetBySlice: Map[Int, TimestampOffset]): State =
if (offsetBySlice.isEmpty) empty
else new State(Map.empty, Map.empty, offsetBySlice)
}

final case class State(
byPid: Map[Pid, Record],
bySliceSorted: Map[Int, TreeSet[Record]],
offsetBySlice: Map[Int, TimestampOffset],
startTimestampBySlice: Map[Int, Instant]) {
offsetBySlice: Map[Int, TimestampOffset]) {

def size: Int = byPid.size

Expand Down Expand Up @@ -221,6 +218,9 @@ private[projection] class DynamoDBOffsetStore(

private val dao = new OffsetStoreDao(system, settings, projectionId, client)

private val offsetExpiry =
settings.timeToLiveSettings.projections.get(projectionId.name).offsetTimeToLive.map(_.toJava)

private[projection] implicit val executionContext: ExecutionContext = system.executionContext

// The OffsetStore instance is used by a single projectionId and there shouldn't be many concurrent
Expand Down Expand Up @@ -296,13 +296,7 @@ private[projection] class DynamoDBOffsetStore(
})

offsetBySliceFut.map { offsetBySlice =>
val now = clock.instant()
val startTimestampBySlice =
(minSlice to maxSlice).map { slice =>
slice -> offsetBySlice.get(slice).map(_.timestamp).getOrElse(now)
}.toMap

val newState = State(offsetBySlice, startTimestampBySlice)
val newState = State(offsetBySlice)

if (!state.compareAndSet(oldState, newState))
throw new IllegalStateException("Unexpected concurrent modification of state from readOffset.")
Expand Down Expand Up @@ -622,7 +616,7 @@ private[projection] class DynamoDBOffsetStore(
// always accept starting from snapshots when there was no previous event seen
FutureAccepted
} else {
validateEventTimestamp(currentState, recordWithOffset)
validateEventTimestamp(recordWithOffset)
}
} else {
// strictSeqNr == false is for durable state where each revision might not be visible
Expand All @@ -644,29 +638,28 @@ private[projection] class DynamoDBOffsetStore(
}
}

private def validateEventTimestamp(currentState: State, recordWithOffset: RecordWithOffset) = {
private def validateEventTimestamp(recordWithOffset: RecordWithOffset) = {
import Validation._
val pid = recordWithOffset.record.pid
val seqNr = recordWithOffset.record.seqNr
val slice = recordWithOffset.record.slice

// Haven't seen this pid within the time window. Since events can be missed
// when read at the tail we will only accept it if the event with previous seqNr has timestamp
// before the startTimestamp minus backtracking window
// Haven't seen this pid in the time window (or lazy loaded from the database).
// Only accept if the event with previous seqNr is outside the TTL expiry window (if configured).
timestampOf(pid, seqNr - 1).map {
case Some(previousTimestamp) =>
val acceptBefore =
currentState.startTimestampBySlice(slice).minus(settings.backtrackingWindow)
val acceptBefore = offsetExpiry.map { expiry =>
val now = clock.instant() // expiry is from when an offset was written, consider the window from now
now.minus(expiry)
}

if (previousTimestamp.isBefore(acceptBefore)) {
if (acceptBefore.exists(timestamp => previousTimestamp.isBefore(timestamp))) {
logger.debug(
"Accepting envelope with pid [{}], seqNr [{}], where previous event timestamp [{}] " +
"is before start timestamp [{}] minus backtracking window [{}].",
"is before TTL expiry window timestamp [{}].",
pid,
seqNr,
previousTimestamp,
currentState.startTimestampBySlice(slice),
settings.backtrackingWindow)
acceptBefore.fold("none")(_.toString))
Accepted
} else if (recordWithOffset.fromPubSub) {
logger.debug(
Expand All @@ -679,13 +672,12 @@ private[projection] class DynamoDBOffsetStore(
// This will result in projection restart (with normal configuration)
logger.warn(
"Rejecting unknown sequence number [{}] for pid [{}]. Offset: {}, where previous event timestamp [{}] " +
"is after start timestamp [{}] minus backtracking window [{}].",
"is after TTL expiry window timestamp [{}].",
seqNr,
pid,
recordWithOffset.offset,
previousTimestamp,
currentState.startTimestampBySlice(slice),
settings.backtrackingWindow)
acceptBefore.fold("none")(_.toString))
// Rejected will trigger replay of missed events, if replay-on-rejected-sequence-numbers is enabled
// and SourceProvider supports it.
RejectedBacktrackingSeqNr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,6 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
def storeTimestampOffsets(offsetsBySlice: Map[Int, TimestampOffset]): Future[Done] = {
import OffsetStoreDao.OffsetStoreAttributes._

val expiry = timeToLiveSettings.offsetTimeToLive.map { timeToLive =>
Instant.now().plusSeconds(timeToLive.toSeconds)
}

def writeBatch(offsetsBatch: IndexedSeq[(Int, TimestampOffset)]): Future[Done] = {
val writeItems =
offsetsBatch.map {
Expand All @@ -155,10 +151,6 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
}
attributes.put(Seen, AttributeValue.fromM(seen))

expiry.foreach { timestamp =>
attributes.put(Expiry, AttributeValue.fromN(timestamp.getEpochSecond.toString))
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When looking at this, noticed the expiry set on the separate latest-by-slice offsets. To keep things safer, have removed this expiry, so we always keep this for restarting a projection, while the per-pid offsets will be expired.

Makes sense. It's the per-pid that will grow.


WriteRequest.builder
.putRequest(
PutRequest
Expand Down