Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: updateTimestampOffset #1102

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
*/

package akka.projection
import java.time.Instant

import akka.actor.typed.scaladsl.LoggerOps
import scala.util.Failure
import scala.util.Success
Expand All @@ -19,6 +21,7 @@ import akka.actor.typed.scaladsl.StashBuffer
import akka.annotation.InternalApi
import akka.projection.internal.ManagementState
import akka.projection.scaladsl.ProjectionManagement
import akka.projection.scaladsl.ProjectionManagement.UpdateTimestampOffset

object ProjectionBehavior {

Expand Down Expand Up @@ -48,6 +51,12 @@ object ProjectionBehavior {
extends ProjectionManagementCommand
final case class SetOffsetResult[Offset](replyTo: ActorRef[Done]) extends ProjectionManagementCommand

final case class SetTimestampOffset(
projectionId: ProjectionId,
updates: Set[UpdateTimestampOffset],
replyTo: ActorRef[Done])
extends ProjectionManagementCommand

final case class IsPaused(projectionId: ProjectionId, replyTo: ActorRef[Boolean])
extends ProjectionManagementCommand
final case class SetPaused(projectionId: ProjectionId, paused: Boolean, replyTo: ActorRef[Done])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
package akka.projection.scaladsl

import java.util.concurrent.ConcurrentHashMap

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.TimeoutException
import scala.concurrent.duration.FiniteDuration

import akka.Done
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
Expand All @@ -20,14 +22,18 @@ import akka.projection.ProjectionBehavior
import akka.projection.ProjectionId
import akka.util.JavaDurationConverters._
import akka.util.Timeout

import java.net.URLEncoder
import java.nio.charset.StandardCharsets
import java.time.Instant

import akka.projection.scaladsl.ProjectionManagement.UpdateTimestampOffset

object ProjectionManagement extends ExtensionId[ProjectionManagement] {
def createExtension(system: ActorSystem[_]): ProjectionManagement = new ProjectionManagement(system)

def get(system: ActorSystem[_]): ProjectionManagement = apply(system)

final case class UpdateTimestampOffset(persistenceId: String, seqNr: Long, timestamp: Instant)
}

class ProjectionManagement(system: ActorSystem[_]) extends Extension {
Expand Down Expand Up @@ -100,6 +106,24 @@ class ProjectionManagement(system: ActorSystem[_]) extends Extension {
retry(() => askSetOffset())
}

/**
* Update the stored `TimestampOffset` for the `projectionId` and restart the `Projection`.
* This can be useful if the projection was stuck with errors on a specific offset and should skip
* that offset and continue with next.
*
* Another use case is to populate the offset store with know starting points when enabling change events
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Another use case is to populate the offset store with know starting points when enabling change events
* Another use case is to populate the offset store with known starting points when enabling change events

* for Durable State. In that case the offset sequence number should be set to current Durable State
* revision minus 1.
*/
def updateTimestampOffset(projectionId: ProjectionId, updates: Set[UpdateTimestampOffset]): Future[Done] = {
def askSetTimestampOffset(): Future[Done] = {
topic(projectionId.name)
.ask(replyTo => Topic.Publish(SetTimestampOffset(projectionId, updates, replyTo)))
}

retry(() => askSetTimestampOffset())
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would one handling starting a new projection from the offsets, hold back the consumer until the response comes back from this API, because it would start consuming from start as soon as it has started executing, right?


private def retry[T](operation: () => Future[T]): Future[T] = {
def attempt(remaining: Int): Future[T] = {
operation().recoverWith {
Expand Down
Loading