Skip to content

Commit

Permalink
ensure check of offset repo is wrapped with 'eventually' (#647)
Browse files Browse the repository at this point in the history
  • Loading branch information
octonato authored Aug 23, 2022
1 parent 95dd5a3 commit 47e79ad
Showing 1 changed file with 39 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,13 @@ class CassandraProjectionSpec

private def concatHandlerFail4(): ConcatHandlerFail4 = new ConcatHandlerFail4

def offsetShouldBe(projectionId: ProjectionId, expectedOffset: Long) = {
eventually {
val currentOffset = offsetStore.readOffset[Long](projectionId).futureValue
currentOffset shouldBe Some(expectedOffset)
}
}

"A Cassandra at-least-once projection" must {

"persist projection and offset" in {
Expand All @@ -228,10 +235,7 @@ class CassandraProjectionSpec
concatStr.text shouldBe "abc|def|ghi|jkl|mno|pqr"
}
}
withClue("check - all offsets were seen") {
val offset = offsetStore.readOffset[Long](projectionId).futureValue
offset shouldBe Some(6L)
}
offsetShouldBe(projectionId, 6L)
}

"restart from previous offset - handler throwing an exception, save after 1" in {
Expand All @@ -254,18 +258,14 @@ class CassandraProjectionSpec
eventuallyExpectError(sinkProbe).getMessage should startWith(concatHandlerFail4Msg)
}
}
eventually {
withClue("check: projection is consumed up to third") {
val concatStr = repository.findById(entityId).futureValue
concatStr should matchPattern {
case Some(ConcatStr(_, "abc|def|ghi")) =>
}

offsetShouldBe(projectionId, 3L)
withClue("check: projection is consumed up to third") {
val concatStr = repository.findById(entityId).futureValue
concatStr should matchPattern {
case Some(ConcatStr(_, "abc|def|ghi")) =>
}
}
withClue("check: last seen offset is 3L") {
val offset = offsetStore.readOffset[Long](projectionId).futureValue
offset shouldBe Some(3L)
}

// re-run projection without failing function
val projection =
Expand All @@ -279,11 +279,7 @@ class CassandraProjectionSpec
concatStr.text shouldBe "abc|def|ghi|jkl|mno|pqr"
}
}

withClue("check: all offsets were seen") {
val offset = offsetStore.readOffset[Long](projectionId).futureValue
offset shouldBe Some(6L)
}
offsetShouldBe(projectionId, 6L)
}

"restart from previous offset - handler throwing an exception, save after 2" in {
Expand Down Expand Up @@ -316,10 +312,7 @@ class CassandraProjectionSpec
}
}

withClue("check: last seen offset is 2L") {
val offset = offsetStore.readOffset[Long](projectionId).futureValue
offset shouldBe Some(2L)
}
offsetShouldBe(projectionId, 2L)

// re-run projection without failing function
val projection =
Expand All @@ -335,10 +328,7 @@ class CassandraProjectionSpec
}
}

withClue("check: all offsets were seen") {
val offset = offsetStore.readOffset[Long](projectionId).futureValue
offset shouldBe Some(6L)
}
offsetShouldBe(projectionId, 6L)
}

"save offset after number of envelopes" in {
Expand Down Expand Up @@ -366,18 +356,14 @@ class CassandraProjectionSpec
(1 to 15).foreach { n =>
sourceProbe.get.sendNext(Envelope(entityId, n, s"elem-$n"))
}
eventually {
repository.findById(entityId).futureValue.get.text should include("elem-15")
}
offsetStore.readOffset[Long](projectionId).futureValue shouldBe Some(10L)
offsetShouldBe(projectionId, 10L)
repository.findById(entityId).futureValue.get.text should include("elem-15")

(16 to 22).foreach { n =>
sourceProbe.get.sendNext(Envelope(entityId, n, s"elem-$n"))
}
eventually {
repository.findById(entityId).futureValue.get.text should include("elem-22")
}
offsetStore.readOffset[Long](projectionId).futureValue shouldBe Some(20L)
offsetShouldBe(projectionId, 20L)
repository.findById(entityId).futureValue.get.text should include("elem-22")
}
}

Expand Down Expand Up @@ -407,17 +393,13 @@ class CassandraProjectionSpec
(1 to 15).foreach { n =>
sourceProbe.get.sendNext(Envelope(entityId, n, s"elem-$n"))
}
eventually {
repository.findById(entityId).futureValue.get.text should include("elem-15")
}
offsetStore.readOffset[Long](projectionId).futureValue shouldBe Some(10L)
offsetShouldBe(projectionId, 10L)
repository.findById(entityId).futureValue.get.text should include("elem-15")

(16 to 17).foreach { n =>
sourceProbe.get.sendNext(Envelope(entityId, n, s"elem-$n"))
}
eventually {
offsetStore.readOffset[Long](projectionId).futureValue shouldBe Some(17L)
}
offsetShouldBe(projectionId, 17L)
repository.findById(entityId).futureValue.get.text should include("elem-17")

}
Expand All @@ -441,10 +423,7 @@ class CassandraProjectionSpec
}
}

withClue("check: all offsets were seen") {
val offset = offsetStore.readOffset[Long](projectionId).futureValue
offset shouldBe Some(6L)
}
offsetShouldBe(projectionId, 6L)
}

"skip failing events after retrying when using RecoveryStrategy.retryAndSkip" in {
Expand Down Expand Up @@ -619,10 +598,8 @@ class CassandraProjectionSpec
concatStr.text shouldBe "abc|def|ghi|jkl|mno|pqr"
}
}
withClue("check - all offsets were seen") {
val offset = offsetStore.readOffset[Long](projectionId).futureValue
offset shouldBe Some(6L)
}

offsetShouldBe(projectionId, 6L)
}
}

Expand All @@ -649,10 +626,8 @@ class CassandraProjectionSpec
concatStr.text shouldBe "abc|def|ghi|jkl|mno|pqr"
}
}
withClue("check - all offsets were seen") {
val offset = offsetStore.readOffset[Long](projectionId).futureValue
offset shouldBe Some(6L)
}

offsetShouldBe(projectionId, 6L)
}
}

Expand Down Expand Up @@ -707,10 +682,7 @@ class CassandraProjectionSpec

stopProbe.receiveMessage()

withClue("check - all offsets were seen") {
val offset = offsetStore.readOffset[Long](projectionId).futureValue
offset shouldBe Some(6L)
}
offsetShouldBe(projectionId, 6L)
}
}

Expand All @@ -735,10 +707,8 @@ class CassandraProjectionSpec
concatStr.text shouldBe "abc|def|ghi|jkl|mno|pqr"
}
}
withClue("check - all offsets were seen") {
val offset = offsetStore.readOffset[Long](projectionId).futureValue
offset shouldBe Some(6L)
}

offsetShouldBe(projectionId, 6L)
}

"restart from next offset - handler throwing an exception" in {
Expand All @@ -760,18 +730,14 @@ class CassandraProjectionSpec
eventuallyExpectError(sinkProbe).getMessage should startWith(concatHandlerFail4Msg)
}
}
eventually {
withClue("check: projection is consumed up to third") {
val concatStr = repository.findById(entityId).futureValue
concatStr should matchPattern {
case Some(ConcatStr(_, "abc|def|ghi")) =>
}

offsetShouldBe(projectionId, 4L)
withClue("check: projection is consumed up to third") {
val concatStr = repository.findById(entityId).futureValue
concatStr should matchPattern {
case Some(ConcatStr(_, "abc|def|ghi")) =>
}
}
withClue("check: last seen offset is 4L") {
val offset = offsetStore.readOffset[Long](projectionId).futureValue
offset shouldBe Some(4L) // offset saved before handler for at-most-once
}

// re-run projection without failing function
val projection =
Expand All @@ -786,10 +752,7 @@ class CassandraProjectionSpec
}
}

withClue("check: all offsets were seen") {
val offset = offsetStore.readOffset[Long](projectionId).futureValue
offset shouldBe Some(6L)
}
offsetShouldBe(projectionId, 6L)
}
}

Expand Down

0 comments on commit 47e79ad

Please sign in to comment.