Commit 3f75f45

KAFKA-19481: Fix flaky test testConsumerGroupHeartbeatWithRegex (apache#20298)
### Description

This change fixes the flaky test `testConsumerGroupHeartbeatWithRegex`, which fails with the following log. [Develocity link](https://develocity.apache.org/scans/tests?search.timeZoneId=America%2FChicago&tests.container=kafka.api.AuthorizerIntegrationTest&tests.test=testConsumerGroupHeartbeatWithRegex())

`org.opentest4j.AssertionFailedError: Unexpected assignment ConsumerGroupHeartbeatResponseData(throttleTimeMs=0, errorCode=0, errorMessage=null, memberId='OGfeiEjOQbqUTsJgtGMCdQ', memberEpoch=1, heartbeatIntervalMs=5000, assignment=null) ==> expected: not <null>`

I addressed the issue by wrapping the second heartbeat in `TestUtils.tryUntilNoAssertionError()` so that it is retried until its assertions pass.

**Root Cause:** The test depends on an async operation, `refreshRegularExpressions`, within [`GroupMetadataManager`](https://github.com/apache/kafka/blob/trunk/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L3184-L3187), which may not have completed by the time the assertion runs. `refreshRegularExpressions` is responsible for resolving regular-expression-based subscriptions to the current set of matching topic names in the cluster.

* One indicator that the async operation did not finish is `memberEpoch=1` in the failure log: when the test passes, the member ends at `memberEpoch=2`. The epoch is first bumped from 0 -> 1 [here](https://github.com/apache/kafka/blob/trunk/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L2240). Once `refreshRegularExpressions` is done, `handleRegularExpressionsResult` bumps the epoch from 1 -> 2 [here](https://github.com/apache/kafka/blob/trunk/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L3385).

### Testing

- Used this command:

```
for i in {1..100}; do
  echo "Run #$i"
  ./gradlew :core:integrationTest --rerun-tasks --tests kafka.api.AuthorizerIntegrationTest.testConsumerGroupHeartbeatWithRegex
  if [ $? -ne 0 ]; then
    echo "Test failed on run #$i"
    exit 1
  fi
done
echo "All 100 runs passed successfully."
```

- Also added an intentional wait to the async call and confirmed that the test passed with the fix.

**Note:** I ran the original test 100 times locally but was unable to reproduce the error. The only way I could reproduce it was by adding a one-second sleep to the async call, which produced exactly the same error.

Reviewers: Sean Quah <[email protected]>, PoAn Yang <[email protected]>
1 parent ffab7e6 commit 3f75f45

1 file changed: +4 -2 lines changed

core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala

Lines changed: 4 additions & 2 deletions
@@ -3123,8 +3123,10 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
     addAndVerifyAcls(Set(allowAllOpsAcl), groupResource)
     addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
 
-    val response = sendAndReceiveFirstRegexHeartbeat(Uuid.randomUuid.toString, listenerName)
-    sendAndReceiveRegexHeartbeat(response, listenerName, Some(1))
+    var response = sendAndReceiveFirstRegexHeartbeat(Uuid.randomUuid.toString, listenerName)
+    TestUtils.tryUntilNoAssertionError() {
+      response = sendAndReceiveRegexHeartbeat(response, listenerName, Some(1))
+    }
   }
 
   @Test
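
The retry pattern used in this change can be illustrated with a minimal, self-contained sketch. It is not Kafka's implementation of `TestUtils.tryUntilNoAssertionError`: the names `Heartbeat`, `FakeCoordinator`, `RetrySketch`, `retryUntilNoAssertionError`, and `demo`, the topic name, and the default timings are all hypothetical stand-ins. The fake coordinator mimics the race described in the commit message: a background thread plays the role of `refreshRegularExpressions`, and until it finishes, heartbeats report epoch 1 with no assignment.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

// Hypothetical stand-in for a heartbeat response; only the two fields
// discussed in the commit message are modelled.
final case class Heartbeat(memberEpoch: Int, assignment: Option[Set[String]])

// Hypothetical coordinator: regex resolution runs on a background thread.
// Until it finishes, heartbeats report epoch 1 and no assignment.
final class FakeCoordinator(resolveDelayMs: Long) {
  private val state = new AtomicReference[Heartbeat](Heartbeat(1, None))

  private val resolver = new Thread(new Runnable {
    override def run(): Unit = {
      Thread.sleep(resolveDelayMs)                 // simulated async work
      state.set(Heartbeat(2, Some(Set("topic"))))  // epoch bumped 1 -> 2
    }
  })
  resolver.setDaemon(true)
  resolver.start()

  def heartbeat(): Heartbeat = state.get()
}

object RetrySketch {
  // Assumed shape of a retry helper: re-run `assertions` until it stops
  // throwing AssertionError, or rethrow the last error once the deadline passes.
  def retryUntilNoAssertionError[T](waitMs: Long = 5000L, pauseMs: Long = 50L)(assertions: => T): T = {
    val deadline = System.currentTimeMillis() + waitMs

    @tailrec
    def attempt(): T = {
      val result: Either[AssertionError, T] =
        try Right(assertions)
        catch { case e: AssertionError => Left(e) }
      result match {
        case Right(value) => value
        case Left(e) if System.currentTimeMillis() >= deadline => throw e
        case Left(_) =>
          Thread.sleep(pauseMs)
          attempt()
      }
    }

    attempt()
  }

  // Heartbeat repeatedly until an assignment is present, as the fixed test does.
  def demo(resolveDelayMs: Long = 200L): Heartbeat = {
    val coordinator = new FakeCoordinator(resolveDelayMs)
    retryUntilNoAssertionError() {
      val response = coordinator.heartbeat()
      assert(response.assignment.isDefined, s"Unexpected assignment $response")
      response
    }
  }
}
```

In this sketch a single, unretried heartbeat would fail whenever it is sent before the background thread finishes, which corresponds to the `memberEpoch=1`, `assignment=null` failure in the log. Retrying with a deadline keeps the assertion strict while tolerating the delay, and a genuine regression still surfaces as the last `AssertionError` once the deadline passes.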
