Skip to content

Commit

Permalink
fix: DynamoDB evict offsets per slice (#1247)
Browse files Browse the repository at this point in the history
* don't use evicted state when saving changed timestamp per slice
* evict time window for each slice
* remove keep-number-of-entries and evict-interval
  • Loading branch information
patriknw authored Nov 18, 2024
1 parent aa00f8c commit 997fb78
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 191 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package akka.projection.dynamodb

import java.time.{ Duration => JDuration }
import java.time.Instant

import akka.persistence.query.TimestampOffset
Expand Down Expand Up @@ -35,7 +36,6 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match
state1.byPid("p1").seqNr shouldBe 3L
state1.offsetBySlice(slice("p1")) shouldBe TimestampOffset(t0.plusMillis(2), Map("p1" -> 3L))
state1.latestTimestamp shouldBe t0.plusMillis(2)
state1.oldestTimestamp shouldBe t0

val state2 = state1.add(Vector(createRecord("p2", 2, t0.plusMillis(1))))
state2.byPid("p1").seqNr shouldBe 3L
Expand All @@ -44,7 +44,6 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match
state2.offsetBySlice(slice("p2")) shouldBe TimestampOffset(t0.plusMillis(1), Map("p2" -> 2L))
// latest not updated because timestamp of p2 was before latest
state2.latestTimestamp shouldBe t0.plusMillis(2)
state2.oldestTimestamp shouldBe t0

val state3 = state2.add(Vector(createRecord("p3", 10, t0.plusMillis(3))))
state3.byPid("p1").seqNr shouldBe 3L
Expand All @@ -54,7 +53,6 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match
slice("p3") should not be slice("p2")
state3.offsetBySlice(slice("p3")) shouldBe TimestampOffset(t0.plusMillis(3), Map("p3" -> 10L))
state3.latestTimestamp shouldBe t0.plusMillis(3)
state3.oldestTimestamp shouldBe t0

// same slice and same timestamp, keep both in seen
slice("p10084") shouldBe slice("p3")
Expand All @@ -63,37 +61,45 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match
}

"evict old" in {
// these pids have the same slice 645, otherwise it will keep one for each slice
val p1 = "p500"
val p2 = "p621"
val p3 = "p742"
val p4 = "p863"
val p5 = "p984"
val p1 = "p500" // slice 645
val p2 = "p621" // slice 645
val p3 = "p742" // slice 645
val p4 = "p863" // slice 645
val p5 = "p984" // slice 645
val p6 = "p92" // slice 905
val p7 = "p108" // slice 905

val t0 = TestClock.nowMillis().instant()
val state1 = State.empty
.add(
Vector(
createRecord(p1, 1, t0),
createRecord(p2, 2, t0.plusMillis(1)),
createRecord(p3, 3, t0.plusMillis(2)),
createRecord(p4, 4, t0.plusMillis(3)),
createRecord(p5, 5, t0.plusMillis(4))))
state1.oldestTimestamp shouldBe t0
createRecord(p1, 1, t0.plusMillis(1)),
createRecord(p2, 2, t0.plusMillis(2)),
createRecord(p3, 3, t0.plusMillis(3)),
createRecord(p4, 4, t0.plusMillis(4)),
createRecord(p6, 6, t0.plusMillis(6))))
state1.byPid
.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p1 -> 1L, p2 -> 2L, p3 -> 3L, p4 -> 4L, p5 -> 5L)

val state2 = state1.evict(t0.plusMillis(2), keepNumberOfEntries = 1)
state2.oldestTimestamp shouldBe t0.plusMillis(2)
state2.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p3 -> 3L, p4 -> 4L, p5 -> 5L)
.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p1 -> 1L, p2 -> 2L, p3 -> 3L, p4 -> 4L, p6 -> 6L)

// keep all
state1.evict(t0.plusMillis(2), keepNumberOfEntries = 100) shouldBe state1

// keep 4
val state3 = state1.evict(t0.plusMillis(2), keepNumberOfEntries = 4)
state3.oldestTimestamp shouldBe t0.plusMillis(1)
state3.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p2 -> 2L, p3 -> 3L, p4 -> 4L, p5 -> 5L)
state1.evict(slice = 645, timeWindow = JDuration.ofMillis(1000)) shouldBe state1

// evict older than time window
val state2 = state1.evict(slice = 645, timeWindow = JDuration.ofMillis(2))
state2.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p2 -> 2L, p3 -> 3L, p4 -> 4L, p6 -> 6L)

val state3 = state1.add(Vector(createRecord(p5, 5, t0.plusMillis(100)), createRecord(p7, 7, t0.plusMillis(10))))
val state4 = state3.evict(slice = 645, timeWindow = JDuration.ofMillis(2))
state4.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p5 -> 5L, p6 -> 6L, p7 -> 7L)

val state5 = state3.evict(slice = 905, timeWindow = JDuration.ofMillis(2))
state5.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(
p1 -> 1L,
p2 -> 2L,
p3 -> 3L,
p4 -> 4L,
p5 -> 5L,
p7 -> 7L)
}

"find duplicate" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import akka.projection.dynamodb.internal.DynamoDBOffsetStore.Validation.Duplicat
import akka.projection.dynamodb.internal.DynamoDBOffsetStore.Validation.RejectedBacktrackingSeqNr
import akka.projection.dynamodb.internal.DynamoDBOffsetStore.Validation.RejectedSeqNr
import akka.projection.dynamodb.internal.OffsetPidSeqNr
import akka.projection.dynamodb.internal.OffsetStoreDao
import akka.projection.dynamodb.internal.OffsetStoreDao.OffsetStoreAttributes
import akka.projection.internal.ManagementState
import com.typesafe.config.Config
Expand All @@ -42,13 +43,7 @@ import org.scalatest.wordspec.AnyWordSpecLike
import org.slf4j.LoggerFactory

object DynamoDBTimestampOffsetStoreSpec {
val config: Config =
ConfigFactory
.parseString("""
# to be able to test eviction
akka.projection.dynamodb.offset-store.keep-number-of-entries = 0
""")
.withFallback(TestConfig.config)
val config: Config = TestConfig.config

def configWithOffsetTTL: Config =
ConfigFactory
Expand Down Expand Up @@ -808,7 +803,7 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)

"evict old records from same slice" in {
val projectionId = genRandomProjectionId()
val evictSettings = settings.withTimeWindow(JDuration.ofSeconds(100)).withEvictInterval(JDuration.ofSeconds(10))
val evictSettings = settings.withTimeWindow(JDuration.ofSeconds(100))
import evictSettings._
val offsetStore = createOffsetStore(projectionId, evictSettings)

Expand All @@ -827,34 +822,22 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)

offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime, Map(p1 -> 1L)), p1, 1L)).futureValue
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(1)), Map(p2 -> 1L)), p2, 1L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(1), Map(p2 -> 1L)), p2, 1L))
.futureValue
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(2)), Map(p3 -> 1L)), p3, 1L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(2), Map(p3 -> 1L)), p3, 1L))
.futureValue
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(evictInterval), Map(p4 -> 1L)), p4, 1L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(3), Map(p4 -> 1L)), p4, 1L))
.futureValue
offsetStore
.saveOffset(
OffsetPidSeqNr(
TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(1)), Map(p4 -> 1L)),
p4,
1L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(4), Map(p4 -> 1L)), p4, 1L))
.futureValue
offsetStore
.saveOffset(
OffsetPidSeqNr(
TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(2)), Map(p5 -> 1L)),
p5,
1L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(5), Map(p5 -> 1L)), p5, 1L))
.futureValue
offsetStore
.saveOffset(
OffsetPidSeqNr(
TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(3)), Map(p6 -> 1L)),
p6,
3L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(4), Map(p6 -> 1L)), p6, 3L))
.futureValue
offsetStore.getState().size shouldBe 6

Expand All @@ -864,81 +847,59 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
offsetStore.getState().size shouldBe 7 // nothing evicted yet

offsetStore
.saveOffset(
OffsetPidSeqNr(
TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).minusSeconds(3)), Map(p8 -> 1L)),
p8,
1L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.minusSeconds(1)), Map(p8 -> 1L)), p8, 1L))
.futureValue
offsetStore.getState().size shouldBe 8 // still nothing evicted yet

offsetStore
.saveOffset(
OffsetPidSeqNr(
TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(1)), Map(p8 -> 2L)),
p8,
2L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(4)), Map(p8 -> 2L)), p8, 2L))
.futureValue
offsetStore.getState().byPid.keySet shouldBe Set(p5, p6, p7, p8)

offsetStore
.saveOffset(
OffsetPidSeqNr(
TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(20)), Map(p8 -> 3L)),
p8,
3L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(20)), Map(p8 -> 3L)), p8, 3L))
.futureValue
offsetStore.getState().byPid.keySet shouldBe Set(p7, p8)
}

"evict old records from different slices" in {
val projectionId = genRandomProjectionId()
val evictSettings = settings.withTimeWindow(JDuration.ofSeconds(100)).withEvictInterval(JDuration.ofSeconds(10))
val evictSettings = settings.withTimeWindow(JDuration.ofSeconds(100))
import evictSettings._
val offsetStore = createOffsetStore(projectionId, evictSettings)

val startTime = TestClock.nowMicros().instant()
log.debug("Start time [{}]", startTime)

val p1 = "p500" // slice 645
val p2 = "p92" // slice 905
val p3 = "p108" // slice 905
val p4 = "p863" // slice 645
val p5 = "p984" // slice 645
val p6 = "p3080" // slice 645
val p7 = "p4290" // slice 645
val p8 = "p20180" // slice 645
// these pids have the same slice 645
val p1 = "p500"
val p2 = "p621"
val p3 = "p742"
val p4 = "p863"
val p5 = "p984"
val p6 = "p3080"
val p7 = "p4290"
val p8 = "p20180"
val p9 = "p-0960" // slice 576

offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime, Map(p1 -> 1L)), p1, 1L)).futureValue
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(1)), Map(p2 -> 1L)), p2, 1L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(1), Map(p2 -> 1L)), p2, 1L))
.futureValue
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(2)), Map(p3 -> 1L)), p3, 1L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(2), Map(p3 -> 1L)), p3, 1L))
.futureValue
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(evictInterval), Map(p4 -> 1L)), p4, 1L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(3), Map(p4 -> 1L)), p4, 1L))
.futureValue
offsetStore
.saveOffset(
OffsetPidSeqNr(
TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(1)), Map(p4 -> 1L)),
p4,
1L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(4), Map(p4 -> 1L)), p4, 1L))
.futureValue
offsetStore
.saveOffset(
OffsetPidSeqNr(
TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(2)), Map(p5 -> 1L)),
p5,
1L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(5), Map(p5 -> 1L)), p5, 1L))
.futureValue
offsetStore
.saveOffset(
OffsetPidSeqNr(
TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(3)), Map(p6 -> 1L)),
p6,
1L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(4), Map(p6 -> 1L)), p6, 3L))
.futureValue
offsetStore.getState().size shouldBe 6

Expand All @@ -948,31 +909,46 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
offsetStore.getState().size shouldBe 7 // nothing evicted yet

offsetStore
.saveOffset(
OffsetPidSeqNr(
TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).minusSeconds(3)), Map(p8 -> 1L)),
p8,
1L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.minusSeconds(1)), Map(p8 -> 1L)), p8, 1L))
.futureValue
offsetStore.getState().size shouldBe 8 // still nothing evicted yet

offsetStore
.saveOffset(
OffsetPidSeqNr(
TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(1)), Map(p8 -> 2L)),
p8,
2L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(4)), Map(p8 -> 2L)), p8, 2L))
.futureValue
offsetStore.getState().byPid.keySet shouldBe Set(p5, p6, p7, p8)

offsetStore
.saveOffset(
OffsetPidSeqNr(
TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(20)), Map(p8 -> 3L)),
p8,
3L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(20)), Map(p8 -> 3L)), p8, 3L))
.futureValue
offsetStore.getState().byPid.keySet shouldBe Set(p7, p8)

// save same slice, but behind
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(1001), Map(p2 -> 2L)), p2, 2L))
.futureValue
// it's evicted immediately
offsetStore.getState().byPid.keySet shouldBe Set(p7, p8)
val dao = new OffsetStoreDao(system, settings, projectionId, client)
// but still saved
dao.loadSequenceNumber(slice(p2), p2).futureValue.get.seqNr shouldBe 2
// the timestamp was earlier than previously used for this slice, and therefore stored timestamp not changed
dao.loadTimestampOffset(slice(p2)).futureValue.get.timestamp shouldBe startTime.plus(timeWindow.plusSeconds(20))

// save another slice that hasn't been used before
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(1002), Map(p9 -> 1L)), p9, 1L))
.futureValue
offsetStore.getState().byPid.keySet shouldBe Set(p9, p7, p8)
dao.loadSequenceNumber(slice(p9), p9).futureValue.get.seqNr shouldBe 1
dao.loadTimestampOffset(slice(p9)).futureValue.get.timestamp shouldBe startTime.plusMillis(1002)
// and one more of that same slice
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(1003), Map(p9 -> 2L)), p9, 2L))
.futureValue
offsetStore.getState().byPid.keySet shouldBe Set(p9, p7, p8)
dao.loadSequenceNumber(slice(p9), p9).futureValue.get.seqNr shouldBe 2
dao.loadTimestampOffset(slice(p9)).futureValue.get.timestamp shouldBe startTime.plusMillis(1003)
}

"start from slice offset" in {
Expand Down
8 changes: 0 additions & 8 deletions akka-projection-dynamodb/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,6 @@ akka.projection.dynamodb {
# within this time window from latest offset.
time-window = 5 minutes

# Keep this number of entries. Don't evict old entries until this threshold
# has been reached.
keep-number-of-entries = 10000

# Remove old entries outside the time-window from the offset store memory
# with this frequency.
evict-interval = 10 seconds

# Trying to batch insert offsets in batches of this size.
offset-batch-size = 20

Expand Down
Loading

0 comments on commit 997fb78

Please sign in to comment.