Skip to content

Commit

Permalink
fix: handle adapted java source providers for replay (#1279)
Browse files Browse the repository at this point in the history
  • Loading branch information
pvlugter authored Nov 30, 2024
1 parent 34b7bee commit c357722
Show file tree
Hide file tree
Showing 2 changed files with 242 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ package akka.projection.dynamodb

import java.time.Instant
import java.time.{ Duration => JDuration }
import java.util.Optional
import java.util.UUID
import java.util.concurrent.CompletionException
import java.util.concurrent.CompletionStage
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.Supplier
import java.util.{ HashMap => JHashMap }

import scala.annotation.tailrec
Expand All @@ -20,7 +23,9 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.jdk.DurationConverters._
import scala.jdk.FutureConverters._
import scala.jdk.OptionConverters._
import scala.util.Failure
import scala.util.Success

Expand Down Expand Up @@ -159,6 +164,68 @@ object DynamoDBTimestampOffsetProjectionSpec {
}
}

class JavaTestTimestampSourceProvider(
envelopes: immutable.IndexedSeq[EventEnvelope[String]],
testSourceProvider: akka.projection.testkit.javadsl.TestSourceProvider[Offset, EventEnvelope[String]],
override val maxSlice: Int,
enableCurrentEventsByPersistenceId: Boolean)
extends akka.projection.javadsl.SourceProvider[Offset, EventEnvelope[String]]
with BySlicesSourceProvider
with akka.persistence.query.typed.javadsl.EventTimestampQuery
with akka.persistence.query.typed.javadsl.LoadEventQuery
with LoadEventsByPersistenceIdSourceProvider[String] {

override def source(offset: Supplier[CompletionStage[Optional[Offset]]])
: CompletionStage[akka.stream.javadsl.Source[EventEnvelope[String], NotUsed]] =
testSourceProvider.source(offset)

override def extractOffset(envelope: EventEnvelope[String]): Offset =
testSourceProvider.extractOffset(envelope)

override def extractCreationTime(envelope: EventEnvelope[String]): Long =
testSourceProvider.extractCreationTime(envelope)

override def minSlice: Int = 0

override def timestampOf(persistenceId: String, sequenceNr: Long): CompletionStage[Optional[Instant]] = {
Future
.successful(envelopes.collectFirst {
case env
if env.persistenceId == persistenceId && env.sequenceNr == sequenceNr && env.offset
.isInstanceOf[TimestampOffset] =>
env.offset.asInstanceOf[TimestampOffset].timestamp
}.toJava)
.asJava
}

override def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): CompletionStage[EventEnvelope[Event]] = {
envelopes.collectFirst {
case env if env.persistenceId == persistenceId && env.sequenceNr == sequenceNr =>
env.asInstanceOf[EventEnvelope[Event]]
} match {
case Some(env) => Future.successful(env).asJava
case None =>
Future
.failed(
new NoSuchElementException(
s"Event with persistenceId [$persistenceId] and sequenceNr [$sequenceNr] not found."))
.asJava
}
}

override private[akka] def currentEventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long): Option[Source[EventEnvelope[String], NotUsed]] = {
if (enableCurrentEventsByPersistenceId)
Some(Source(envelopes.filter { env =>
env.persistenceId == persistenceId && env.sequenceNr >= fromSequenceNr && env.sequenceNr <= toSequenceNr
}))
else
None
}
}

// test model is as simple as a text that gets other string concatenated to it
case class ConcatStr(id: String, text: String) {
def concat(newMsg: String): ConcatStr = {
Expand Down Expand Up @@ -326,6 +393,32 @@ class DynamoDBTimestampOffsetProjectionSpec
enableCurrentEventsByPersistenceId)
}

// envelopes are emitted by the "query" source, but allEnvelopes can be loaded
def createJavaSourceProviderWithMoreEnvelopes(
envelopes: immutable.IndexedSeq[EventEnvelope[String]],
allEnvelopes: immutable.IndexedSeq[EventEnvelope[String]],
enableCurrentEventsByPersistenceId: Boolean,
complete: Boolean = true): JavaTestTimestampSourceProvider = {
val sp =
akka.projection.testkit.javadsl.TestSourceProvider
.create[Offset, EventEnvelope[String]](akka.stream.javadsl.Source.from(envelopes.asJava), _.offset)
.withStartSourceFrom {
case (lastProcessedOffsetBySlice: TimestampOffsetBySlice, offset: TimestampOffset) =>
// FIXME: should have the envelope slice to handle this properly
val lastProcessedOffset = lastProcessedOffsetBySlice.offsets.head._2
offset.timestamp.isBefore(lastProcessedOffset.timestamp) ||
(offset.timestamp == lastProcessedOffset.timestamp && offset.seen == lastProcessedOffset.seen)
case _ => false
}
.withAllowCompletion(complete)

new JavaTestTimestampSourceProvider(
allEnvelopes,
sp,
persistenceExt.numberOfSlices - 1,
enableCurrentEventsByPersistenceId)
}

def createBacktrackingSourceProvider(
envelopes: immutable.IndexedSeq[EventEnvelope[String]],
complete: Boolean = true): TestTimestampSourceProvider = {
Expand Down Expand Up @@ -1884,6 +1977,129 @@ class DynamoDBTimestampOffsetProjectionSpec
latestOffsetShouldBe(allEnvelopes.last.offset)
}
}

"replay rejected sequence numbers for at-least-once grouped (javadsl)" in {
val pid1 = UUID.randomUUID().toString
val pid2 = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()

val allEnvelopes = createEnvelopes(pid1, 10) ++ createEnvelopes(pid2, 3)
val skipPid1SeqNrs = Set(3L, 4L, 5L, 7L, 9L)
val envelopes = allEnvelopes.filterNot { env =>
(env.persistenceId == pid1 && skipPid1SeqNrs(env.sequenceNr)) ||
(env.persistenceId == pid2 && (env.sequenceNr == 1))
}

val sourceProvider =
createJavaSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true)

implicit val offsetStore: DynamoDBOffsetStore =
new DynamoDBOffsetStore(projectionId, Some(sourceProvider), system, settings, client)

val results = new ConcurrentHashMap[String, String]()

val handler: akka.projection.javadsl.Handler[java.util.List[EventEnvelope[String]]] =
(envelopes: java.util.List[EventEnvelope[String]]) => {
Future {
envelopes.asScala.foreach { envelope =>
results.putIfAbsent(envelope.persistenceId, "|")
results.computeIfPresent(envelope.persistenceId, (_, value) => value + envelope.event + "|")
}
}.map(_ => Done.getInstance()).asJava
}

val projection =
akka.projection.dynamodb.javadsl.DynamoDBProjection
.atLeastOnceGroupedWithin(
projectionId,
Some(settings.withReplayOnRejectedSequenceNumbers(true)).toJava,
sourceProvider,
handler = () => handler,
system)
.withGroup(8, 3.seconds.toJava)

offsetShouldBeEmpty()

projectionTestKit.run(projection) {
results.get(pid1) shouldBe "|e1|e2|e3|e4|e5|e6|e7|e8|e9|e10|"
results.get(pid2) shouldBe "|e1|e2|e3|"
}

eventually {
latestOffsetShouldBe(allEnvelopes.last.offset)
}
}

"replay rejected sequence numbers due to clock skew on event write for at-least-once grouped (javadsl)" in {
val pid1 = UUID.randomUUID().toString
val pid2 = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()

val start = tick().instant()

def createEnvelopesFor(
pid: Pid,
fromSeqNr: Int,
toSeqNr: Int,
fromTimestamp: Instant): immutable.IndexedSeq[EventEnvelope[String]] = {
(fromSeqNr to toSeqNr).map { n =>
createEnvelope(pid, n, fromTimestamp.plusSeconds(n - fromSeqNr), s"e$n")
}
}

val envelopes1 =
createEnvelopesFor(pid1, 1, 2, start) ++
createEnvelopesFor(pid1, 3, 4, start.plusSeconds(4)) ++ // gap
createEnvelopesFor(pid1, 5, 9, start.plusSeconds(2)) // clock skew, back 2, and then overlapping

val envelopes2 =
createEnvelopesFor(pid2, 1, 3, start.plusSeconds(10)) ++
createEnvelopesFor(pid2, 4, 6, start.plusSeconds(1)) ++ // clock skew, back 9
createEnvelopesFor(pid2, 7, 9, start.plusSeconds(20)) // and gap

val allEnvelopes = envelopes1 ++ envelopes2

val envelopes = allEnvelopes.sortBy(_.timestamp)

val sourceProvider =
createJavaSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true)

implicit val offsetStore: DynamoDBOffsetStore =
new DynamoDBOffsetStore(projectionId, Some(sourceProvider), system, settings, client)

val results = new ConcurrentHashMap[String, String]()

val handler: akka.projection.javadsl.Handler[java.util.List[EventEnvelope[String]]] =
(envelopes: java.util.List[EventEnvelope[String]]) => {
Future {
envelopes.asScala.foreach { envelope =>
results.putIfAbsent(envelope.persistenceId, "|")
results.computeIfPresent(envelope.persistenceId, (_, value) => value + envelope.event + "|")
}
}.map(_ => Done.getInstance()).asJava
}

val projection =
akka.projection.dynamodb.javadsl.DynamoDBProjection
.atLeastOnceGroupedWithin(
projectionId,
Some(settings.withReplayOnRejectedSequenceNumbers(true)).toJava,
sourceProvider,
handler = () => handler,
system)
.withGroup(2, 3.seconds.toJava)

offsetShouldBeEmpty()

projectionTestKit.run(projection) {
results.get(pid1) shouldBe "|e1|e2|e3|e4|e5|e6|e7|e8|e9|"
results.get(pid2) shouldBe "|e1|e2|e3|e4|e5|e6|e7|e8|e9|"
}

eventually {
latestOffsetShouldBe(allEnvelopes.last.offset)
}
}
}

"A DynamoDB flow projection with TimestampOffset" must {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import akka.projection.internal.GroupedHandlerStrategy
import akka.projection.internal.HandlerStrategy
import akka.projection.internal.InternalProjection
import akka.projection.internal.InternalProjectionState
import akka.projection.internal.JavaToScalaBySliceSourceProviderAdapter
import akka.projection.internal.ManagementState
import akka.projection.internal.OffsetStoredByHandler
import akka.projection.internal.OffsetStrategy
Expand Down Expand Up @@ -218,7 +219,11 @@ private[projection] object DynamoDBProjectionImpl {
val logPrefix = offsetStore.logPrefix
originalEnvelope match {
case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 =>
sourceProvider match {
val underlyingProvider = sourceProvider match {
case adapted: JavaToScalaBySliceSourceProviderAdapter[_, _] => adapted.delegate
case provider => provider
}
underlyingProvider match {
case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked]
if offsetStore.settings.replayOnRejectedSequenceNumbers =>
val persistenceId = originalEventEnvelope.persistenceId
Expand Down Expand Up @@ -329,7 +334,11 @@ private[projection] object DynamoDBProjectionImpl {
val logPrefix = offsetStore.logPrefix
originalEnvelope match {
case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 =>
sourceProvider match {
val underlyingProvider = sourceProvider match {
case adapted: JavaToScalaBySliceSourceProviderAdapter[_, _] => adapted.delegate
case provider => provider
}
underlyingProvider match {
case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked]
if offsetStore.settings.replayOnRejectedSequenceNumbers =>
val persistenceId = originalEventEnvelope.persistenceId
Expand Down Expand Up @@ -478,7 +487,11 @@ private[projection] object DynamoDBProjectionImpl {
val logPrefix = offsetStore.logPrefix
originalEnvelope match {
case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 =>
sourceProvider match {
val underlyingProvider = sourceProvider match {
case adapted: JavaToScalaBySliceSourceProviderAdapter[_, _] => adapted.delegate
case provider => provider
}
underlyingProvider match {
case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked]
if offsetStore.settings.replayOnRejectedSequenceNumbers =>
val persistenceId = originalEventEnvelope.persistenceId
Expand Down Expand Up @@ -631,7 +644,11 @@ private[projection] object DynamoDBProjectionImpl {
val logPrefix = offsetStore.logPrefix
originalEnvelope match {
case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 =>
sourceProvider match {
val underlyingProvider = sourceProvider match {
case adapted: JavaToScalaBySliceSourceProviderAdapter[_, _] => adapted.delegate
case provider => provider
}
underlyingProvider match {
case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked]
if offsetStore.settings.replayOnRejectedSequenceNumbers =>
val persistenceId = originalEventEnvelope.persistenceId
Expand Down Expand Up @@ -713,7 +730,11 @@ private[projection] object DynamoDBProjectionImpl {
val logPrefix = offsetStore.logPrefix
originalEnvelope match {
case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 =>
sourceProvider match {
val underlyingProvider = sourceProvider match {
case adapted: JavaToScalaBySliceSourceProviderAdapter[_, _] => adapted.delegate
case provider => provider
}
underlyingProvider match {
case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked]
if offsetStore.settings.replayOnRejectedSequenceNumbers =>
val persistenceId = originalEventEnvelope.persistenceId
Expand Down

0 comments on commit c357722

Please sign in to comment.