Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2900,7 +2900,6 @@ project(':streams:integration-tests') {
testImplementation libs.mockitoCore
testImplementation testLog4j2Libs
testImplementation project(':streams:test-utils')
testImplementation project(':test-common:test-common-util')

testRuntimeOnly runtimeTestLibs
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
import org.apache.kafka.coordinator.group.Utils;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
Expand Down Expand Up @@ -726,8 +727,17 @@ public CommitPartitionValidator validateOffsetCommit(
"by members using the streams group protocol");
}

validateMemberEpoch(memberEpoch, member.memberEpoch());
return CommitPartitionValidator.NO_OP;
if (memberEpoch == member.memberEpoch()) {
return CommitPartitionValidator.NO_OP;
}

if (memberEpoch > member.memberEpoch()) {
throw new StaleMemberEpochException(String.format("Received member epoch %d is newer than " +
"current member epoch %d.", memberEpoch, member.memberEpoch()));
}

// Member epoch is older; validate against per-partition assignment epochs.
return createAssignmentEpochValidator(member, memberEpoch);
}

/**
Expand Down Expand Up @@ -1120,4 +1130,54 @@ public void setLastAssignmentConfigs(Map<String, String> lastAssignmentConfigs)
this.lastAssignmentConfigs.putAll(lastAssignmentConfigs);
}
}

/**
* Creates a validator that checks if the received member epoch is valid for each partition's assignment epoch.
*
* @param member The member whose assignments are being validated.
* @param receivedMemberEpoch The received member epoch.
* @return A validator for per-partition validation.
*/
private CommitPartitionValidator createAssignmentEpochValidator(
final StreamsGroupMember member,
int receivedMemberEpoch
) {
// Retrieve topology once for all partitions - not per partition!
final StreamsTopology streamsTopology = topology.get().orElseThrow(() ->
new StaleMemberEpochException("Topology is not available for offset commit validation."));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not allow removing the topology, so I think this may be almost impossible. We'd have to recreate a group with the same name and get the same member ID back to reach this point. If that ever happened, I think fencing the member would be okay.


final TasksTupleWithEpochs assignedTasks = member.assignedTasks();
final TasksTupleWithEpochs tasksPendingRevocation = member.tasksPendingRevocation();

return (topicName, topicId, partitionId) -> {
final StreamsGroupTopologyValue.Subtopology subtopology = streamsTopology.sourceTopicMap().get(topicName);
if (subtopology == null) {
throw new StaleMemberEpochException("Topic " + topicName + " is not in the topology.");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case is actually impossible right now because we do not allow updating the topology yet. But I think this would be the correct behavior once we allow changing the topology: We are trying to commit for a subtopology that does not exist anymore, so we should fence the member.

}

final String subtopologyId = subtopology.subtopologyId();

// Search for the partition in assigned tasks, then in tasks pending revocation
Integer assignmentEpoch = assignedTasks.activeTasksWithEpochs()
.getOrDefault(subtopologyId, Collections.emptyMap())
.get(partitionId);
if (assignmentEpoch == null) {
assignmentEpoch = tasksPendingRevocation.activeTasksWithEpochs()
.getOrDefault(subtopologyId, Collections.emptyMap())
.get(partitionId);
}

if (assignmentEpoch == null) {
throw new StaleMemberEpochException(String.format(
"Task %s-%d is not assigned or pending revocation for member.",
subtopologyId, partitionId));
}

if (receivedMemberEpoch < assignmentEpoch) {
throw new StaleMemberEpochException(String.format(
"Received member epoch %d is older than assignment epoch %d for task %s-%d.",
receivedMemberEpoch, assignmentEpoch, subtopologyId, partitionId));
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.kafka.coordinator.group.streams.StreamsGroup;
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
import org.apache.kafka.coordinator.group.streams.StreamsTopology;
import org.apache.kafka.coordinator.group.streams.TasksTupleWithEpochs;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
Expand Down Expand Up @@ -3659,4 +3660,77 @@ private ClassicGroupMember mkGenericMember(
)
);
}

/**
 * A streams group member may commit offsets with a member epoch that is older than its
 * current epoch, as long as that epoch is not older than the assignment epoch of every
 * committed partition.
 */
@Test
public void testStreamsGroupOffsetCommitWithAssignmentEpochValid() {
    OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
    StreamsGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedStreamsGroup("foo", true);

    // Topology: topic "bar" is the source topic of subtopology "0".
    StreamsGroupTopologyValue.Subtopology subtopology = new StreamsGroupTopologyValue.Subtopology()
        .setSubtopologyId("0")
        .setSourceTopics(List.of("bar"));
    group.setTopology(new StreamsTopology(1, Map.of("0", subtopology)));

    // The member is at epoch 10; partitions 0 and 1 were assigned at epochs 4 and 5 respectively.
    TasksTupleWithEpochs assignedTasks = new TasksTupleWithEpochs(
        Map.of("0", Map.of(0, 4, 1, 5)),
        Map.of(), Map.of());
    group.updateMember(StreamsGroupMember.Builder.withDefaults("member")
        .setMemberEpoch(10)
        .setAssignedTasks(assignedTasks)
        .build());

    // Build a commit for both partitions of "bar" carrying member epoch 5.
    OffsetCommitRequestData.OffsetCommitRequestPartition firstPartition =
        new OffsetCommitRequestData.OffsetCommitRequestPartition()
            .setPartitionIndex(0)
            .setCommittedOffset(100L);
    OffsetCommitRequestData.OffsetCommitRequestPartition secondPartition =
        new OffsetCommitRequestData.OffsetCommitRequestPartition()
            .setPartitionIndex(1)
            .setCommittedOffset(200L);
    OffsetCommitRequestData.OffsetCommitRequestTopic topic =
        new OffsetCommitRequestData.OffsetCommitRequestTopic()
            .setName("bar")
            .setPartitions(List.of(firstPartition, secondPartition));
    OffsetCommitRequestData request = new OffsetCommitRequestData()
        .setGroupId("foo")
        .setMemberId("member")
        .setGenerationIdOrMemberEpoch(5)
        .setTopics(List.of(topic));

    // Member epoch 5 is >= both assignment epochs (4 and 5), so the commit should succeed.
    CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> result = context.commitOffset(request);

    for (int partitionIndex = 0; partitionIndex < 2; partitionIndex++) {
        assertEquals(Errors.NONE.code(),
            result.response().topics().get(0).partitions().get(partitionIndex).errorCode());
    }
    assertEquals(2, result.records().size());
}

/**
 * An offset commit from a streams group member is rejected with a
 * {@link StaleMemberEpochException} when the received member epoch is older than the
 * assignment epochs of the committed partitions.
 */
@Test
public void testStreamsGroupOffsetCommitWithAssignmentEpochStale() {
    OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
    StreamsGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedStreamsGroup("foo", true);

    // Topology: topic "bar" is the source topic of subtopology "0".
    StreamsGroupTopologyValue.Subtopology subtopology = new StreamsGroupTopologyValue.Subtopology()
        .setSubtopologyId("0")
        .setSourceTopics(List.of("bar"));
    group.setTopology(new StreamsTopology(1, Map.of("0", subtopology)));

    // The member is at epoch 10; partitions 0 and 1 were assigned at epochs 5 and 8.
    TasksTupleWithEpochs assignedTasks = new TasksTupleWithEpochs(
        Map.of("0", Map.of(0, 5, 1, 8)),
        Map.of(), Map.of());
    group.updateMember(StreamsGroupMember.Builder.withDefaults("member")
        .setMemberEpoch(10)
        .setAssignedTasks(assignedTasks)
        .build());

    // Build a commit for both partitions of "bar" carrying member epoch 3.
    OffsetCommitRequestData.OffsetCommitRequestPartition firstPartition =
        new OffsetCommitRequestData.OffsetCommitRequestPartition()
            .setPartitionIndex(0)
            .setCommittedOffset(100L);
    OffsetCommitRequestData.OffsetCommitRequestPartition secondPartition =
        new OffsetCommitRequestData.OffsetCommitRequestPartition()
            .setPartitionIndex(1)
            .setCommittedOffset(200L);
    OffsetCommitRequestData.OffsetCommitRequestTopic topic =
        new OffsetCommitRequestData.OffsetCommitRequestTopic()
            .setName("bar")
            .setPartitions(List.of(firstPartition, secondPartition));
    OffsetCommitRequestData request = new OffsetCommitRequestData()
        .setGroupId("foo")
        .setMemberId("member")
        .setGenerationIdOrMemberEpoch(3)
        .setTopics(List.of(topic));

    // Commit with member epoch 3 should fail (3 < assignment epochs 5 and 8).
    assertThrows(StaleMemberEpochException.class, () -> context.commitOffset(request));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.CommitPartitionValidator;
import org.apache.kafka.coordinator.group.OffsetAndMetadata;
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
Expand Down Expand Up @@ -660,7 +661,7 @@ public void testValidateOffsetCommit(short version) {
assertThrows(UnknownMemberIdException.class, () ->
group.validateOffsetCommit("", null, -1, isTransactional, version));

// The member epoch is stale.
// The member epoch is stale (newer than current).
if (version >= 9) {
assertThrows(StaleMemberEpochException.class, () ->
group.validateOffsetCommit("new-protocol-member-id", "", 10, isTransactional, version));
Expand All @@ -669,7 +670,7 @@ public void testValidateOffsetCommit(short version) {
group.validateOffsetCommit("new-protocol-member-id", "", 10, isTransactional, version));
}

// This should succeed.
// This should succeed (matching member epoch).
if (version >= 9) {
group.validateOffsetCommit("new-protocol-member-id", "", 0, isTransactional, version);
} else {
Expand All @@ -678,6 +679,108 @@ public void testValidateOffsetCommit(short version) {
}
}

/**
 * With an older member epoch, the per-partition validator rejects a partition whose
 * assignment epoch is newer than the received member epoch.
 */
@Test
public void testValidateOffsetCommitWithOlderEpoch() {
    StreamsGroup group = createStreamsGroup("group-foo");

    // Topology: "input-topic" is the source topic of subtopology "0".
    StreamsGroupTopologyValue.Subtopology subtopology = new StreamsGroupTopologyValue.Subtopology()
        .setSubtopologyId("0")
        .setSourceTopics(List.of("input-topic"));
    group.setTopology(new StreamsTopology(1, Map.of("0", subtopology)));

    // The member is at epoch 2; partition 0 was assigned at epoch 2 and partition 1 at epoch 1.
    TasksTupleWithEpochs assignedTasks = new TasksTupleWithEpochs(
        Map.of("0", Map.of(0, 2, 1, 1)),
        Map.of(), Map.of());
    group.updateMember(new StreamsGroupMember.Builder("member-1")
        .setMemberEpoch(2)
        .setAssignedTasks(assignedTasks)
        .build());

    // Received member epoch 1 is older than the member's current epoch 2.
    short version = ApiKeys.OFFSET_COMMIT.latestVersion();
    CommitPartitionValidator validator = group.validateOffsetCommit("member-1", "", 1, false, version);

    // Received epoch (1) < assignment epoch (2) of partition 0, so validation should throw.
    assertThrows(StaleMemberEpochException.class,
        () -> validator.validate("input-topic", Uuid.ZERO_UUID, 0));
}

/**
 * With an older member epoch and no topology set on the group, validation fails
 * when the validator is requested rather than when a partition is checked.
 */
@Test
public void testValidateOffsetCommitWithOlderEpochMissingTopology() {
    StreamsGroup group = createStreamsGroup("group-foo");

    // No topology is set on the group; the member is at epoch 2.
    StreamsGroupMember member = new StreamsGroupMember.Builder("member-1")
        .setMemberEpoch(2)
        .build();
    group.updateMember(member);

    // The exception is expected from validateOffsetCommit itself, before any partition is validated.
    assertThrows(
        StaleMemberEpochException.class,
        () -> group.validateOffsetCommit("member-1", "", 1, false, ApiKeys.OFFSET_COMMIT.latestVersion())
    );
}

/**
 * With an older member epoch, the per-partition validator rejects a commit for a
 * topic that is not a source topic of any subtopology in the group's topology.
 */
@Test
public void testValidateOffsetCommitWithOlderEpochMissingSubtopology() {
    StreamsGroup group = createStreamsGroup("group-foo");

    // Topology: only "input-topic" is known, as the source topic of subtopology "0".
    StreamsGroupTopologyValue.Subtopology subtopology = new StreamsGroupTopologyValue.Subtopology()
        .setSubtopologyId("0")
        .setSourceTopics(List.of("input-topic"));
    group.setTopology(new StreamsTopology(1, Map.of("0", subtopology)));

    // The member is at epoch 2.
    StreamsGroupMember member = new StreamsGroupMember.Builder("member-1")
        .setMemberEpoch(2)
        .build();
    group.updateMember(member);

    // Received member epoch 1 is older than the member's current epoch 2.
    short version = ApiKeys.OFFSET_COMMIT.latestVersion();
    CommitPartitionValidator validator = group.validateOffsetCommit("member-1", "", 1, false, version);

    // "unknown-topic" is not part of the topology, so validation should throw.
    assertThrows(StaleMemberEpochException.class,
        () -> validator.validate("unknown-topic", Uuid.ZERO_UUID, 0));
}

/**
 * With an older member epoch, the per-partition validator rejects a partition that is
 * neither assigned to the member nor pending revocation.
 */
@Test
public void testValidateOffsetCommitWithOlderEpochUnassignedPartition() {
    StreamsGroup group = createStreamsGroup("group-foo");

    // Topology: "input-topic" is the source topic of subtopology "0".
    StreamsGroupTopologyValue.Subtopology subtopology = new StreamsGroupTopologyValue.Subtopology()
        .setSubtopologyId("0")
        .setSourceTopics(List.of("input-topic"));
    group.setTopology(new StreamsTopology(1, Map.of("0", subtopology)));

    // The member is at epoch 2 and owns only partition 0 (assigned at epoch 1);
    // nothing is pending revocation.
    TasksTupleWithEpochs assignedTasks = new TasksTupleWithEpochs(
        Map.of("0", Map.of(0, 1)),
        Map.of(), Map.of());
    group.updateMember(new StreamsGroupMember.Builder("member-1")
        .setMemberEpoch(2)
        .setAssignedTasks(assignedTasks)
        .setTasksPendingRevocation(TasksTupleWithEpochs.EMPTY)
        .build());

    // Received member epoch 1 is older than the member's current epoch 2.
    short version = ApiKeys.OFFSET_COMMIT.latestVersion();
    CommitPartitionValidator validator = group.validateOffsetCommit("member-1", "", 1, false, version);

    // Partition 1 is not assigned, so validation should throw.
    assertThrows(StaleMemberEpochException.class,
        () -> validator.validate("input-topic", Uuid.ZERO_UUID, 1));
}

@Test
public void testValidateOffsetCommitWithOlderEpochValidAssignment() {
StreamsGroup group = createStreamsGroup("group-foo");

group.setTopology(new StreamsTopology(1, Map.of("0", new StreamsGroupTopologyValue.Subtopology()
.setSubtopologyId("0")
.setSourceTopics(List.of("input-topic")))));

group.updateMember(new StreamsGroupMember.Builder("member-1")
.setMemberEpoch(5)
.setAssignedTasks(new TasksTupleWithEpochs(
Map.of("0", Map.of(0, 2, 1, 2)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we only test epoch equality? Should we use two different epochs for each partition to extend test coverage?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I made them different

Map.of(), Map.of()))
.build());

CommitPartitionValidator validator = group.validateOffsetCommit(
"member-1", "", 2, false, ApiKeys.OFFSET_COMMIT.latestVersion());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we set member epoch to 5 above, should we also pass in 5 in here for the commit?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, with the relaxed check, it should just be smaller, I guess, but greater than or equal to the assignment epoch.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, exactly.


// Received epoch 2 == assignment epoch 2 should succeed
validator.validate("input-topic", Uuid.ZERO_UUID, 0);
validator.validate("input-topic", Uuid.ZERO_UUID, 1);
}

@Test
public void testAsListedGroup() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.test.api.Flaky;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
Expand Down Expand Up @@ -396,7 +395,6 @@ public void shouldBeAbleToPerformMultipleTransactions(final String groupProtocol
}
}

@Flaky("KAFKA-19816")
@ParameterizedTest
@MethodSource("groupProtocolAndProcessingThreadsParameters")
public void shouldNotViolateEosIfOneTaskFails(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
import static org.apache.kafka.streams.tests.SmokeTestDriver.verify;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -140,11 +141,14 @@ public void shouldWorkWithRebalance(
throw new AssertionError("Test called halt(). code:" + statusCode + " message:" + message);
});
int numClientsCreated = 0;
int numDataRecordsProcessed = 0;
final int numKeys = 10;
final int maxRecordsPerKey = 1000;

IntegrationTestUtils.cleanStateBeforeTest(cluster, SmokeTestDriver.topics());

final String bootstrapServers = cluster.bootstrapServers();
final Driver driver = new Driver(bootstrapServers, 10, 1000);
final Driver driver = new Driver(bootstrapServers, numKeys, maxRecordsPerKey);
driver.start();
System.out.println("started driver");

Expand Down Expand Up @@ -183,6 +187,7 @@ public void shouldWorkWithRebalance(
assertFalse(client.error(), "The streams application seems to have crashed.");
Thread.sleep(100);
}
numDataRecordsProcessed += client.totalDataRecordsProcessed();
}
}

Expand All @@ -201,6 +206,7 @@ public void shouldWorkWithRebalance(
assertFalse(client.error(), "The streams application seems to have crashed.");
Thread.sleep(100);
}
numDataRecordsProcessed += client.totalDataRecordsProcessed();
}
}

Expand All @@ -210,5 +216,16 @@ public void shouldWorkWithRebalance(
throw new AssertionError(driver.exception());
}
assertTrue(driver.result().passed(), driver.result().result());

// The one extra record is a record that the driver produces to flush suppress
final int expectedRecords = numKeys * maxRecordsPerKey + 1;

// We check that we did not have to reprocess any records, which would indicate a bug since everything
// runs locally in this test.
assertEquals(expectedRecords, numDataRecordsProcessed,
String.format("It seems we had to reprocess records, expected %d records, processed %d records.",
expectedRecords,
numDataRecordsProcessed)
);
}
}
Loading