From f2813ed2069270d9cc36e357e1ea1c5dcf47e4d0 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 19 Dec 2023 14:12:25 +0100 Subject: [PATCH] feat: updateTimestampOffset --- .../akka/projection/ProjectionBehavior.scala | 9 +++++++ .../scaladsl/ProjectionManagement.scala | 26 ++++++++++++++++++- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/akka-projection-core/src/main/scala/akka/projection/ProjectionBehavior.scala b/akka-projection-core/src/main/scala/akka/projection/ProjectionBehavior.scala index dc91b5e7e..96c955fe8 100644 --- a/akka-projection-core/src/main/scala/akka/projection/ProjectionBehavior.scala +++ b/akka-projection-core/src/main/scala/akka/projection/ProjectionBehavior.scala @@ -3,6 +3,8 @@ */ package akka.projection +import java.time.Instant + import akka.actor.typed.scaladsl.LoggerOps import scala.util.Failure import scala.util.Success @@ -19,6 +21,7 @@ import akka.actor.typed.scaladsl.StashBuffer import akka.annotation.InternalApi import akka.projection.internal.ManagementState import akka.projection.scaladsl.ProjectionManagement +import akka.projection.scaladsl.ProjectionManagement.UpdateTimestampOffset object ProjectionBehavior { @@ -48,6 +51,12 @@ object ProjectionBehavior { extends ProjectionManagementCommand final case class SetOffsetResult[Offset](replyTo: ActorRef[Done]) extends ProjectionManagementCommand + final case class SetTimestampOffset( + projectionId: ProjectionId, + updates: Set[UpdateTimestampOffset], + replyTo: ActorRef[Done]) + extends ProjectionManagementCommand + final case class IsPaused(projectionId: ProjectionId, replyTo: ActorRef[Boolean]) extends ProjectionManagementCommand final case class SetPaused(projectionId: ProjectionId, paused: Boolean, replyTo: ActorRef[Done]) diff --git a/akka-projection-core/src/main/scala/akka/projection/scaladsl/ProjectionManagement.scala b/akka-projection-core/src/main/scala/akka/projection/scaladsl/ProjectionManagement.scala index 17128ad14..2a9e25706 100644 --- a/akka-projection-core/src/main/scala/akka/projection/scaladsl/ProjectionManagement.scala +++ b/akka-projection-core/src/main/scala/akka/projection/scaladsl/ProjectionManagement.scala @@ -5,10 +5,12 @@ package akka.projection.scaladsl import java.util.concurrent.ConcurrentHashMap + import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.TimeoutException import scala.concurrent.duration.FiniteDuration + import akka.Done import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem @@ -20,14 +22,18 @@ import akka.projection.ProjectionBehavior import akka.projection.ProjectionId import akka.util.JavaDurationConverters._ import akka.util.Timeout - import java.net.URLEncoder import java.nio.charset.StandardCharsets +import java.time.Instant + +import akka.projection.scaladsl.ProjectionManagement.UpdateTimestampOffset object ProjectionManagement extends ExtensionId[ProjectionManagement] { def createExtension(system: ActorSystem[_]): ProjectionManagement = new ProjectionManagement(system) def get(system: ActorSystem[_]): ProjectionManagement = apply(system) + + final case class UpdateTimestampOffset(persistenceId: String, seqNr: Long, timestamp: Instant) } class ProjectionManagement(system: ActorSystem[_]) extends Extension { @@ -100,6 +106,24 @@ class ProjectionManagement(system: ActorSystem[_]) extends Extension { retry(() => askSetOffset()) } + /** + * Update the stored `TimestampOffset` for the `projectionId` and restart the `Projection`. + * This can be useful if the projection was stuck with errors on a specific offset and should skip + * that offset and continue with next. + * + * Another use case is to populate the offset store with know starting points when enabling change events + * for Durable State. In that case the offset sequence number should be set to current Durable State + * revision minus 1. + */ + def updateTimestampOffset(projectionId: ProjectionId, updates: Set[UpdateTimestampOffset]): Future[Done] = { + def askSetTimestampOffset(): Future[Done] = { + topic(projectionId.name) + .ask(replyTo => Topic.Publish(SetTimestampOffset(projectionId, updates, replyTo))) + } + + retry(() => askSetTimestampOffset()) + } + private def retry[T](operation: () => Future[T]): Future[T] = { def attempt(remaining: Int): Future[T] = { operation().recoverWith {