diff --git a/akka-projection-cassandra/src/it/scala/akka/projection/cassandra/CassandraProjectionSpec.scala b/akka-projection-cassandra/src/it/scala/akka/projection/cassandra/CassandraProjectionSpec.scala index a9b59ef60..3d51d6ae5 100644 --- a/akka-projection-cassandra/src/it/scala/akka/projection/cassandra/CassandraProjectionSpec.scala +++ b/akka-projection-cassandra/src/it/scala/akka/projection/cassandra/CassandraProjectionSpec.scala @@ -206,6 +206,13 @@ class CassandraProjectionSpec private def concatHandlerFail4(): ConcatHandlerFail4 = new ConcatHandlerFail4 + def offsetShouldBe(projectionId: ProjectionId, expectedOffset: Long) = { + eventually { + val currentOffset = offsetStore.readOffset[Long](projectionId).futureValue + currentOffset shouldBe Some(expectedOffset) + } + } + "A Cassandra at-least-once projection" must { "persist projection and offset" in { @@ -228,10 +235,7 @@ class CassandraProjectionSpec concatStr.text shouldBe "abc|def|ghi|jkl|mno|pqr" } } - withClue("check - all offsets were seen") { - val offset = offsetStore.readOffset[Long](projectionId).futureValue - offset shouldBe Some(6L) - } + offsetShouldBe(projectionId, 6L) } "restart from previous offset - handler throwing an exception, save after 1" in { @@ -254,18 +258,14 @@ class CassandraProjectionSpec eventuallyExpectError(sinkProbe).getMessage should startWith(concatHandlerFail4Msg) } } - eventually { - withClue("check: projection is consumed up to third") { - val concatStr = repository.findById(entityId).futureValue - concatStr should matchPattern { - case Some(ConcatStr(_, "abc|def|ghi")) => - } + + offsetShouldBe(projectionId, 3L) + withClue("check: projection is consumed up to third") { + val concatStr = repository.findById(entityId).futureValue + concatStr should matchPattern { + case Some(ConcatStr(_, "abc|def|ghi")) => } } - withClue("check: last seen offset is 3L") { - val offset = offsetStore.readOffset[Long](projectionId).futureValue - offset shouldBe Some(3L) - } // re-run projection without failing function val projection = @@ -279,11 +279,7 @@ class CassandraProjectionSpec concatStr.text shouldBe "abc|def|ghi|jkl|mno|pqr" } } - - withClue("check: all offsets were seen") { - val offset = offsetStore.readOffset[Long](projectionId).futureValue - offset shouldBe Some(6L) - } + offsetShouldBe(projectionId, 6L) } "restart from previous offset - handler throwing an exception, save after 2" in { @@ -316,10 +312,7 @@ class CassandraProjectionSpec } } - withClue("check: last seen offset is 2L") { - val offset = offsetStore.readOffset[Long](projectionId).futureValue - offset shouldBe Some(2L) - } + offsetShouldBe(projectionId, 2L) // re-run projection without failing function val projection = @@ -335,10 +328,7 @@ class CassandraProjectionSpec } } - withClue("check: all offsets were seen") { - val offset = offsetStore.readOffset[Long](projectionId).futureValue - offset shouldBe Some(6L) - } + offsetShouldBe(projectionId, 6L) } "save offset after number of envelopes" in { @@ -366,18 +356,14 @@ class CassandraProjectionSpec (1 to 15).foreach { n => sourceProbe.get.sendNext(Envelope(entityId, n, s"elem-$n")) } - eventually { - repository.findById(entityId).futureValue.get.text should include("elem-15") - } - offsetStore.readOffset[Long](projectionId).futureValue shouldBe Some(10L) + offsetShouldBe(projectionId, 10L) + repository.findById(entityId).futureValue.get.text should include("elem-15") (16 to 22).foreach { n => sourceProbe.get.sendNext(Envelope(entityId, n, s"elem-$n")) } - eventually { - repository.findById(entityId).futureValue.get.text should include("elem-22") - } - offsetStore.readOffset[Long](projectionId).futureValue shouldBe Some(20L) + offsetShouldBe(projectionId, 20L) + repository.findById(entityId).futureValue.get.text should include("elem-22") } } @@ -407,17 +393,13 @@ class CassandraProjectionSpec (1 to 15).foreach { n => sourceProbe.get.sendNext(Envelope(entityId, n, s"elem-$n")) } - eventually { - repository.findById(entityId).futureValue.get.text should include("elem-15") - } - offsetStore.readOffset[Long](projectionId).futureValue shouldBe Some(10L) + offsetShouldBe(projectionId, 10L) + repository.findById(entityId).futureValue.get.text should include("elem-15") (16 to 17).foreach { n => sourceProbe.get.sendNext(Envelope(entityId, n, s"elem-$n")) } - eventually { - offsetStore.readOffset[Long](projectionId).futureValue shouldBe Some(17L) - } + offsetShouldBe(projectionId, 17L) repository.findById(entityId).futureValue.get.text should include("elem-17") } @@ -441,10 +423,7 @@ class CassandraProjectionSpec } } - withClue("check: all offsets were seen") { - val offset = offsetStore.readOffset[Long](projectionId).futureValue - offset shouldBe Some(6L) - } + offsetShouldBe(projectionId, 6L) } "skip failing events after retrying when using RecoveryStrategy.retryAndSkip" in { @@ -619,10 +598,8 @@ class CassandraProjectionSpec concatStr.text shouldBe "abc|def|ghi|jkl|mno|pqr" } } - withClue("check - all offsets were seen") { - val offset = offsetStore.readOffset[Long](projectionId).futureValue - offset shouldBe Some(6L) - } + + offsetShouldBe(projectionId, 6L) } } @@ -649,10 +626,8 @@ class CassandraProjectionSpec concatStr.text shouldBe "abc|def|ghi|jkl|mno|pqr" } } - withClue("check - all offsets were seen") { - val offset = offsetStore.readOffset[Long](projectionId).futureValue - offset shouldBe Some(6L) - } + + offsetShouldBe(projectionId, 6L) } } @@ -707,10 +682,7 @@ class CassandraProjectionSpec stopProbe.receiveMessage() - withClue("check - all offsets were seen") { - val offset = offsetStore.readOffset[Long](projectionId).futureValue - offset shouldBe Some(6L) - } + offsetShouldBe(projectionId, 6L) } } @@ -735,10 +707,8 @@ class CassandraProjectionSpec concatStr.text shouldBe "abc|def|ghi|jkl|mno|pqr" } } - withClue("check - all offsets were seen") { - val offset = offsetStore.readOffset[Long](projectionId).futureValue - offset shouldBe Some(6L) - } + + offsetShouldBe(projectionId, 6L) } "restart from next offset - handler throwing an exception" in { @@ -760,18 +730,14 @@ class CassandraProjectionSpec eventuallyExpectError(sinkProbe).getMessage should startWith(concatHandlerFail4Msg) } } - eventually { - withClue("check: projection is consumed up to third") { - val concatStr = repository.findById(entityId).futureValue - concatStr should matchPattern { - case Some(ConcatStr(_, "abc|def|ghi")) => - } + + offsetShouldBe(projectionId, 4L) + withClue("check: projection is consumed up to third") { + val concatStr = repository.findById(entityId).futureValue + concatStr should matchPattern { + case Some(ConcatStr(_, "abc|def|ghi")) => } } - withClue("check: last seen offset is 4L") { - val offset = offsetStore.readOffset[Long](projectionId).futureValue - offset shouldBe Some(4L) // offset saved before handler for at-most-once - } // re-run projection without failing function val projection = @@ -786,10 +752,7 @@ class CassandraProjectionSpec } } - withClue("check: all offsets were seen") { - val offset = offsetStore.readOffset[Long](projectionId).futureValue - offset shouldBe Some(6L) - } + offsetShouldBe(projectionId, 6L) } }