-
Notifications
You must be signed in to change notification settings - Fork 14.7k
KAFKA-19807: Add RPC-level integration tests for StreamsGroupHeartbeat [2/2] #20762
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds comprehensive integration tests for the StreamsGroupHeartbeat RPC, validating group coordinator behavior across various scenarios including internal topic creation, dynamic configuration changes, member expiration/rejoining, and coordinator restarts.
Key changes:
- Added four new integration tests for different StreamsGroupHeartbeat scenarios
- Implemented helper methods for topology creation and task ID conversion
- Configured test-specific cluster properties for heartbeat and session timeouts
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lucliu1108 You need to add the right header to the file
|
@lucliu1108 You need to add the right header to the file (meant to comment in this PR) |
…uestTest.scala Co-authored-by: Copilot <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mostly looking good to me, thanks!
Just minor comments
| }, "Second member rebalance heartbeat did not succeed within the timeout period.") | ||
|
|
||
| // Verify initial state with no standby tasks | ||
| assertEquals(0, streamsGroupHeartbeatResponse1.data.standbyTasks().size(), "Member 1 should have no standby tasks initially") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation seems off
| TestUtils.waitUntilTrue(() => { | ||
| val expiredResponse = connectAndReceive[StreamsGroupHeartbeatResponse](expiredHeartbeatRequest) | ||
| expiredResponse.data.errorCode == Errors.UNKNOWN_MEMBER_ID.code() && | ||
| expiredResponse.data.memberEpoch() == 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like there is a && missing here. Part of the boolean condition is just dropped.
|
|
||
| // Send heartbeat with topology containing internal topics | ||
| var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponse = null | ||
| TestUtils.waitUntilTrue(() => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the base class, there is a helper method streamsGroupHeartbeat, I think you may have defined it yourself. Couldn't we reuse it here?
| }, "Second member heartbeat after config change did not succeed within the timeout period.") | ||
|
|
||
| // Verify that at least one member has active tasks | ||
| val member1HasActive = streamsGroupHeartbeatResponse1.data.activeTasks().size() > 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: member1HasActiveTasks
| val member1HasStandby = streamsGroupHeartbeatResponse1.data.standbyTasks().size() > 0 | ||
| val member2HasStandby = streamsGroupHeartbeatResponse2.data.standbyTasks().size() > 0 | ||
| assertTrue(member1HasStandby || member2HasStandby, "At least one member should have standby tasks after config change") | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could add this to be more precise:
// With 2 members and streams.num.standby.replicas=1, each active task should have 1 standby
val totalActiveTasks = member1HasActive.size + member2HasActive.size
val totalStandbyTasks = member1HasStandby.size + member2HasStandby.size
assertEquals(totalActiveTasks, totalStandbyTasks, "Each active task should have one standby")
What
Ticket: https://issues.apache.org/jira/browse/KAFKA-19807
Follow up on #20757, add tests that:
(
streams.num.standby.replicas) changeReviewers: @lucasbru