Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Nov 28, 2024
1 parent 8424450 commit cb9063c
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import akka.projection.dynamodb.internal.DynamoDBOffsetStore.Pid
import akka.projection.dynamodb.internal.DynamoDBOffsetStore.SeqNr
import akka.projection.dynamodb.scaladsl.DynamoDBProjection
import akka.projection.dynamodb.scaladsl.DynamoDBTransactHandler
import akka.projection.eventsourced.scaladsl.EventSourcedProvider.LoadEventsByPersistenceIdSourceProvider
import akka.projection.scaladsl.Handler
import akka.projection.scaladsl.SourceProvider
import akka.projection.testkit.scaladsl.ProjectionTestKit
Expand Down Expand Up @@ -104,11 +105,13 @@ object DynamoDBTimestampOffsetProjectionSpec {
class TestTimestampSourceProvider(
envelopes: immutable.IndexedSeq[EventEnvelope[String]],
testSourceProvider: TestSourceProvider[Offset, EventEnvelope[String]],
override val maxSlice: Int)
override val maxSlice: Int,
enableCurrentEventsByPersistenceId: Boolean)
extends SourceProvider[Offset, EventEnvelope[String]]
with BySlicesSourceProvider
with EventTimestampQuery
with LoadEventQuery {
with LoadEventQuery
with LoadEventsByPersistenceIdSourceProvider[String] {

override def source(offset: () => Future[Option[Offset]]): Future[Source[EventEnvelope[String], NotUsed]] =
testSourceProvider.source(offset)
Expand Down Expand Up @@ -142,6 +145,18 @@ object DynamoDBTimestampOffsetProjectionSpec {
s"Event with persistenceId [$persistenceId] and sequenceNr [$sequenceNr] not found."))
}
}

override private[akka] def currentEventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long): Option[Source[EventEnvelope[String], NotUsed]] = {
if (enableCurrentEventsByPersistenceId)
Some(Source(envelopes.filter { env =>
env.persistenceId == persistenceId && env.sequenceNr >= fromSequenceNr && env.sequenceNr <= toSequenceNr
}))
else
None
}
}

// test model is as simple as a text that gets other string concatenated to it
Expand Down Expand Up @@ -282,6 +297,15 @@ class DynamoDBTimestampOffsetProjectionSpec
def createSourceProvider(
envelopes: immutable.IndexedSeq[EventEnvelope[String]],
complete: Boolean = true): TestTimestampSourceProvider = {
createSourceProviderWithMoreEnvelopes(envelopes, envelopes, enableCurrentEventsByPersistenceId = false, complete)
}

// envelopes are emitted by the "query" source, but allEnvelopes can be loaded
def createSourceProviderWithMoreEnvelopes(
envelopes: immutable.IndexedSeq[EventEnvelope[String]],
allEnvelopes: immutable.IndexedSeq[EventEnvelope[String]],
enableCurrentEventsByPersistenceId: Boolean,
complete: Boolean = true): TestTimestampSourceProvider = {
val sp =
TestSourceProvider[Offset, EventEnvelope[String]](Source(envelopes), _.offset)
.withStartSourceFrom {
Expand All @@ -294,7 +318,11 @@ class DynamoDBTimestampOffsetProjectionSpec
}
.withAllowCompletion(complete)

new TestTimestampSourceProvider(envelopes, sp, persistenceExt.numberOfSlices - 1)
new TestTimestampSourceProvider(
allEnvelopes,
sp,
persistenceExt.numberOfSlices - 1,
enableCurrentEventsByPersistenceId)
}

def createBacktrackingSourceProvider(
Expand All @@ -304,7 +332,11 @@ class DynamoDBTimestampOffsetProjectionSpec
TestSourceProvider[Offset, EventEnvelope[String]](Source(envelopes), _.offset)
.withStartSourceFrom { (_, _) => false } // include all
.withAllowCompletion(complete)
new TestTimestampSourceProvider(envelopes, sp, persistenceExt.numberOfSlices - 1)
new TestTimestampSourceProvider(
envelopes,
sp,
persistenceExt.numberOfSlices - 1,
enableCurrentEventsByPersistenceId = false)
}

private def latestOffsetShouldBe(expected: Any)(implicit offsetStore: DynamoDBOffsetStore) = {
Expand Down Expand Up @@ -857,6 +889,37 @@ class DynamoDBTimestampOffsetProjectionSpec
latestOffsetShouldBe(envelopes.last.offset)
}
}

"replay rejected sequence numbers" in {
val pid1 = UUID.randomUUID().toString
val pid2 = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()

val allEnvelopes = createEnvelopes(pid1, 6) ++ createEnvelopes(pid2, 3)
val envelopes = allEnvelopes.filterNot { env =>
(env.persistenceId == pid1 && (env.sequenceNr == 3 || env.sequenceNr == 4 || env.sequenceNr == 5)) ||
(env.persistenceId == pid2 && (env.sequenceNr == 1))
}

val sourceProvider =
createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true)
implicit val offsetStore: DynamoDBOffsetStore =
new DynamoDBOffsetStore(projectionId, Some(sourceProvider), system, settings, client)

val projectionRef = spawn(
ProjectionBehavior(DynamoDBProjection
.atLeastOnce(projectionId, Some(settings), sourceProvider, handler = () => new ConcatHandler(repository))))

eventually {
projectedValueShouldBe("e1|e2|e3|e4|e5|e6")(pid1)
projectedValueShouldBe("e1|e2|e3")(pid2)
}

eventually {
latestOffsetShouldBe(allEnvelopes.last.offset)
}
projectionRef ! ProjectionBehavior.Stop
}
}

"A DynamoDB exactly-once projection with TimestampOffset" must {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,19 @@ import akka.persistence.query.typed.EventEnvelope
import akka.persistence.query.typed.scaladsl.EventTimestampQuery
import akka.persistence.query.typed.scaladsl.LoadEventQuery
import akka.projection.BySlicesSourceProvider
import akka.projection.eventsourced.scaladsl.EventSourcedProvider.LoadEventsByPersistenceIdSourceProvider
import akka.projection.scaladsl.SourceProvider
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source

class TestSourceProviderWithInput()(implicit val system: ActorSystem[_])
class TestSourceProviderWithInput(enableCurrentEventsByPersistenceId: Boolean)(implicit val system: ActorSystem[_])
extends SourceProvider[TimestampOffset, EventEnvelope[String]]
with BySlicesSourceProvider
with EventTimestampQuery
with LoadEventQuery {
with LoadEventQuery
with LoadEventsByPersistenceIdSourceProvider[String] {

def this()(implicit system: ActorSystem[_]) = this(enableCurrentEventsByPersistenceId = false)

private implicit val ec: ExecutionContext = system.executionContext
private val persistenceExt = Persistence(system)
Expand Down Expand Up @@ -96,4 +100,22 @@ class TestSourceProviderWithInput()(implicit val system: ActorSystem[_])
s"Event with persistenceId [$persistenceId] and sequenceNr [$sequenceNr] not found."))
}
}

override private[akka] def currentEventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long): Option[Source[EventEnvelope[String], NotUsed]] = {
if (enableCurrentEventsByPersistenceId)
Some(
Source(
envelopes
.iterator()
.asScala
.filter { env =>
env.persistenceId == persistenceId && env.sequenceNr >= fromSequenceNr && env.sequenceNr <= toSequenceNr
}
.toVector))
else
None
}
}

0 comments on commit cb9063c

Please sign in to comment.