diff --git a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala index 617a5ad29..1e5baa90d 100644 --- a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala +++ b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala @@ -6,11 +6,14 @@ package akka.projection.dynamodb import java.time.Instant import java.time.{ Duration => JDuration } +import java.util.Optional import java.util.UUID import java.util.concurrent.CompletionException +import java.util.concurrent.CompletionStage import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicInteger +import java.util.function.Supplier import java.util.{ HashMap => JHashMap } import scala.annotation.tailrec @@ -20,7 +23,9 @@ import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ +import scala.jdk.DurationConverters._ import scala.jdk.FutureConverters._ +import scala.jdk.OptionConverters._ import scala.util.Failure import scala.util.Success @@ -159,6 +164,68 @@ object DynamoDBTimestampOffsetProjectionSpec { } } + class JavaTestTimestampSourceProvider( + envelopes: immutable.IndexedSeq[EventEnvelope[String]], + testSourceProvider: akka.projection.testkit.javadsl.TestSourceProvider[Offset, EventEnvelope[String]], + override val maxSlice: Int, + enableCurrentEventsByPersistenceId: Boolean) + extends akka.projection.javadsl.SourceProvider[Offset, EventEnvelope[String]] + with BySlicesSourceProvider + with akka.persistence.query.typed.javadsl.EventTimestampQuery + with akka.persistence.query.typed.javadsl.LoadEventQuery + with LoadEventsByPersistenceIdSourceProvider[String] { + + override def source(offset: Supplier[CompletionStage[Optional[Offset]]]) + : CompletionStage[akka.stream.javadsl.Source[EventEnvelope[String], NotUsed]] = + testSourceProvider.source(offset) + + override def extractOffset(envelope: EventEnvelope[String]): Offset = + testSourceProvider.extractOffset(envelope) + + override def extractCreationTime(envelope: EventEnvelope[String]): Long = + testSourceProvider.extractCreationTime(envelope) + + override def minSlice: Int = 0 + + override def timestampOf(persistenceId: String, sequenceNr: Long): CompletionStage[Optional[Instant]] = { + Future + .successful(envelopes.collectFirst { + case env + if env.persistenceId == persistenceId && env.sequenceNr == sequenceNr && env.offset + .isInstanceOf[TimestampOffset] => + env.offset.asInstanceOf[TimestampOffset].timestamp + }.toJava) + .asJava + } + + override def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): CompletionStage[EventEnvelope[Event]] = { + envelopes.collectFirst { + case env if env.persistenceId == persistenceId && env.sequenceNr == sequenceNr => + env.asInstanceOf[EventEnvelope[Event]] + } match { + case Some(env) => Future.successful(env).asJava + case None => + Future + .failed( + new NoSuchElementException( + s"Event with persistenceId [$persistenceId] and sequenceNr [$sequenceNr] not found.")) + .asJava + } + } + + override private[akka] def currentEventsByPersistenceId( + persistenceId: String, + fromSequenceNr: Long, + toSequenceNr: Long): Option[Source[EventEnvelope[String], NotUsed]] = { + if (enableCurrentEventsByPersistenceId) + Some(Source(envelopes.filter { env => + env.persistenceId == persistenceId && env.sequenceNr >= fromSequenceNr && env.sequenceNr <= toSequenceNr + })) + else + None + } + } + // test model is as simple as a text that gets other string concatenated to it case class ConcatStr(id: String, text: String) { def concat(newMsg: String): ConcatStr = { @@ -326,6 +393,32 @@ class DynamoDBTimestampOffsetProjectionSpec enableCurrentEventsByPersistenceId) } + // envelopes are emitted by the "query" source, but allEnvelopes can be loaded + def createJavaSourceProviderWithMoreEnvelopes( + envelopes: immutable.IndexedSeq[EventEnvelope[String]], + allEnvelopes: immutable.IndexedSeq[EventEnvelope[String]], + enableCurrentEventsByPersistenceId: Boolean, + complete: Boolean = true): JavaTestTimestampSourceProvider = { + val sp = + akka.projection.testkit.javadsl.TestSourceProvider + .create[Offset, EventEnvelope[String]](akka.stream.javadsl.Source.from(envelopes.asJava), _.offset) + .withStartSourceFrom { + case (lastProcessedOffsetBySlice: TimestampOffsetBySlice, offset: TimestampOffset) => + // FIXME: should have the envelope slice to handle this properly + val lastProcessedOffset = lastProcessedOffsetBySlice.offsets.head._2 + offset.timestamp.isBefore(lastProcessedOffset.timestamp) || + (offset.timestamp == lastProcessedOffset.timestamp && offset.seen == lastProcessedOffset.seen) + case _ => false + } + .withAllowCompletion(complete) + + new JavaTestTimestampSourceProvider( + allEnvelopes, + sp, + persistenceExt.numberOfSlices - 1, + enableCurrentEventsByPersistenceId) + } + def createBacktrackingSourceProvider( envelopes: immutable.IndexedSeq[EventEnvelope[String]], complete: Boolean = true): TestTimestampSourceProvider = { @@ -1884,6 +1977,129 @@ class DynamoDBTimestampOffsetProjectionSpec latestOffsetShouldBe(allEnvelopes.last.offset) } } + + "replay rejected sequence numbers for at-least-once grouped (javadsl)" in { + val pid1 = UUID.randomUUID().toString + val pid2 = UUID.randomUUID().toString + val projectionId = genRandomProjectionId() + + val allEnvelopes = createEnvelopes(pid1, 10) ++ createEnvelopes(pid2, 3) + val skipPid1SeqNrs = Set(3L, 4L, 5L, 7L, 9L) + val envelopes = allEnvelopes.filterNot { env => + (env.persistenceId == pid1 && skipPid1SeqNrs(env.sequenceNr)) || + (env.persistenceId == pid2 && (env.sequenceNr == 1)) + } + + val sourceProvider = + createJavaSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true) + + implicit val offsetStore: DynamoDBOffsetStore = + new DynamoDBOffsetStore(projectionId, Some(sourceProvider), system, settings, client) + + val results = new ConcurrentHashMap[String, String]() + + val handler: akka.projection.javadsl.Handler[java.util.List[EventEnvelope[String]]] = + (envelopes: java.util.List[EventEnvelope[String]]) => { + Future { + envelopes.asScala.foreach { envelope => + results.putIfAbsent(envelope.persistenceId, "|") + results.computeIfPresent(envelope.persistenceId, (_, value) => value + envelope.event + "|") + } + }.map(_ => Done.getInstance()).asJava + } + + val projection = + akka.projection.dynamodb.javadsl.DynamoDBProjection + .atLeastOnceGroupedWithin( + projectionId, + Some(settings.withReplayOnRejectedSequenceNumbers(true)).toJava, + sourceProvider, + handler = () => handler, + system) + .withGroup(8, 3.seconds.toJava) + + offsetShouldBeEmpty() + + projectionTestKit.run(projection) { + results.get(pid1) shouldBe "|e1|e2|e3|e4|e5|e6|e7|e8|e9|e10|" + results.get(pid2) shouldBe "|e1|e2|e3|" + } + + eventually { + latestOffsetShouldBe(allEnvelopes.last.offset) + } + } + + "replay rejected sequence numbers due to clock skew on event write for at-least-once grouped (javadsl)" in { + val pid1 = UUID.randomUUID().toString + val pid2 = UUID.randomUUID().toString + val projectionId = genRandomProjectionId() + + val start = tick().instant() + + def createEnvelopesFor( + pid: Pid, + fromSeqNr: Int, + toSeqNr: Int, + fromTimestamp: Instant): immutable.IndexedSeq[EventEnvelope[String]] = { + (fromSeqNr to toSeqNr).map { n => + createEnvelope(pid, n, fromTimestamp.plusSeconds(n - fromSeqNr), s"e$n") + } + } + + val envelopes1 = + createEnvelopesFor(pid1, 1, 2, start) ++ + createEnvelopesFor(pid1, 3, 4, start.plusSeconds(4)) ++ // gap + createEnvelopesFor(pid1, 5, 9, start.plusSeconds(2)) // clock skew, back 2, and then overlapping + + val envelopes2 = + createEnvelopesFor(pid2, 1, 3, start.plusSeconds(10)) ++ + createEnvelopesFor(pid2, 4, 6, start.plusSeconds(1)) ++ // clock skew, back 9 + createEnvelopesFor(pid2, 7, 9, start.plusSeconds(20)) // and gap + + val allEnvelopes = envelopes1 ++ envelopes2 + + val envelopes = allEnvelopes.sortBy(_.timestamp) + + val sourceProvider = + createJavaSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true) + + implicit val offsetStore: DynamoDBOffsetStore = + new DynamoDBOffsetStore(projectionId, Some(sourceProvider), system, settings, client) + + val results = new ConcurrentHashMap[String, String]() + + val handler: akka.projection.javadsl.Handler[java.util.List[EventEnvelope[String]]] = + (envelopes: java.util.List[EventEnvelope[String]]) => { + Future { + envelopes.asScala.foreach { envelope => + results.putIfAbsent(envelope.persistenceId, "|") + results.computeIfPresent(envelope.persistenceId, (_, value) => value + envelope.event + "|") + } + }.map(_ => Done.getInstance()).asJava + } + + val projection = + akka.projection.dynamodb.javadsl.DynamoDBProjection + .atLeastOnceGroupedWithin( + projectionId, + Some(settings.withReplayOnRejectedSequenceNumbers(true)).toJava, + sourceProvider, + handler = () => handler, + system) + .withGroup(2, 3.seconds.toJava) + + offsetShouldBeEmpty() + + projectionTestKit.run(projection) { + results.get(pid1) shouldBe "|e1|e2|e3|e4|e5|e6|e7|e8|e9|" + results.get(pid2) shouldBe "|e1|e2|e3|e4|e5|e6|e7|e8|e9|" + } + + eventually { + latestOffsetShouldBe(allEnvelopes.last.offset) + } + } } "A DynamoDB flow projection with TimestampOffset" must { diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index 8c8fef6b4..34a3e679d 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -48,6 +48,7 @@ import akka.projection.internal.GroupedHandlerStrategy import akka.projection.internal.HandlerStrategy import akka.projection.internal.InternalProjection import akka.projection.internal.InternalProjectionState +import akka.projection.internal.JavaToScalaBySliceSourceProviderAdapter import akka.projection.internal.ManagementState import akka.projection.internal.OffsetStoredByHandler import akka.projection.internal.OffsetStrategy @@ -218,7 +219,11 @@ private[projection] object DynamoDBProjectionImpl { val logPrefix = offsetStore.logPrefix originalEnvelope match { case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 => - sourceProvider match { + val underlyingProvider = sourceProvider match { + case adapted: JavaToScalaBySliceSourceProviderAdapter[_, _] => adapted.delegate + case provider => provider + } + underlyingProvider match { case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked] if offsetStore.settings.replayOnRejectedSequenceNumbers => val persistenceId = originalEventEnvelope.persistenceId @@ -329,7 +334,11 @@ private[projection] object DynamoDBProjectionImpl { val logPrefix = offsetStore.logPrefix originalEnvelope match { case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 => - sourceProvider match { + val underlyingProvider = sourceProvider match { + case adapted: JavaToScalaBySliceSourceProviderAdapter[_, _] => adapted.delegate + case provider => provider + } + underlyingProvider match { case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked] if offsetStore.settings.replayOnRejectedSequenceNumbers => val persistenceId = originalEventEnvelope.persistenceId @@ -478,7 +487,11 @@ private[projection] object DynamoDBProjectionImpl { val logPrefix = offsetStore.logPrefix originalEnvelope match { case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 => - sourceProvider match { + val underlyingProvider = sourceProvider match { + case adapted: JavaToScalaBySliceSourceProviderAdapter[_, _] => adapted.delegate + case provider => provider + } + underlyingProvider match { case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked] if offsetStore.settings.replayOnRejectedSequenceNumbers => val persistenceId = originalEventEnvelope.persistenceId @@ -631,7 +644,11 @@ private[projection] object DynamoDBProjectionImpl { val logPrefix = offsetStore.logPrefix originalEnvelope match { case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 => - sourceProvider match { + val underlyingProvider = sourceProvider match { + case adapted: JavaToScalaBySliceSourceProviderAdapter[_, _] => adapted.delegate + case provider => provider + } + underlyingProvider match { case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked] if offsetStore.settings.replayOnRejectedSequenceNumbers => val persistenceId = originalEventEnvelope.persistenceId @@ -713,7 +730,11 @@ private[projection] object DynamoDBProjectionImpl { val logPrefix = offsetStore.logPrefix originalEnvelope match { case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 => - sourceProvider match { + val underlyingProvider = sourceProvider match { + case adapted: JavaToScalaBySliceSourceProviderAdapter[_, _] => adapted.delegate + case provider => provider + } + underlyingProvider match { case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked] if offsetStore.settings.replayOnRejectedSequenceNumbers => val persistenceId = originalEventEnvelope.persistenceId