From 7212225b1c646ced08181c3fd094c6c267e6d58e Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 19 Nov 2024 16:54:06 +0100 Subject: [PATCH] fix: R2dbcOffsetStore evict and delete per slice * evict time window for each slice * remove keep-number-of-entries and evict-interval * delete per slice, so that we always keep offsets within time window for each slice, also after projection scaling --- .../DynamoDBOffsetStoreStateSpec.scala | 50 +++ .../DynamoDBTimestampOffsetStoreSpec.scala | 2 +- .../internal/DynamoDBOffsetStore.scala | 8 +- .../r2dbc/R2dbcOffsetStoreStateSpec.scala | 87 +++-- .../r2dbc/R2dbcTimestampOffsetStoreSpec.scala | 252 +++++++------- .../src/main/resources/reference.conf | 8 - .../r2dbc/R2dbcProjectionSettings.scala | 21 +- .../r2dbc/internal/OffsetStoreDao.scala | 2 +- .../internal/PostgresOffsetStoreDao.scala | 33 +- .../r2dbc/internal/R2dbcOffsetStore.scala | 307 ++++++++++-------- .../internal/SqlServerOffsetStoreDao.scala | 50 +-- 11 files changed, 424 insertions(+), 396 deletions(-) diff --git a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBOffsetStoreStateSpec.scala b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBOffsetStoreStateSpec.scala index 334869f19..dd80fb0da 100644 --- a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBOffsetStoreStateSpec.scala +++ b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBOffsetStoreStateSpec.scala @@ -34,6 +34,10 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match createRecord("p1", 2, t0.plusMillis(1)), createRecord("p1", 3, t0.plusMillis(2)))) state1.byPid("p1").seqNr shouldBe 3L + state1.bySliceSorted.size shouldBe 1 + state1.bySliceSorted(slice("p1")).size shouldBe 3 + state1.bySliceSorted(slice("p1")).head.seqNr shouldBe 1 + state1.bySliceSorted(slice("p1")).last.seqNr shouldBe 3 state1.offsetBySlice(slice("p1")) 
shouldBe TimestampOffset(t0.plusMillis(2), Map("p1" -> 3L)) state1.latestTimestamp shouldBe t0.plusMillis(2) @@ -51,6 +55,8 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match state3.byPid("p3").seqNr shouldBe 10L slice("p3") should not be slice("p1") slice("p3") should not be slice("p2") + state3.bySliceSorted(slice("p3")).last.pid shouldBe "p3" + state3.bySliceSorted(slice("p3")).last.seqNr shouldBe 10 state3.offsetBySlice(slice("p3")) shouldBe TimestampOffset(t0.plusMillis(3), Map("p3" -> 10L)) state3.latestTimestamp shouldBe t0.plusMillis(3) @@ -102,6 +108,50 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match p7 -> 7L) } + "evict old but keep latest for each slice" in { + val t0 = TestClock.nowMillis().instant() + val state1 = State.empty + .add( + Vector( + createRecord("p1", 1, t0), + createRecord("p92", 2, t0.plusMillis(1)), + createRecord("p108", 3, t0.plusMillis(20)), + createRecord("p4", 4, t0.plusMillis(30)), + createRecord("p5", 5, t0.plusMillis(40)))) + + state1.byPid("p1").slice shouldBe 449 + state1.byPid("p92").slice shouldBe 905 + state1.byPid("p108").slice shouldBe 905 // same slice as p92 + state1.byPid("p4").slice shouldBe 452 + state1.byPid("p5").slice shouldBe 453 + + val slices = state1.bySliceSorted.keySet + + state1.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map( + "p1" -> 1L, + "p92" -> 2L, + "p108" -> 3L, + "p4" -> 4L, + "p5" -> 5L) + + val timeWindow = JDuration.ofMillis(1) + + val state2 = slices.foldLeft(state1) { + case (acc, slice) => acc.evict(slice, timeWindow) + } + // note that p92 is evicted because it has same slice as p108 + // p1 is kept because keeping one for each slice + state2.byPid + .map { case (pid, r) => pid -> r.seqNr } shouldBe Map("p1" -> 1L, "p108" -> 3L, "p4" -> 4L, "p5" -> 5L) + + val state3 = slices.foldLeft(state2) { + case (acc, slice) => acc.evict(slice, timeWindow) + } + // still keeping one for each slice + state3.byPid + .map { 
case (pid, r) => pid -> r.seqNr } shouldBe Map("p1" -> 1L, "p108" -> 3L, "p4" -> 4L, "p5" -> 5L) + } + "find duplicate" in { val t0 = TestClock.nowMillis().instant() val state = diff --git a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetStoreSpec.scala b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetStoreSpec.scala index b0d705b1d..2e63cc61e 100644 --- a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetStoreSpec.scala +++ b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetStoreSpec.scala @@ -162,7 +162,7 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config) def slice(pid: String): Int = persistenceExt.sliceForPersistenceId(pid) - s"The DynamoDBOffsetStore for TimestampOffset" must { + "The DynamoDBOffsetStore for TimestampOffset" must { "save TimestampOffset with one entry" in { val projectionId = genRandomProjectionId() diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala index 9219891ca..e9e8c6af3 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala @@ -112,8 +112,12 @@ private[projection] object DynamoDBOffsetStore { val newBySliceSorted = acc.bySliceSorted.updated(r.slice, acc.bySliceSorted.get(r.slice) match { - case Some(existing) => existing + r - case None => TreeSet.empty[Record] + r + case Some(existing) => + // we don't remove existing older records for same pid, but they will + // be removed by eviction + existing + r + case None => + TreeSet.empty[Record] + r }) val newOffsetBySlice = diff --git 
a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcOffsetStoreStateSpec.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcOffsetStoreStateSpec.scala index 843fc9c4a..e8a02c3c9 100644 --- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcOffsetStoreStateSpec.scala +++ b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcOffsetStoreStateSpec.scala @@ -4,6 +4,7 @@ package akka.projection.r2dbc +import java.time.{ Duration => JDuration } import java.time.Instant import akka.projection.r2dbc.internal.R2dbcOffsetStore.Pid @@ -16,9 +17,10 @@ import org.scalatest.wordspec.AnyWordSpec class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers { + def slice(pid: Pid): Int = math.abs(pid.hashCode % 1024) + def createRecord(pid: Pid, seqNr: SeqNr, timestamp: Instant): Record = { - val slice = math.abs(pid.hashCode % 1024) - Record(slice, pid, seqNr, timestamp) + Record(slice(pid), pid, seqNr, timestamp) } "R2dbcOffsetStore.State" should { @@ -31,6 +33,10 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers createRecord("p1", 2, t0.plusMillis(1)), createRecord("p1", 3, t0.plusMillis(2)))) state1.byPid("p1").seqNr shouldBe 3L + state1.bySliceSorted.size shouldBe 1 + state1.bySliceSorted(slice("p1")).size shouldBe 3 + state1.bySliceSorted(slice("p1")).head.seqNr shouldBe 1 + state1.bySliceSorted(slice("p1")).last.seqNr shouldBe 3 state1.latestTimestamp shouldBe t0.plusMillis(2) state1.latestOffset.get.seen shouldBe Map("p1" -> 3L) state1.oldestTimestamp shouldBe t0 @@ -47,6 +53,8 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers state3.byPid("p1").seqNr shouldBe 3L state3.byPid("p2").seqNr shouldBe 2L state3.byPid("p3").seqNr shouldBe 10L + state3.bySliceSorted(slice("p3")).last.pid shouldBe "p3" + state3.bySliceSorted(slice("p3")).last.seqNr shouldBe 10 state3.latestTimestamp 
shouldBe t0.plusMillis(3) state3.latestOffset.get.seen shouldBe Map("p3" -> 10L) state3.oldestTimestamp shouldBe t0 @@ -69,40 +77,45 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers } "evict old" in { - // these pids have the same slice 645, otherwise it will keep one for each slice - val p1 = "p500" - val p2 = "p621" - val p3 = "p742" - val p4 = "p863" - val p5 = "p984" + val p1 = "p500" // slice 645 + val p2 = "p621" // slice 645 + val p3 = "p742" // slice 645 + val p4 = "p863" // slice 645 + val p5 = "p984" // slice 645 + val p6 = "p92" // slice 905 + val p7 = "p108" // slice 905 val t0 = TestClock.nowMillis().instant() val state1 = State.empty .add( Vector( - createRecord(p1, 1, t0), - createRecord(p2, 2, t0.plusMillis(1)), - createRecord(p3, 3, t0.plusMillis(2)), - createRecord(p4, 4, t0.plusMillis(3)), - createRecord(p5, 5, t0.plusMillis(4)))) - state1.latestOffset.get.seen shouldBe Map(p5 -> 5L) - state1.oldestTimestamp shouldBe t0 + createRecord(p1, 1, t0.plusMillis(1)), + createRecord(p2, 2, t0.plusMillis(2)), + createRecord(p3, 3, t0.plusMillis(3)), + createRecord(p4, 4, t0.plusMillis(4)), + createRecord(p6, 6, t0.plusMillis(6)))) state1.byPid - .map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p1 -> 1L, p2 -> 2L, p3 -> 3L, p4 -> 4L, p5 -> 5L) - - val state2 = state1.evict(t0.plusMillis(2), keepNumberOfEntries = 1) - state2.latestOffset.get.seen shouldBe Map(p5 -> 5L) - state2.oldestTimestamp shouldBe t0.plusMillis(2) - state2.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p3 -> 3L, p4 -> 4L, p5 -> 5L) + .map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p1 -> 1L, p2 -> 2L, p3 -> 3L, p4 -> 4L, p6 -> 6L) // keep all - state1.evict(t0.plusMillis(2), keepNumberOfEntries = 100) shouldBe state1 - - // keep 4 - val state3 = state1.evict(t0.plusMillis(2), keepNumberOfEntries = 4) - state3.latestOffset.get.seen shouldBe Map(p5 -> 5L) - state3.oldestTimestamp shouldBe t0.plusMillis(1) - state3.byPid.map { case 
(pid, r) => pid -> r.seqNr } shouldBe Map(p2 -> 2L, p3 -> 3L, p4 -> 4L, p5 -> 5L) + state1.evict(slice = 645, timeWindow = JDuration.ofMillis(1000)) shouldBe state1 + + // evict older than time window + val state2 = state1.evict(slice = 645, timeWindow = JDuration.ofMillis(2)) + state2.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p2 -> 2L, p3 -> 3L, p4 -> 4L, p6 -> 6L) + + val state3 = state1.add(Vector(createRecord(p5, 5, t0.plusMillis(100)), createRecord(p7, 7, t0.plusMillis(10)))) + val state4 = state3.evict(slice = 645, timeWindow = JDuration.ofMillis(2)) + state4.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p5 -> 5L, p6 -> 6L, p7 -> 7L) + + val state5 = state3.evict(slice = 905, timeWindow = JDuration.ofMillis(2)) + state5.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map( + p1 -> 1L, + p2 -> 2L, + p3 -> 3L, + p4 -> 4L, + p5 -> 5L, + p7 -> 7L) } "evict old but keep latest for each slice" in { @@ -112,9 +125,9 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers Vector( createRecord("p1", 1, t0), createRecord("p92", 2, t0.plusMillis(1)), - createRecord("p108", 3, t0.plusMillis(2)), - createRecord("p4", 4, t0.plusMillis(3)), - createRecord("p5", 5, t0.plusMillis(4)))) + createRecord("p108", 3, t0.plusMillis(20)), + createRecord("p4", 4, t0.plusMillis(30)), + createRecord("p5", 5, t0.plusMillis(40)))) state1.byPid("p1").slice shouldBe 449 state1.byPid("p92").slice shouldBe 905 @@ -122,6 +135,8 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers state1.byPid("p4").slice shouldBe 452 state1.byPid("p5").slice shouldBe 453 + val slices = state1.bySliceSorted.keySet + state1.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map( "p1" -> 1L, "p92" -> 2L, @@ -131,7 +146,11 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers state1.latestOffset.get.seen shouldBe Map("p5" -> 5L) state1.oldestTimestamp shouldBe t0 - val state2 = 
state1.evict(t0.plusMillis(2), keepNumberOfEntries = 1) + val timeWindow = JDuration.ofMillis(1) + + val state2 = slices.foldLeft(state1) { + case (acc, slice) => acc.evict(slice, timeWindow) + } // note that p92 is evicted because it has same slice as p108 // p1 is kept because keeping one for each slice state2.byPid @@ -139,7 +158,9 @@ class R2dbcOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Matchers state2.latestOffset.get.seen shouldBe Map("p5" -> 5L) state2.oldestTimestamp shouldBe t0 - val state3 = state1.evict(t0.plusMillis(10), keepNumberOfEntries = 1) + val state3 = slices.foldLeft(state2) { + case (acc, slice) => acc.evict(slice, timeWindow) + } // still keeping one for each slice state3.byPid .map { case (pid, r) => pid -> r.seqNr } shouldBe Map("p1" -> 1L, "p108" -> 3L, "p4" -> 4L, "p5" -> 5L) diff --git a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala index b22201013..131688ba2 100644 --- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala +++ b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala @@ -29,7 +29,6 @@ import akka.projection.r2dbc.internal.OffsetPidSeqNr import akka.projection.r2dbc.internal.R2dbcOffsetStore import akka.projection.r2dbc.internal.R2dbcOffsetStore.Pid import akka.projection.r2dbc.internal.R2dbcOffsetStore.SeqNr -import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpecLike import org.slf4j.LoggerFactory @@ -48,13 +47,7 @@ object R2dbcTimestampOffsetStoreSpec { } class R2dbcTimestampOffsetStoreSpec - extends ScalaTestWithActorTestKit( - ConfigFactory - .parseString(""" - # to be able to test eviction - akka.projection.r2dbc.offset-store.keep-number-of-entries = 0 - """) - .withFallback(TestConfig.config)) + 
extends ScalaTestWithActorTestKit(TestConfig.config) with AnyWordSpecLike with TestDbLifecycle with TestData @@ -134,6 +127,9 @@ class R2dbcTimestampOffsetStoreSpec TimestampOffset(timestamp, timestamp.plusMillis(1000), Map(pid -> revision)), timestamp.toEpochMilli) + def slice(pid: String): Int = + persistenceExt.sliceForPersistenceId(pid) + s"The R2dbcOffsetStore for TimestampOffset (dialect ${r2dbcSettings.dialectName})" must { "save TimestampOffset with one entry" in { @@ -738,16 +734,16 @@ class R2dbcTimestampOffsetStoreSpec offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 20L) } - "evict old records" in { + "evict old records from same slice" in { val projectionId = genRandomProjectionId() - val evictSettings = settings.withTimeWindow(JDuration.ofSeconds(100)).withEvictInterval(JDuration.ofSeconds(10)) + val evictSettings = settings.withTimeWindow(JDuration.ofSeconds(100)) import evictSettings._ val offsetStore = createOffsetStore(projectionId, evictSettings) val startTime = TestClock.nowMicros().instant() log.debug("Start time [{}]", startTime) - // these pids have the same slice 645, otherwise it will keep one for each slice + // these pids have the same slice 645 val p1 = "p500" val p2 = "p621" val p3 = "p742" @@ -759,34 +755,22 @@ class R2dbcTimestampOffsetStoreSpec offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime, Map(p1 -> 1L)), p1, 1L)).futureValue offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(1)), Map(p2 -> 1L)), p2, 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(1), Map(p2 -> 1L)), p2, 1L)) .futureValue offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(2)), Map(p3 -> 1L)), p3, 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(2), Map(p3 -> 1L)), p3, 1L)) .futureValue offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(evictInterval), Map(p4 -> 1L)), p4, 1L)) + 
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(3), Map(p4 -> 1L)), p4, 1L)) .futureValue offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(1)), Map(p4 -> 1L)), - p4, - 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(4), Map(p4 -> 1L)), p4, 1L)) .futureValue offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(2)), Map(p5 -> 1L)), - p5, - 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(5), Map(p5 -> 1L)), p5, 1L)) .futureValue offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(3)), Map(p6 -> 1L)), - p6, - 3L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(4), Map(p6 -> 1L)), p6, 3L)) .futureValue offsetStore.getState().size shouldBe 6 @@ -796,81 +780,59 @@ class R2dbcTimestampOffsetStoreSpec offsetStore.getState().size shouldBe 7 // nothing evicted yet offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).minusSeconds(3)), Map(p8 -> 1L)), - p8, - 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.minusSeconds(1)), Map(p8 -> 1L)), p8, 1L)) .futureValue offsetStore.getState().size shouldBe 8 // still nothing evicted yet offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(1)), Map(p8 -> 2L)), - p8, - 2L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(4)), Map(p8 -> 2L)), p8, 2L)) .futureValue offsetStore.getState().byPid.keySet shouldBe Set(p5, p6, p7, p8) offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(20)), Map(p8 -> 3L)), - p8, - 3L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(20)), Map(p8 
-> 3L)), p8, 3L)) .futureValue offsetStore.getState().byPid.keySet shouldBe Set(p7, p8) } - "evict old records but keep latest for each slice" in { + "evict old records from different slices" in { val projectionId = genRandomProjectionId() - val evictSettings = settings.withTimeWindow(JDuration.ofSeconds(100)).withEvictInterval(JDuration.ofSeconds(10)) + val evictSettings = settings.withTimeWindow(JDuration.ofSeconds(100)) import evictSettings._ val offsetStore = createOffsetStore(projectionId, evictSettings) val startTime = TestClock.nowMicros().instant() log.debug("Start time [{}]", startTime) - val p1 = "p500" // slice 645 - val p2 = "p92" // slice 905 - val p3 = "p108" // slice 905 - val p4 = "p863" // slice 645 - val p5 = "p984" // slice 645 - val p6 = "p3080" // slice 645 - val p7 = "p4290" // slice 645 - val p8 = "p20180" // slice 645 + // these pids have the same slice 645 + val p1 = "p500" + val p2 = "p621" + val p3 = "p742" + val p4 = "p863" + val p5 = "p984" + val p6 = "p3080" + val p7 = "p4290" + val p8 = "p20180" + val p9 = "p-0960" // slice 576 offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime, Map(p1 -> 1L)), p1, 1L)).futureValue offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(1)), Map(p2 -> 1L)), p2, 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(1), Map(p2 -> 1L)), p2, 1L)) .futureValue offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(2)), Map(p3 -> 1L)), p3, 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(2), Map(p3 -> 1L)), p3, 1L)) .futureValue offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(evictInterval), Map(p4 -> 1L)), p4, 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(3), Map(p4 -> 1L)), p4, 1L)) .futureValue offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(1)), Map(p4 -> 1L)), 
- p4, - 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(4), Map(p4 -> 1L)), p4, 1L)) .futureValue offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(2)), Map(p5 -> 1L)), - p5, - 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(5), Map(p5 -> 1L)), p5, 1L)) .futureValue offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(3)), Map(p6 -> 1L)), - p6, - 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(4), Map(p6 -> 1L)), p6, 3L)) .futureValue offsetStore.getState().size shouldBe 6 @@ -880,32 +842,42 @@ class R2dbcTimestampOffsetStoreSpec offsetStore.getState().size shouldBe 7 // nothing evicted yet offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).minusSeconds(3)), Map(p8 -> 1L)), - p8, - 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.minusSeconds(1)), Map(p8 -> 1L)), p8, 1L)) .futureValue offsetStore.getState().size shouldBe 8 // still nothing evicted yet offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(1)), Map(p8 -> 2L)), - p8, - 2L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(4)), Map(p8 -> 2L)), p8, 2L)) .futureValue - // also keeping p3 ("p108") for slice 905 - offsetStore.getState().byPid.keySet shouldBe Set(p3, p5, p6, p7, p8) + offsetStore.getState().byPid.keySet shouldBe Set(p5, p6, p7, p8) offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(20)), Map(p8 -> 3L)), - p8, - 3L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(20)), Map(p8 -> 3L)), p8, 3L)) .futureValue - offsetStore.getState().byPid.keySet shouldBe Set(p3, p7, p8) + 
offsetStore.getState().byPid.keySet shouldBe Set(p7, p8) + + // save same slice, but behind + offsetStore + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(1001), Map(p2 -> 2L)), p2, 2L)) + .futureValue + // it's evicted immediately + offsetStore.getState().byPid.keySet shouldBe Set(p7, p8) + val dao = offsetStore.dao + // but still saved + dao.readTimestampOffset(slice(p2), p2).futureValue.get.seqNr shouldBe 2 + + // save another slice that hasn't been used before + offsetStore + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(1002), Map(p9 -> 1L)), p9, 1L)) + .futureValue + offsetStore.getState().byPid.keySet shouldBe Set(p9, p7, p8) + dao.readTimestampOffset(slice(p9), p9).futureValue.get.seqNr shouldBe 1 + // and one more of that same slice + offsetStore + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(1003), Map(p9 -> 2L)), p9, 2L)) + .futureValue + offsetStore.getState().byPid.keySet shouldBe Set(p9, p7, p8) + dao.readTimestampOffset(slice(p9), p9).futureValue.get.seqNr shouldBe 2 } "delete old records" in { @@ -1012,7 +984,11 @@ class R2dbcTimestampOffsetStoreSpec offsetStore .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(3)), Map(p8 -> 1L)), p8, 1L)) .futureValue - offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 2 + offsetStore + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(3)), Map(p3 -> 2L)), p3, 2L)) + .futureValue + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 3 + offsetStore.getState().byPid.keySet shouldBe Set(p3, p4, p5, p6, p7, p8) offsetStore.readOffset().futureValue // this will load from database // p3 is kept for slice 905 offsetStore.getState().byPid.keySet shouldBe Set(p3, p4, p5, p6, p7, p8) @@ -1026,7 +1002,6 @@ class R2dbcTimestampOffsetStoreSpec val projectionId = genRandomProjectionId() val deleteSettings = settings .withTimeWindow(JDuration.ofSeconds(windowSeconds)) - 
.withKeepNumberOfEntries(2000) .withDeleteInterval(JDuration.ofHours(1)) // don't run the scheduled deletes val offsetStore = createOffsetStore(projectionId, deleteSettings) @@ -1092,15 +1067,13 @@ class R2dbcTimestampOffsetStoreSpec } } - "delete old records triggered by time window, while still within entries limit" in { + "delete old records from different slices" in { val projectionId = genRandomProjectionId() val evictSettings = settings - .withKeepNumberOfEntries(10) .withTimeWindow(JDuration.ofSeconds(100)) - .withEvictInterval(JDuration.ofSeconds(10)) val offsetStore = createOffsetStore(projectionId, evictSettings) - import evictSettings.{ evictInterval, timeWindow } + import evictSettings.timeWindow val t0 = TestClock.nowMicros().instant() log.debug("Start time [{}]", t0) @@ -1123,13 +1096,13 @@ class R2dbcTimestampOffsetStoreSpec val t3 = t0.plusSeconds(3) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t3, Map(p3 -> 1L)), p3, 1L)).futureValue - val t4 = t0.plus(evictInterval).plusSeconds(1) + val t4 = t0.plusSeconds(11) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t4, Map(p4 -> 1L)), p4, 1L)).futureValue - val t5 = t0.plus(evictInterval).plusSeconds(2) + val t5 = t0.plusSeconds(12) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t5, Map(p5 -> 1L)), p5, 1L)).futureValue - val t6 = t0.plus(evictInterval).plusSeconds(3) + val t6 = t0.plusSeconds(13) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t6, Map(p6 -> 1L)), p6, 1L)).futureValue offsetStore.getState().size shouldBe 6 @@ -1140,32 +1113,30 @@ class R2dbcTimestampOffsetStoreSpec offsetStore.getState().size shouldBe 7 // no eviction offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // no deletion (within time window) - val t8 = t0.plus(timeWindow.plus(evictInterval).minusSeconds(3)) + val t8 = t0.plus(timeWindow.plusSeconds(7)) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t8, Map(p8 -> 1L)), p8, 1L)).futureValue - offsetStore.getState().size shouldBe 8 // 
no eviction - offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 2 // deleted p1@t1 and p2@t2, kept p3@t3 (latest) + offsetStore.getState().byPid.keySet shouldBe Set(p2, p3, p4, p5, p6, p7, p8) // eviction slice 645 + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 1 // deleted p1@t1 - val t9 = t0.plus(timeWindow.plus(evictInterval).plusSeconds(3)) + val t9 = t0.plus(timeWindow.plusSeconds(13)) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t9, Map(p8 -> 2L)), p8, 2L)).futureValue - offsetStore.getState().size shouldBe 8 // no eviction (outside eviction window, but within keep-number-of-entries) - offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 2 // deleted p4@t4 and p5@t5, kept p3@t3 (latest) + offsetStore.getState().byPid.keySet shouldBe Set(p2, p3, p6, p7, p8) // eviction slice 645 + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 2 // deleted p4@t4 and p5@t5 - offsetStore.getState().byPid.keySet shouldBe Set(p1, p2, p3, p4, p5, p6, p7, p8) + offsetStore.getState().byPid.keySet shouldBe Set(p2, p3, p6, p7, p8) offsetStore.readOffset().futureValue // reload from database - offsetStore.getState().byPid.keySet shouldBe Set(p3, p6, p7, p8) + offsetStore.getState().byPid.keySet shouldBe Set(p2, p3, p6, p7, p8) } - "delete old records triggered after eviction" in { + "delete old records for same slice" in { val projectionId = genRandomProjectionId() val evictSettings = settings - .withKeepNumberOfEntries(5) .withTimeWindow(JDuration.ofSeconds(100)) - .withEvictInterval(JDuration.ofSeconds(10)) val offsetStore = createOffsetStore(projectionId, evictSettings) - import evictSettings.{ evictInterval, timeWindow } + import evictSettings.timeWindow val t0 = TestClock.nowMicros().instant() log.debug("Start time [{}]", t0) @@ -1193,69 +1164,69 @@ class R2dbcTimestampOffsetStoreSpec val t3 = t0.plusSeconds(3) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t3, Map(p3 -> 1L)), p3, 1L)).futureValue - val t4 = 
t0.plus(evictInterval).plusSeconds(7) + val t4 = t0.plusSeconds(17) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t4, Map(p4 -> 1L)), p4, 1L)).futureValue - val t5 = t0.plus(evictInterval).plusSeconds(8) + val t5 = t0.plusSeconds(18) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t5, Map(p5 -> 1L)), p5, 1L)).futureValue - val t6 = t0.plus(evictInterval).plusSeconds(9) + val t6 = t0.plusSeconds(19) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t6, Map(p6 -> 1L)), p6, 1L)).futureValue offsetStore.getState().size shouldBe 6 // no eviction offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // no deletion - val t7 = t0.plus(timeWindow.minus(evictInterval)) + val t7 = t0.plus(timeWindow.minusSeconds(10)) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t7, Map(p7 -> 1L)), p7, 1L)).futureValue offsetStore.getState().size shouldBe 7 // no eviction offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // no deletion (within time window) - val t8 = t0.plus(timeWindow.plus(evictInterval).plusSeconds(3)) + val t8 = t0.plus(timeWindow.plusSeconds(13)) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t8, Map(p8 -> 1L)), p8, 1L)).futureValue offsetStore.getState().byPid.keySet shouldBe Set(p4, p5, p6, p7, p8) // evicted p1@t1, p2@t2, and p3@t3 offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 3 // deletion triggered by eviction - val t9 = t0.plus(timeWindow.plus(evictInterval.multipliedBy(2)).plusSeconds(10)) + val t9 = t0.plus(timeWindow.plusSeconds(30)) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t9, Map(p8 -> 2L)), p8, 2L)).futureValue - offsetStore.getState().size shouldBe 5 // no eviction (outside time window, but still within limit) + offsetStore.getState().byPid.keySet shouldBe Set(p7, p8) // evicted offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 3 // deleted p4@t4, p5@t5, p6@t6 (outside window) - val t10 = t0.plus(timeWindow.plus(evictInterval.multipliedBy(2)).plusSeconds(11)) + val 
t10 = t0.plus(timeWindow.plusSeconds(31)) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t10, Map(p9 -> 1L)), p9, 1L)).futureValue - offsetStore.getState().byPid.keySet shouldBe Set(p5, p6, p7, p8, p9) // evicted p4@t4 - offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // deletion triggered, but nothing to delete + offsetStore.getState().byPid.keySet shouldBe Set(p7, p8, p9) // nothing evicted + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // but if deletion triggered nothing to delete - val t11 = t0.plus(timeWindow.plus(evictInterval.multipliedBy(2)).plusSeconds(12)) + val t11 = t0.plus(timeWindow.plusSeconds(32)) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t11, Map(p10 -> 1L)), p10, 1L)).futureValue - offsetStore.getState().byPid.keySet shouldBe Set(p6, p7, p8, p9, p10) // evicted p5@t5 - offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // deletion triggered, but nothing to delete + offsetStore.getState().byPid.keySet shouldBe Set(p7, p8, p9, p10) // nothing evicted + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // but if deletion triggered nothing to delete - val t12 = t0.plus(timeWindow.plus(evictInterval.multipliedBy(2)).plusSeconds(13)) + val t12 = t0.plus(timeWindow.plusSeconds(33)) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t12, Map(p11 -> 1L)), p11, 1L)).futureValue - offsetStore.getState().byPid.keySet shouldBe Set(p7, p8, p9, p10, p11) // evicted p6@t6 - offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // deletion triggered, but nothing to delete + offsetStore.getState().byPid.keySet shouldBe Set(p7, p8, p9, p10, p11) // nothing evicted + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // but if deletion triggered nothing to delete - val t13 = t0.plus(timeWindow.plus(evictInterval.multipliedBy(2)).plusSeconds(14)) + val t13 = t0.plus(timeWindow.plusSeconds(34)) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t13, Map(p12 -> 1L)), 
p12, 1L)).futureValue - offsetStore.getState().byPid.keySet shouldBe Set(p7, p8, p9, p10, p11, p12) // no eviction (within time window) - offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // no deletion + offsetStore.getState().byPid.keySet shouldBe Set(p7, p8, p9, p10, p11, p12) // nothing evicted + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // but if deletion triggered nothing to delete - val t14 = t0.plus(timeWindow.multipliedBy(2).plus(evictInterval.multipliedBy(3)).plusSeconds(1)) + val t14 = t7.plus(timeWindow.plusSeconds(1)) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t14, Map(p12 -> 2L)), p12, 2L)).futureValue offsetStore.getState().byPid.keySet shouldBe Set(p8, p9, p10, p11, p12) // evicted p7@t7 - offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 3 // triggered by evict, deleted p7@t7, p8@t8, p8@t9 + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 1 // triggered by evict, deleted p7@t7 offsetStore.getState().byPid.keySet shouldBe Set(p8, p9, p10, p11, p12) offsetStore.readOffset().futureValue // reload from database - offsetStore.getState().byPid.keySet shouldBe Set(p9, p10, p11, p12) + offsetStore.getState().byPid.keySet shouldBe Set(p8, p9, p10, p11, p12) } "set offset" in { @@ -1429,7 +1400,7 @@ class R2dbcTimestampOffsetStoreSpec val state1 = offsetStore3.getState() state1.size shouldBe 4 - state1.latestBySlice.size shouldBe 4 + state1.bySliceSorted.size shouldBe 4 offsetStore3.getForeignOffsets().size shouldBe 4 // all latest are from other projection keys offsetStore3.getLatestSeen() shouldBe Instant.EPOCH // latest seen is reset on reload @@ -1448,7 +1419,7 @@ class R2dbcTimestampOffsetStoreSpec val state2 = offsetStore3.getState() state2.size shouldBe 4 - state2.latestBySlice.size shouldBe 4 + state2.bySliceSorted.size shouldBe 4 offsetStore3.getForeignOffsets().size shouldBe 2 // latest by slice still from other projection keys (768-1023) offsetStore3.getLatestSeen() shouldBe 
Instant.EPOCH // latest seen is reset on reload @@ -1486,7 +1457,7 @@ class R2dbcTimestampOffsetStoreSpec val state3 = offsetStore3.getState() state3.size shouldBe 4 - state3.latestBySlice.size shouldBe 4 + state3.bySliceSorted.size shouldBe 4 offsetStore3.getForeignOffsets().size shouldBe 1 // latest by slice still from 768-1023 offsetStore3.getLatestSeen() shouldBe Instant.EPOCH // latest seen is reset on reload @@ -1520,7 +1491,7 @@ class R2dbcTimestampOffsetStoreSpec val state4 = offsetStore3.getState() state4.size shouldBe 4 - state4.latestBySlice.size shouldBe 4 + state4.bySliceSorted.size shouldBe 4 offsetStore3.getForeignOffsets().size shouldBe 1 // latest by slice still from 768-1023 offsetStore3.getLatestSeen() shouldBe Instant.EPOCH // latest seen is reset on reload @@ -1556,14 +1527,17 @@ class R2dbcTimestampOffsetStoreSpec val state5 = offsetStore3.getState() state5.size shouldBe 4 - state5.latestBySlice.size shouldBe 4 + state5.bySliceSorted.size shouldBe 4 offsetStore3.getForeignOffsets() shouldBe empty offsetStore3.getLatestSeen() shouldBe Instant.EPOCH // outdated offsets, included those for 768-1023, will eventually be deleted offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(100), Map(p1 -> 4L)), p1, 4L)).futureValue - offsetStore3.deleteOldTimestampOffsets().futureValue shouldBe 17 + offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(100), Map(p2 -> 8L)), p2, 8L)).futureValue + offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(100), Map(p3 -> 8L)), p3, 8L)).futureValue + offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(100), Map(p4 -> 10L)), p4, 10L)).futureValue + offsetStore3.deleteOldTimestampOffsets().futureValue shouldBe 20 } "validate timestamp of previous sequence number" in { diff --git a/akka-projection-r2dbc/src/main/resources/reference.conf b/akka-projection-r2dbc/src/main/resources/reference.conf index e87a15504..ee0184e02 100644 --- a/akka-projection-r2dbc/src/main/resources/reference.conf +++ 
b/akka-projection-r2dbc/src/main/resources/reference.conf @@ -27,14 +27,6 @@ akka.projection.r2dbc { # It should not be larger than the akka.projection.r2dbc.offset-store.time-window. backtracking-window = ${akka.persistence.r2dbc.query.backtracking.window} - # Keep this number of entries. Don't evict old entries until this threshold - # has been reached. - keep-number-of-entries = 10000 - - # Remove old entries outside the time-window from the offset store memory - # with this frequency. - evict-interval = 10 seconds - # Remove old entries outside the time-window from the offset store database # with this frequency. Can be disabled with `off`. delete-interval = 1 minute diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/R2dbcProjectionSettings.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/R2dbcProjectionSettings.scala index 8601b17bc..585decf15 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/R2dbcProjectionSettings.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/R2dbcProjectionSettings.scala @@ -7,6 +7,7 @@ package akka.projection.r2dbc import java.time.{ Duration => JDuration } import java.util.Locale +import scala.annotation.nowarn import scala.concurrent.duration._ import scala.jdk.DurationConverters._ @@ -57,8 +58,8 @@ object R2dbcProjectionSettings { useConnectionFactory = config.getString("use-connection-factory"), timeWindow = config.getDuration("offset-store.time-window"), backtrackingWindow = config.getDuration("offset-store.backtracking-window"), - keepNumberOfEntries = config.getInt("offset-store.keep-number-of-entries"), - evictInterval = config.getDuration("offset-store.evict-interval"), + keepNumberOfEntries = 0, + evictInterval = JDuration.ZERO, deleteInterval, adoptInterval, logDbCallsExceeding, @@ -82,7 +83,9 @@ final class R2dbcProjectionSettings private ( val useConnectionFactory: String, val timeWindow: JDuration, val backtrackingWindow: JDuration, + 
@deprecated("Not used, evict is only based on time window", "1.6.2") val keepNumberOfEntries: Int, + @deprecated("Not used, evict is not periodic", "1.6.2") val evictInterval: JDuration, val deleteInterval: JDuration, val adoptInterval: JDuration, @@ -121,14 +124,17 @@ final class R2dbcProjectionSettings private ( def withBacktrackingWindow(backtrackingWindow: JDuration): R2dbcProjectionSettings = copy(backtrackingWindow = backtrackingWindow) + @deprecated("Not used, evict is only based on time window", "1.6.2") def withKeepNumberOfEntries(keepNumberOfEntries: Int): R2dbcProjectionSettings = - copy(keepNumberOfEntries = keepNumberOfEntries) + this + @deprecated("Not used, evict is not periodic", "1.6.2") def withEvictInterval(evictInterval: FiniteDuration): R2dbcProjectionSettings = - copy(evictInterval = evictInterval.toJava) + this + @deprecated("Not used, evict is not periodic", "1.6.2") def withEvictInterval(evictInterval: JDuration): R2dbcProjectionSettings = - copy(evictInterval = evictInterval) + this def withDeleteInterval(deleteInterval: FiniteDuration): R2dbcProjectionSettings = copy(deleteInterval = deleteInterval.toJava) @@ -154,6 +160,7 @@ final class R2dbcProjectionSettings private ( def withOffsetBatchSize(offsetBatchSize: Int): R2dbcProjectionSettings = copy(offsetBatchSize = offsetBatchSize) + @nowarn("msg=deprecated") private def copy( schema: Option[String] = schema, offsetTable: String = offsetTable, @@ -162,8 +169,6 @@ final class R2dbcProjectionSettings private ( useConnectionFactory: String = useConnectionFactory, timeWindow: JDuration = timeWindow, backtrackingWindow: JDuration = backtrackingWindow, - keepNumberOfEntries: Int = keepNumberOfEntries, - evictInterval: JDuration = evictInterval, deleteInterval: JDuration = deleteInterval, adoptInterval: JDuration = adoptInterval, logDbCallsExceeding: FiniteDuration = logDbCallsExceeding, @@ -186,5 +191,5 @@ final class R2dbcProjectionSettings private ( offsetBatchSize) override def toString = -
s"R2dbcProjectionSettings($schema, $offsetTable, $timestampOffsetTable, $managementTable, $useConnectionFactory, $timeWindow, $keepNumberOfEntries, $evictInterval, $deleteInterval, $logDbCallsExceeding, $warnAboutFilteredEventsInFlow, $offsetBatchSize)" + s"R2dbcProjectionSettings($schema, $offsetTable, $timestampOffsetTable, $managementTable, $useConnectionFactory, $timeWindow, $deleteInterval, $logDbCallsExceeding, $warnAboutFilteredEventsInFlow, $offsetBatchSize)" } diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala index e9d0a54b4..14f037868 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala @@ -37,7 +37,7 @@ private[projection] trait OffsetStoreDao { timestamp: Instant, storageRepresentation: OffsetSerialization.StorageRepresentation): Future[Done] - def deleteOldTimestampOffset(until: Instant, notInLatestBySlice: Seq[LatestBySlice]): Future[Long] + def deleteOldTimestampOffset(slice: Int, until: Instant): Future[Long] def deleteNewTimestampOffsetsInTx(connection: Connection, timestamp: Instant): Future[Long] diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala index 651a7af52..09e5cce51 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala @@ -6,7 +6,6 @@ package akka.projection.r2dbc.internal import java.time.Instant -import scala.annotation.nowarn import scala.collection.immutable import scala.concurrent.ExecutionContext import scala.concurrent.Future @@ -93,27 
+92,17 @@ private[projection] class PostgresOffsetStoreDao( } /** - * delete less than a timestamp - * @param notInLatestBySlice not used in postgres, but needed in sql + * delete less than a timestamp for a given slice */ - @nowarn - protected def deleteOldTimestampOffsetSql(notInLatestBySlice: Seq[LatestBySlice]): String = + protected def deleteOldTimestampOffsetSql(): String = sql""" - DELETE FROM $timestampOffsetTable WHERE slice BETWEEN ? AND ? AND projection_name = ? AND timestamp_offset < ? - AND NOT (persistence_id || '-' || seq_nr) = ANY (?)""" + DELETE FROM $timestampOffsetTable WHERE slice = ? AND projection_name = ? AND timestamp_offset < ?""" - protected def bindDeleteOldTimestampOffsetSql( - stmt: Statement, - minSlice: Int, - maxSlice: Int, - until: Instant, - notInLatestBySlice: Seq[LatestBySlice]): Statement = { + protected def bindDeleteOldTimestampOffsetSql(stmt: Statement, slice: Int, until: Instant): Statement = { stmt - .bind(0, minSlice) - .bind(1, maxSlice) - .bind(2, projectionId.name) - .bindTimestamp(3, until) - .bind(4, notInLatestBySlice.iterator.map(record => s"${record.pid}-${record.seqNr}").toArray[String]) + .bind(0, slice) + .bind(1, projectionId.name) + .bindTimestamp(2, until) } // delete greater than or equal a timestamp @@ -357,12 +346,10 @@ private[projection] class PostgresOffsetStoreDao( R2dbcExecutor.updateInTx(statements).map(_ => Done)(ExecutionContext.parasitic) } - override def deleteOldTimestampOffset(until: Instant, notInLatestBySlice: Seq[LatestBySlice]): Future[Long] = { - val minSlice = timestampOffsetBySlicesSourceProvider.minSlice - val maxSlice = timestampOffsetBySlicesSourceProvider.maxSlice + override def deleteOldTimestampOffset(slice: Int, until: Instant): Future[Long] = { r2dbcExecutor.updateOne("delete old timestamp offset") { conn => - val stmt = conn.createStatement(deleteOldTimestampOffsetSql(notInLatestBySlice)) - bindDeleteOldTimestampOffsetSql(stmt, minSlice, maxSlice, until, notInLatestBySlice) + 
val stmt = conn.createStatement(deleteOldTimestampOffsetSql()) + bindDeleteOldTimestampOffsetSql(stmt, slice, until) } } diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala index cb69c93bc..e9b0a4739 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala @@ -26,15 +26,18 @@ import akka.projection.internal.OffsetSerialization.MultipleOffsets import akka.projection.r2dbc.R2dbcProjectionSettings import io.r2dbc.spi.Connection import org.slf4j.LoggerFactory - import java.time.Clock import java.time.Instant import java.time.{ Duration => JDuration } import java.util.UUID -import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicReference +import java.lang.Boolean.FALSE +import java.lang.Boolean.TRUE + import scala.annotation.tailrec import scala.collection.immutable +import scala.collection.immutable.TreeSet import scala.concurrent.ExecutionContext import scala.concurrent.Future @@ -46,7 +49,26 @@ private[projection] object R2dbcOffsetStore { type SeqNr = Long type Pid = String - final case class Record(slice: Int, pid: Pid, seqNr: SeqNr, timestamp: Instant) + final case class Record(slice: Int, pid: Pid, seqNr: SeqNr, timestamp: Instant) extends Ordered[Record] { + + override def compare(that: Record): Int = { + val result = this.timestamp.compareTo(that.timestamp) + if (result == 0) { + if (this.slice == that.slice) + if (this.pid == that.pid) + if (this.seqNr == that.seqNr) + 0 + else + java.lang.Long.compare(this.seqNr, that.seqNr) + else + this.pid.compareTo(that.pid) + else Integer.compare(this.slice, that.slice) + } else { + result + } + } + } + final case class RecordWithOffset( record: Record, 
offset: TimestampOffset, @@ -57,7 +79,7 @@ private[projection] object R2dbcOffsetStore { final case class RecordWithProjectionKey(record: Record, projectionKey: String) object State { - val empty: State = State(Map.empty, Vector.empty, Instant.EPOCH, 0, Instant.EPOCH) + val empty: State = State(Map.empty, Map.empty, Instant.EPOCH) def apply(records: immutable.IndexedSeq[Record]): State = { if (records.isEmpty) empty @@ -65,27 +87,53 @@ private[projection] object R2dbcOffsetStore { } } - final case class State( - byPid: Map[Pid, Record], - latest: immutable.IndexedSeq[Record], - oldestTimestamp: Instant, - sizeAfterEvict: Int, - startTimestamp: Instant) { + final case class State(byPid: Map[Pid, Record], bySliceSorted: Map[Int, TreeSet[Record]], startTimestamp: Instant) { def size: Int = byPid.size - def latestTimestamp: Instant = - if (latest.isEmpty) Instant.EPOCH - else latest.head.timestamp + def oldestTimestamp: Instant = { + if (bySliceSorted.isEmpty) + Instant.EPOCH + else + bySliceSorted.valuesIterator.map(_.head.timestamp).min + } + + def latestTimestamp: Instant = { + if (bySliceSorted.isEmpty) + Instant.EPOCH + else + bySliceSorted.valuesIterator.map(_.last.timestamp).max + } def latestOffset: Option[TimestampOffset] = { - if (latest.isEmpty) + if (bySliceSorted.isEmpty) { None - else - Some(TimestampOffset(latestTimestamp, latest.map(r => r.pid -> r.seqNr).toMap)) + } else { + val t = latestTimestamp + // FIXME optimize this collection juggling? 
+ val latest = + bySliceSorted.valuesIterator.flatMap { records => + if (records.last.timestamp == t) + records.toVector.reverseIterator.takeWhile(_.timestamp == t).toVector + else + Vector.empty + }.toVector + + val seen = latest.foldLeft(Map.empty[Pid, SeqNr]) { + case (acc, record) => + acc.get(record.pid) match { + case None => acc.updated(record.pid, record.seqNr) + case Some(existing) => + if (record.seqNr > existing) acc.updated(record.pid, record.seqNr) + else acc + } + } + + Some(TimestampOffset(t, seen)) + } } - def add(records: immutable.IndexedSeq[Record]): State = { + def add(records: Iterable[Record]): State = { records.foldLeft(this) { case (acc, r) => val newByPid = @@ -99,32 +147,17 @@ private[projection] object R2dbcOffsetStore { acc.byPid.updated(r.pid, r) } - val latestTimestamp = acc.latestTimestamp - val newLatest = - if (r.timestamp.isAfter(latestTimestamp)) { - Vector(r) - } else if (r.timestamp == latestTimestamp) { - acc.latest.find(_.pid == r.pid) match { - case None => acc.latest :+ r - case Some(existingRecord) => - // keep highest seqNr - if (r.seqNr >= existingRecord.seqNr) - acc.latest.filterNot(_.pid == r.pid) :+ r - else - acc.latest - } - } else { - acc.latest // older than existing latest, keep existing latest - } - val newOldestTimestamp = - if (acc.oldestTimestamp == Instant.EPOCH) - r.timestamp // first record - else if (r.timestamp.isBefore(acc.oldestTimestamp)) - r.timestamp - else - acc.oldestTimestamp // this is the normal case + val newBySliceSorted = + acc.bySliceSorted.updated(r.slice, acc.bySliceSorted.get(r.slice) match { + case Some(existing) => + // we don't remove existing older records for same pid, but they will + // be removed by eviction + existing + r + case None => + TreeSet.empty[Record] + r + }) - acc.copy(byPid = newByPid, latest = newLatest, oldestTimestamp = newOldestTimestamp) + acc.copy(byPid = newByPid, bySliceSorted = newBySliceSorted) } } @@ -135,29 +168,23 @@ private[projection] object 
R2dbcOffsetStore { } } - def window: JDuration = - JDuration.between(oldestTimestamp, latestTimestamp) - - private lazy val sortedByTimestamp: Vector[Record] = byPid.valuesIterator.toVector.sortBy(_.timestamp) - - lazy val latestBySlice: Vector[Record] = { - val builder = scala.collection.mutable.Map[Int, Record]() - sortedByTimestamp.reverseIterator.foreach { record => - if (!builder.contains(record.slice)) - builder.update(record.slice, record) - } - builder.values.toVector - } - - def evict(until: Instant, keepNumberOfEntries: Int): State = { - if (oldestTimestamp.isBefore(until) && size > keepNumberOfEntries) { - val newState = State( - sortedByTimestamp.take(size - keepNumberOfEntries).filterNot(_.timestamp.isBefore(until)) - ++ sortedByTimestamp.takeRight(keepNumberOfEntries) - ++ latestBySlice) - newState.copy(sizeAfterEvict = newState.size) - } else + def evict(slice: Int, timeWindow: JDuration): State = { + val recordsSortedByTimestamp = bySliceSorted.getOrElse(slice, TreeSet.empty[Record]) + if (recordsSortedByTimestamp.isEmpty) { this + } else { + // this will always keep at least one, latest per slice + val until = recordsSortedByTimestamp.last.timestamp.minus(timeWindow) + val filtered = recordsSortedByTimestamp.dropWhile(_.timestamp.isBefore(until)) + if (filtered.size == recordsSortedByTimestamp.size) { + this + } else { + val byPidOtherSlices = byPid.filterNot { case (_, r) => r.slice == slice } + val bySliceOtherSlices = bySliceSorted - slice + copy(byPid = byPidOtherSlices, bySliceSorted = bySliceOtherSlices) + .add(filtered) + } + } } } @@ -202,8 +229,6 @@ private[projection] class R2dbcOffsetStore( private val persistenceExt = Persistence(system) - private val evictWindow = settings.timeWindow.plus(settings.evictInterval) - private val offsetSerialization = new OffsetSerialization(system) import offsetSerialization.fromStorageRepresentation import offsetSerialization.toStorageRepresentation @@ -224,6 +249,13 @@ private[projection] class 
R2dbcOffsetStore( dialect.createOffsetStoreDao(settings, sourceProvider, system, r2dbcExecutor, projectionId) } + private val (minSlice, maxSlice) = { + sourceProvider match { + case Some(provider) => (provider.minSlice, provider.maxSlice) + case None => (0, persistenceExt.numberOfSlices - 1) + } + } + private[projection] implicit val executionContext: ExecutionContext = system.executionContext // The OffsetStore instance is used by a single projectionId and there shouldn't be any concurrent @@ -237,17 +269,7 @@ private[projection] class R2dbcOffsetStore( private val inflight = new AtomicReference(Map.empty[Pid, SeqNr]) // To avoid delete requests when no new offsets have been stored since previous delete - private val idle = new AtomicBoolean(false) - - // To trigger next deletion after in-memory eviction - private val triggerDeletion = new AtomicBoolean(false) - - if (!settings.deleteInterval.isZero && !settings.deleteInterval.isNegative) - system.scheduler.scheduleWithFixedDelay( - settings.deleteInterval, - settings.deleteInterval, - () => deleteOldTimestampOffsets(), - system.executionContext) + private val triggerDeletionPerSlice = new ConcurrentHashMap[Int, java.lang.Boolean] // Foreign offsets (latest by slice offsets from other projection keys) that should be adopted when passed in time. // Contains remaining offsets to adopt. Sorted by timestamp. Can be updated concurrently with CAS retries. 
@@ -269,6 +291,12 @@ private[projection] class R2dbcOffsetStore( system.executionContext)) else None + private def scheduleNextDelete(): Unit = { + if (!settings.deleteInterval.isZero && !settings.deleteInterval.isNegative) + system.scheduler.scheduleOnce(settings.deleteInterval, () => deleteOldTimestampOffsets(), system.executionContext) + } + scheduleNextDelete() + private def timestampOf(persistenceId: String, sequenceNr: Long): Future[Option[Instant]] = { sourceProvider match { case Some(timestampQuery: EventTimestampQuery) => @@ -314,7 +342,7 @@ private[projection] class R2dbcOffsetStore( } private def readTimestampOffset(): Future[Option[TimestampOffset]] = { - idle.set(false) + triggerDeletionPerSlice.clear() val oldState = state.get() dao.readTimestampOffset().map { recordsWithKey => @@ -322,7 +350,12 @@ private[projection] class R2dbcOffsetStore( clearForeignOffsets() clearLatestSeen() - val newState = State(recordsWithKey.map(_.record)) + val newState = { + val s = State(recordsWithKey.map(_.record)) + (minSlice to maxSlice).foldLeft(s) { + case (acc, slice) => acc.evict(slice, settings.timeWindow) + } + } val startOffset = if (newState == State.empty) { @@ -470,7 +503,6 @@ private[projection] class R2dbcOffsetStore( } private def saveTimestampOffsetInTx(conn: Connection, records: immutable.IndexedSeq[Record]): Future[Done] = { - idle.set(false) val oldState = state.get() val filteredRecords = { if (records.size <= 1) @@ -497,32 +529,21 @@ private[projection] class R2dbcOffsetStore( } else { val newState = oldState.add(filteredRecords) - // accumulate some more than the timeWindow before evicting, and at least 10% increase of size - // for testing keepNumberOfEntries = 0 is used - val evictThresholdReached = - if (settings.keepNumberOfEntries == 0) true else newState.size > (newState.sizeAfterEvict * 1.1).toInt - val evictedNewState = - if (newState.size > settings.keepNumberOfEntries && evictThresholdReached && newState.window - .compareTo(evictWindow) > 
0) { - val evictUntil = newState.latestTimestamp.minus(settings.timeWindow) - val s = newState.evict(evictUntil, settings.keepNumberOfEntries) - triggerDeletion.set(true) - logger.debug( - "Evicted [{}] records until [{}], keeping [{}] records. Latest [{}].", - newState.size - s.size, - evictUntil, - s.size, - newState.latestTimestamp) - s - } else - newState + val slices = + if (filteredRecords.size == 1) Set(filteredRecords.head.slice) + else filteredRecords.iterator.map(_.slice).toSet + + val evictedNewState = slices.foldLeft(newState) { + case (s, slice) => s.evict(slice, settings.timeWindow) + } val offsetInserts = dao.insertTimestampOffsetInTx(conn, filteredRecords) offsetInserts.map { _ => - if (state.compareAndSet(oldState, evictedNewState)) + if (state.compareAndSet(oldState, evictedNewState)) { + slices.foreach(s => triggerDeletionPerSlice.put(s, TRUE)) cleanupInflight(evictedNewState) - else + } else throw new IllegalStateException("Unexpected concurrent modification of state from saveOffset.") Done } @@ -806,43 +827,59 @@ private[projection] class R2dbcOffsetStore( } def deleteOldTimestampOffsets(): Future[Long] = { - if (idle.getAndSet(true)) { - // no new offsets stored since previous delete - Future.successful(0) - } else { - val currentState = getState() - if (!triggerDeletion.getAndSet(false) && currentState.window.compareTo(settings.timeWindow) < 0) { - // it hasn't filled up the window yet - Future.successful(0) - } else { - val until = currentState.latestTimestamp.minus(settings.timeWindow) - - val notInLatestBySlice = currentState.latestBySlice.collect { - case record if record.timestamp.isBefore(until) => - // note that deleteOldTimestampOffsetSql already has `AND timestamp_offset < ?` - // and that's why timestamp >= until don't have to be included here - LatestBySlice(record.slice, record.pid, record.seqNr) + // This is running in the background, so fine to progress slowly one slice at a time + def loop(slice: Int, count: Long): Future[Long] 
= { + if (slice > maxSlice) + Future.successful(count) + else + deleteOldTimestampOffsets(slice).flatMap { c => + loop(slice + 1, count + c) } - val result = dao.deleteOldTimestampOffset(until, notInLatestBySlice) - result.failed.foreach { exc => - idle.set(false) // try again next tick - logger.warn( - "Failed to delete timestamp offset until [{}] for projection [{}]: {}", - until, + } + + val result = loop(minSlice, 0L) + + if (logger.isDebugEnabled) + result.foreach { rows => + logger.debug("Deleted [{}] timestamp offset rows for projection [{}]", rows, projectionId.id) + } + + result.andThen(_ => scheduleNextDelete()) + } + + def deleteOldTimestampOffsets(slice: Int): Future[Long] = { + val triggerDeletion = triggerDeletionPerSlice.put(slice, FALSE) + val currentState = getState() + if ((triggerDeletion == null || triggerDeletion == TRUE) && currentState.bySliceSorted.contains(slice)) { + val latest = currentState.bySliceSorted(slice).last + val until = latest.timestamp.minus(settings.timeWindow) + + // note that deleteOldTimestampOffsetSql already has `AND timestamp_offset < ?`, + // which means that the latest for this slice will not be deleted + val result = dao.deleteOldTimestampOffset(slice, until) + result.failed.foreach { exc => + triggerDeletionPerSlice.put(slice, TRUE) // try again next tick + logger.warn( + "Failed to delete timestamp offset for projection [{}], slice [{}], until [{}] : {}", + projectionId.id, + slice, + until, + exc.toString) + } + if (logger.isDebugEnabled) + result.foreach { rows => + logger.debug( + "Deleted [{}] timestamp offset rows for projection [{}], slice [{}], until [{}]", + rows, projectionId.id, - exc.toString) + slice, + until) } - if (logger.isDebugEnabled) - result.foreach { rows => - logger.debug( - "Deleted [{}] timestamp offset rows until [{}] for projection [{}].", - rows, - until, - projectionId.id) - } - result - } + result + } else { + // no new offsets stored since previous delete + Future.successful(0L) } } @@ 
-971,7 +1008,7 @@ private[projection] class R2dbcOffsetStore( private def clearTimestampOffset(): Future[Done] = { sourceProvider match { case Some(_) => - idle.set(false) + triggerDeletionPerSlice.clear() dao .clearTimestampOffset() .map { n => diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/SqlServerOffsetStoreDao.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/SqlServerOffsetStoreDao.scala index 9fd356dcf..9c9f74b78 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/SqlServerOffsetStoreDao.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/SqlServerOffsetStoreDao.scala @@ -21,7 +21,6 @@ import akka.projection.r2dbc.R2dbcProjectionSettings import akka.projection.BySlicesSourceProvider import akka.projection.ProjectionId import akka.projection.internal.OffsetSerialization.SingleOffset -import akka.projection.r2dbc.internal.R2dbcOffsetStore.LatestBySlice /** * INTERNAL API @@ -97,58 +96,17 @@ private[projection] class SqlServerOffsetStoreDao( .bind("@projectionKey", projectionId.key) } - /** - * The r2dbc-sqlserver driver seems to not support binding of array[T]. - * So have to bake the param into the statement instead of binding it. 
- * - * @param notInLatestBySlice not used in postgres, but needed in sql - * @return - */ - override protected def deleteOldTimestampOffsetSql(notInLatestBySlice: Seq[LatestBySlice]): String = { - val base = - s"DELETE FROM $timestampOffsetTable WHERE slice BETWEEN @from AND @to AND projection_name = @projectionName AND timestamp_offset < @timestampOffset" - if (notInLatestBySlice.isEmpty) { - sql"$base" - } else { - - val values = (timestampOffsetBySlicesSourceProvider.minSlice to timestampOffsetBySlicesSourceProvider.maxSlice) - .map { i => - s"@s$i" - } - .mkString(", ") - sql""" - $base - AND CONCAT(persistence_id, '-', seq_nr) NOT IN ($values)""" - } + override protected def deleteOldTimestampOffsetSql(): String = { + s"DELETE FROM $timestampOffsetTable WHERE slice = @slice AND projection_name = @projectionName AND timestamp_offset < @timestampOffset" } - override protected def bindDeleteOldTimestampOffsetSql( - stmt: Statement, - minSlice: Int, - maxSlice: Int, - until: Instant, - notInLatestBySlice: Seq[LatestBySlice]): Statement = { + override protected def bindDeleteOldTimestampOffsetSql(stmt: Statement, slice: Int, until: Instant): Statement = { stmt - .bind("@from", minSlice) - .bind("@to", maxSlice) + .bind("@slice", slice) .bind("@projectionName", projectionId.name) .bindTimestamp("@timestampOffset", until) - if (notInLatestBySlice.nonEmpty) { - val sliceLookup = notInLatestBySlice.map { item => - item.slice -> item - }.toMap - - (timestampOffsetBySlicesSourceProvider.minSlice to timestampOffsetBySlicesSourceProvider.maxSlice).foreach { i => - val bindKey = s"@s$i" - sliceLookup.get(i) match { - case Some(value) => stmt.bind(bindKey, s"${value.pid}-${value.seqNr}") - case None => stmt.bind(bindKey, "-") - } - } - } - stmt }