Skip to content

Commit

Permalink
fix: R2dbcOffsetStore evict and delete per slice
Browse files Browse the repository at this point in the history
* evict time window for each slice
* remove keep-number-of-entries and evict-interval
* delete per slice, so that we always keep offsets within time window
  for each slice, also after projection scaling
  • Loading branch information
patriknw committed Nov 19, 2024
1 parent bb9d82b commit 7212225
Show file tree
Hide file tree
Showing 11 changed files with 424 additions and 396 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match
createRecord("p1", 2, t0.plusMillis(1)),
createRecord("p1", 3, t0.plusMillis(2))))
state1.byPid("p1").seqNr shouldBe 3L
state1.bySliceSorted.size shouldBe 1
state1.bySliceSorted(slice("p1")).size shouldBe 3
state1.bySliceSorted(slice("p1")).head.seqNr shouldBe 1
state1.bySliceSorted(slice("p1")).last.seqNr shouldBe 3
state1.offsetBySlice(slice("p1")) shouldBe TimestampOffset(t0.plusMillis(2), Map("p1" -> 3L))
state1.latestTimestamp shouldBe t0.plusMillis(2)

Expand All @@ -51,6 +55,8 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match
state3.byPid("p3").seqNr shouldBe 10L
slice("p3") should not be slice("p1")
slice("p3") should not be slice("p2")
state3.bySliceSorted(slice("p3")).last.pid shouldBe "p3"
state3.bySliceSorted(slice("p3")).last.seqNr shouldBe 10
state3.offsetBySlice(slice("p3")) shouldBe TimestampOffset(t0.plusMillis(3), Map("p3" -> 10L))
state3.latestTimestamp shouldBe t0.plusMillis(3)

Expand Down Expand Up @@ -102,6 +108,50 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match
p7 -> 7L)
}

// Evicts every known slice with a 1 ms time window and checks that the most
// recent record of each slice is retained, also when eviction is repeated.
"evict old but keep latest for each slice" in {
  val t0 = TestClock.nowMillis().instant()
  val records = Vector(
    createRecord("p1", 1, t0),
    createRecord("p92", 2, t0.plusMillis(1)),
    createRecord("p108", 3, t0.plusMillis(20)),
    createRecord("p4", 4, t0.plusMillis(30)),
    createRecord("p5", 5, t0.plusMillis(40)))
  val state1 = State.empty.add(records)

  // p92 and p108 share slice 905; the other pids each have their own slice
  Seq("p1" -> 449, "p92" -> 905, "p108" -> 905, "p4" -> 452, "p5" -> 453).foreach {
    case (pid, expectedSlice) => state1.byPid(pid).slice shouldBe expectedSlice
  }

  val allSlices = state1.bySliceSorted.keySet

  val seqNrsBeforeEviction = state1.byPid.map { case (pid, record) => pid -> record.seqNr }
  seqNrsBeforeEviction shouldBe Map("p1" -> 1L, "p92" -> 2L, "p108" -> 3L, "p4" -> 4L, "p5" -> 5L)

  val timeWindow = JDuration.ofMillis(1)
  val expectedAfterEviction = Map("p1" -> 1L, "p108" -> 3L, "p4" -> 4L, "p5" -> 5L)

  // first eviction pass over all slices:
  // p92 is evicted because it has the same slice as the newer p108,
  // p1 is kept because one record is kept for each slice
  val state2 = allSlices.foldLeft(state1)((acc, s) => acc.evict(s, timeWindow))
  val seqNrsAfterFirstPass = state2.byPid.map { case (pid, record) => pid -> record.seqNr }
  seqNrsAfterFirstPass shouldBe expectedAfterEviction

  // second eviction pass: still keeping one record for each slice
  val state3 = allSlices.foldLeft(state2)((acc, s) => acc.evict(s, timeWindow))
  val seqNrsAfterSecondPass = state3.byPid.map { case (pid, record) => pid -> record.seqNr }
  seqNrsAfterSecondPass shouldBe expectedAfterEviction
}

"find duplicate" in {
val t0 = TestClock.nowMillis().instant()
val state =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
/** Looks up the slice of the given persistence id via `persistenceExt`. */
def slice(pid: String): Int = {
  persistenceExt.sliceForPersistenceId(pid)
}

s"The DynamoDBOffsetStore for TimestampOffset" must {
"The DynamoDBOffsetStore for TimestampOffset" must {

"save TimestampOffset with one entry" in {
val projectionId = genRandomProjectionId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,12 @@ private[projection] object DynamoDBOffsetStore {

val newBySliceSorted =
acc.bySliceSorted.updated(r.slice, acc.bySliceSorted.get(r.slice) match {
case Some(existing) => existing + r
case None => TreeSet.empty[Record] + r
case Some(existing) =>
// we don't remove existing older records for same pid, but they will
// be removed by eviction
existing + r
case None =>
TreeSet.empty[Record] + r
})

val newOffsetBySlice =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package akka.projection.r2dbc

import java.time.{ Duration => JDuration }
import java.time.Instant

import akka.projection.r2dbc.internal.R2dbcOffsetStore.Pid
Expand All @@ -16,9 +17,10 @@ import org.scalatest.wordspec.AnyWordSpec

class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers {

/** Test helper: slice of `pid`, i.e. the absolute value of its hash code modulo 1024. */
def slice(pid: Pid): Int = {
  val numberOfSlices = 1024
  math.abs(pid.hashCode % numberOfSlices)
}

def createRecord(pid: Pid, seqNr: SeqNr, timestamp: Instant): Record = {
val slice = math.abs(pid.hashCode % 1024)
Record(slice, pid, seqNr, timestamp)
Record(slice(pid), pid, seqNr, timestamp)
}

"R2dbcOffsetStore.State" should {
Expand All @@ -31,6 +33,10 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers
createRecord("p1", 2, t0.plusMillis(1)),
createRecord("p1", 3, t0.plusMillis(2))))
state1.byPid("p1").seqNr shouldBe 3L
state1.bySliceSorted.size shouldBe 1
state1.bySliceSorted(slice("p1")).size shouldBe 3
state1.bySliceSorted(slice("p1")).head.seqNr shouldBe 1
state1.bySliceSorted(slice("p1")).last.seqNr shouldBe 3
state1.latestTimestamp shouldBe t0.plusMillis(2)
state1.latestOffset.get.seen shouldBe Map("p1" -> 3L)
state1.oldestTimestamp shouldBe t0
Expand All @@ -47,6 +53,8 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers
state3.byPid("p1").seqNr shouldBe 3L
state3.byPid("p2").seqNr shouldBe 2L
state3.byPid("p3").seqNr shouldBe 10L
state3.bySliceSorted(slice("p3")).last.pid shouldBe "p3"
state3.bySliceSorted(slice("p3")).last.seqNr shouldBe 10
state3.latestTimestamp shouldBe t0.plusMillis(3)
state3.latestOffset.get.seen shouldBe Map("p3" -> 10L)
state3.oldestTimestamp shouldBe t0
Expand All @@ -69,40 +77,45 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers
}

"evict old" in {
// these pids have the same slice 645, otherwise it will keep one for each slice
val p1 = "p500"
val p2 = "p621"
val p3 = "p742"
val p4 = "p863"
val p5 = "p984"
val p1 = "p500" // slice 645
val p2 = "p621" // slice 645
val p3 = "p742" // slice 645
val p4 = "p863" // slice 645
val p5 = "p984" // slice 645
val p6 = "p92" // slice 905
val p7 = "p108" // slice 905

val t0 = TestClock.nowMillis().instant()
val state1 = State.empty
.add(
Vector(
createRecord(p1, 1, t0),
createRecord(p2, 2, t0.plusMillis(1)),
createRecord(p3, 3, t0.plusMillis(2)),
createRecord(p4, 4, t0.plusMillis(3)),
createRecord(p5, 5, t0.plusMillis(4))))
state1.latestOffset.get.seen shouldBe Map(p5 -> 5L)
state1.oldestTimestamp shouldBe t0
createRecord(p1, 1, t0.plusMillis(1)),
createRecord(p2, 2, t0.plusMillis(2)),
createRecord(p3, 3, t0.plusMillis(3)),
createRecord(p4, 4, t0.plusMillis(4)),
createRecord(p6, 6, t0.plusMillis(6))))
state1.byPid
.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p1 -> 1L, p2 -> 2L, p3 -> 3L, p4 -> 4L, p5 -> 5L)

val state2 = state1.evict(t0.plusMillis(2), keepNumberOfEntries = 1)
state2.latestOffset.get.seen shouldBe Map(p5 -> 5L)
state2.oldestTimestamp shouldBe t0.plusMillis(2)
state2.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p3 -> 3L, p4 -> 4L, p5 -> 5L)
.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p1 -> 1L, p2 -> 2L, p3 -> 3L, p4 -> 4L, p6 -> 6L)

// keep all
state1.evict(t0.plusMillis(2), keepNumberOfEntries = 100) shouldBe state1

// keep 4
val state3 = state1.evict(t0.plusMillis(2), keepNumberOfEntries = 4)
state3.latestOffset.get.seen shouldBe Map(p5 -> 5L)
state3.oldestTimestamp shouldBe t0.plusMillis(1)
state3.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p2 -> 2L, p3 -> 3L, p4 -> 4L, p5 -> 5L)
state1.evict(slice = 645, timeWindow = JDuration.ofMillis(1000)) shouldBe state1

// evict older than time window
val state2 = state1.evict(slice = 645, timeWindow = JDuration.ofMillis(2))
state2.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p2 -> 2L, p3 -> 3L, p4 -> 4L, p6 -> 6L)

val state3 = state1.add(Vector(createRecord(p5, 5, t0.plusMillis(100)), createRecord(p7, 7, t0.plusMillis(10))))
val state4 = state3.evict(slice = 645, timeWindow = JDuration.ofMillis(2))
state4.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p5 -> 5L, p6 -> 6L, p7 -> 7L)

val state5 = state3.evict(slice = 905, timeWindow = JDuration.ofMillis(2))
state5.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(
p1 -> 1L,
p2 -> 2L,
p3 -> 3L,
p4 -> 4L,
p5 -> 5L,
p7 -> 7L)
}

"evict old but keep latest for each slice" in {
Expand All @@ -112,16 +125,18 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers
Vector(
createRecord("p1", 1, t0),
createRecord("p92", 2, t0.plusMillis(1)),
createRecord("p108", 3, t0.plusMillis(2)),
createRecord("p4", 4, t0.plusMillis(3)),
createRecord("p5", 5, t0.plusMillis(4))))
createRecord("p108", 3, t0.plusMillis(20)),
createRecord("p4", 4, t0.plusMillis(30)),
createRecord("p5", 5, t0.plusMillis(40))))

state1.byPid("p1").slice shouldBe 449
state1.byPid("p92").slice shouldBe 905
state1.byPid("p108").slice shouldBe 905 // same slice as p92
state1.byPid("p4").slice shouldBe 452
state1.byPid("p5").slice shouldBe 453

val slices = state1.bySliceSorted.keySet

state1.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(
"p1" -> 1L,
"p92" -> 2L,
Expand All @@ -131,15 +146,21 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers
state1.latestOffset.get.seen shouldBe Map("p5" -> 5L)
state1.oldestTimestamp shouldBe t0

val state2 = state1.evict(t0.plusMillis(2), keepNumberOfEntries = 1)
val timeWindow = JDuration.ofMillis(1)

val state2 = slices.foldLeft(state1) {
case (acc, slice) => acc.evict(slice, timeWindow)
}
// note that p92 is evicted because it has the same slice as the newer p108
// p1 is kept because one record is always kept for each slice
state2.byPid
.map { case (pid, r) => pid -> r.seqNr } shouldBe Map("p1" -> 1L, "p108" -> 3L, "p4" -> 4L, "p5" -> 5L)
state2.latestOffset.get.seen shouldBe Map("p5" -> 5L)
state2.oldestTimestamp shouldBe t0

val state3 = state1.evict(t0.plusMillis(10), keepNumberOfEntries = 1)
val state3 = slices.foldLeft(state2) {
case (acc, slice) => acc.evict(slice, timeWindow)
}
// still keeping one for each slice
state3.byPid
.map { case (pid, r) => pid -> r.seqNr } shouldBe Map("p1" -> 1L, "p108" -> 3L, "p4" -> 4L, "p5" -> 5L)
Expand Down
Loading

0 comments on commit 7212225

Please sign in to comment.