Skip to content

Commit

Permalink
test: Harden JdbcProjectionTest atLeastOnceShouldRestartFromPreviousO…
Browse files Browse the repository at this point in the history
…ffset (#1084)

* awaitAssert, trying to make it more simular to R2dbcProjectionSpec and JdbcProjectionSpec
  • Loading branch information
patriknw authored Dec 6, 2023
1 parent 7680fe1 commit e841c10
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,9 @@ class JdbcProjectionSpec
projectionTestKit.run(projection) {
result.toString shouldBe "e1|e2|e3|e4|e5|e6|"
}
offsetShouldBe(6L)
eventually {
offsetShouldBe(6L)
}
}
}

Expand All @@ -693,7 +695,9 @@ class JdbcProjectionSpec
projectionTestKit.run(projection) {
projectedValueShouldBe("e1|e2|e3|e4|e5|e6")
}
offsetShouldBe(6L)
eventually {
offsetShouldBe(6L)
}
}

"skip failing events when using RecoveryStrategy.skip, save after 1" in {
Expand All @@ -714,7 +718,9 @@ class JdbcProjectionSpec
projectionTestKit.run(projection) {
projectedValueShouldBe("e1|e2|e3|e5|e6")
}
offsetShouldBe(6L)
eventually {
offsetShouldBe(6L)
}
}

"skip failing events when using RecoveryStrategy.skip, save after 2" in {
Expand All @@ -735,7 +741,9 @@ class JdbcProjectionSpec
projectionTestKit.run(projection) {
projectedValueShouldBe("e1|e2|e3|e5|e6")
}
offsetShouldBe(6L)
eventually {
offsetShouldBe(6L)
}
}

"restart from previous offset - handler throwing an exception, save after 1" in {
Expand Down Expand Up @@ -777,7 +785,9 @@ class JdbcProjectionSpec
projectedValueShouldBe("e1|e2|e3|e4|e5|e6")
}

offsetShouldBe(6L)
eventually {
offsetShouldBe(6L)
}
}

"restart from previous offset - handler throwing an exception, save after 2" in {
Expand Down Expand Up @@ -1032,7 +1042,9 @@ class JdbcProjectionSpec
projectionTestKit.run(projection) {
projectedValueShouldBe("e1|e2|e3|e4|e5|e6")
}
offsetShouldBe(6L)
eventually {
offsetShouldBe(6L)
}

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,16 +331,11 @@ public void atLeastOnceShouldRestartFromPreviousOffset() {
projectionTestKit.runWithTestSink(
projection,
(probe) -> {
/*
* We only want to process 3 elements through the handler, but given buffering within the projections
* at-least-once impl. we actually process +1 element than we requested with the TestSink probe.
*
* See https://github.com/akka/akka-projection/issues/462 for a possible solution.
*/
probe.request(2);
probe.expectNextN(2);
assertEquals("abc|def|ghi|", str.toString());
expectNextUntilErrorMessage(probe, failMessage(4));
probe.request(3);
testKit.createTestProbe().awaitAssert(() -> {
assertEquals("abc|def|ghi|", str.toString());
return null;
});
});

assertStoredOffset(projectionId, 3L);
Expand All @@ -360,6 +355,7 @@ public void atLeastOnceShouldRestartFromPreviousOffset() {
() -> {
assertEquals("abc|def|ghi|jkl|mno|pqr|", str.toString());
});
assertStoredOffset(projectionId, 6L);
}

@Test
Expand Down

0 comments on commit e841c10

Please sign in to comment.