-
Notifications
You must be signed in to change notification settings - Fork 14.8k
KAFKA-19779: Add per-partition epoch validation to streams groups [4/N] #20760
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| return (topicName, topicId, partitionId) -> { | ||
| final StreamsGroupTopologyValue.Subtopology subtopology = streamsTopology.sourceTopicMap().get(topicName); | ||
| if (subtopology == null) { | ||
| throw new StaleMemberEpochException("Topic " + topicName + " is not in the topology."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This case is actually impossible right now because we do not allow updating the topology yet. But I think this would be the correct behavior once we allow changing the topology: We are trying to commit for a subtopology that does not exist anymore, so we should fence the member.
| ) { | ||
| // Retrieve topology once for all partitions - not per partition! | ||
| final StreamsTopology streamsTopology = topology.get().orElseThrow(() -> | ||
| new StaleMemberEpochException("Topology is not available for offset commit validation.")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do not allow removing the topology, so I think this may almost impossible. We'd have to recreate the group of the same name, and get the same member ID back to reach this point. If that would ever happen, I think fencing the member would be okay.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR enhances offset commit validation in streams groups by implementing per-partition epoch validation. Instead of just checking the member epoch, the system now validates that each partition being committed has an assignment epoch that is not newer than the commit request's epoch, enabling proper fencing of zombie commit requests.
Key changes:
- Introduction of
TasksTupleWithEpochsto track assignment epochs per partition for active tasks - Addition of
createAssignmentEpochValidatormethod for partition-level validation - Extension of
SmokeTestDriverIntegrationTestto detect excessive record processing - Re-enabling of previously flaky
EosIntegrationTest
Reviewed Changes
Copilot reviewed 23 out of 24 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
TasksTupleWithEpochs.java |
New class storing active tasks with per-partition assignment epochs |
TasksTuple.java |
Updated to work with TasksTupleWithEpochs for containment checks |
StreamsGroup.java |
Implements per-partition epoch validation via createAssignmentEpochValidator |
StreamsGroupMember.java |
Changed to use TasksTupleWithEpochs for assigned tasks |
CurrentAssignmentBuilder.java |
Updated to preserve and assign epochs when building assignments |
StreamsCoordinatorRecordHelpers.java |
Serialization support for assignment epochs |
StreamsGroupCurrentMemberAssignmentValue.json |
Added AssignmentEpochs field to schema |
SmokeTestClient.java |
Tracks total data records processed |
SmokeTestDriverIntegrationTest.java |
Validates no excessive record reprocessing occurs |
EosIntegrationTest.java |
Removed @Flaky annotation from test |
| Various test files | Updated to work with epoch-aware task assignments |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
...ordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TasksTupleWithEpochs.java
Show resolved
Hide resolved
...tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
Outdated
Show resolved
Hide resolved
| group.updateMember(new StreamsGroupMember.Builder("member-1") | ||
| .setMemberEpoch(5) | ||
| .setAssignedTasks(new TasksTupleWithEpochs( | ||
| Map.of("0", Map.of(0, 2, 1, 2)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seem we only test epoch quality? Should we use two different epochs for each partition to extend test coverage?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I made them different
| .build()); | ||
|
|
||
| CommitPartitionValidator validator = group.validateOffsetCommit( | ||
| "member-1", "", 2, false, ApiKeys.OFFSET_COMMIT.latestVersion()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we set member epoch to 5 above, should we also pass in 5 in here for the commit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, with the relaxed check, it should just be smaller I guess, but larger or equals than the assignment epoch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, exactly.
| "member-1", "", 1, false, ApiKeys.OFFSET_COMMIT.latestVersion()); | ||
|
|
||
| // Partition 0 assigned with epoch 2, received epoch 1 should throw | ||
| assertThrows(StaleMemberEpochException.class, () -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seem this check is somewhat redundant to testValidateOffsetCommitWithOlderEpoch from above?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. This is not really what we are testing in this test. Removed this assertion
| Map.of(), Map.of())) | ||
| .build()); | ||
|
|
||
| // Commit with member epoch 3 should fail (3 < assignment epochs 5 and 8) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should even fail for epoch 7 (< 8) right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. It's a better test if only one partition fails the check.
| } | ||
|
|
||
| @Test | ||
| public void testStreamsGroupOffsetCommitWithOlderMemberEpochValidAssignments() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering to what extend this test, and the above testStreamsGroupOffsetCommitWithAssignmentEpochValid contain each other? For both cases, we expect success, what means member epoch > every assignment epoch.
What increase test coverage to we get by having both tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Removed this test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This lock overall GTM.
|
@mjsax Comments addressed and PR rebased |
fe84e73 to
f270733
Compare
This commit enhances the offset commit validation logic in streams groups to validate against per-partition assignment epochs. When a member attempts to commit offsets with an older member epoch, the logic now validates that the epoch is not older than the assignment epoch for each individual partition being committed. The implementation adds a new `createAssignmentEpochValidator` method that creates partition-level validators, checking each partition against its assignment epoch from either assigned tasks or tasks pending revocation. We extend the SmokeTestDriverIntegrationTest to detect if we have processed more records than needed, which, in this restricted scenario, should only happen when offset commits are failing. We re-enable the previously flaky test in EosIntegrationTest, which failed due to previously failing offset commits. Both tests have been run 100x in there streams protocol variation to validate that they are not flaky anymore.
169e5e4 to
3444f13
Compare
…onTest.shouldNotViolateEosIfOneTaskFails` as flaky (apache#20733)" This reverts commit 8dfaedd.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
This change enhances the offset commit validation logic in streams
groups to validate against per-partition assignment epochs. When a
member attempts to commit offsets with an older member epoch, the logic
now validates that the epoch is not older than the assignment epoch for
each individual partition being committed.
The implementation adds a new
createAssignmentEpochValidatormethodthat creates partition-level validators, checking each partition against
its assignment epoch from either assigned tasks or tasks pending
revocation.
We extend the SmokeTestDriverIntegrationTest to detect if we have
processed more records than needed, which, in this restricted scenario,
should only happen when offset commits are failing.
We re-enable the previously flaky test in EosIntegrationTest, which
failed due to previously failing offset commits.
Both tests have been run 100x in their streams protocol variation to
validate that they are not flaky anymore.
Reviewers: David Jacot [email protected], Matthias J. Sax
[email protected]