diff --git a/build.gradle b/build.gradle
index 6b7fefbf3339b..a1768cfac66dd 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2900,7 +2900,6 @@ project(':streams:integration-tests') {
     testImplementation libs.mockitoCore
     testImplementation testLog4j2Libs
     testImplementation project(':streams:test-utils')
-    testImplementation project(':test-common:test-common-util')

     testRuntimeOnly runtimeTestLibs
   }
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 30a5652a2ac62..192ddada4deae 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -32,6 +32,7 @@
 import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
 import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
 import org.apache.kafka.coordinator.group.Utils;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
@@ -726,8 +727,17 @@ public CommitPartitionValidator validateOffsetCommit(
                 "by members using the streams group protocol");
         }

-        validateMemberEpoch(memberEpoch, member.memberEpoch());
-        return CommitPartitionValidator.NO_OP;
+        if (memberEpoch == member.memberEpoch()) {
+            return CommitPartitionValidator.NO_OP;
+        }
+
+        if (memberEpoch > member.memberEpoch()) {
+            throw new StaleMemberEpochException(String.format("Received member epoch %d is newer than " +
+                "current member epoch %d.", memberEpoch, member.memberEpoch()));
+        }
+
+        // Member epoch is older; validate against per-partition assignment epochs.
+        return createAssignmentEpochValidator(member, memberEpoch);
     }

     /**
@@ -1120,4 +1130,54 @@ public void setLastAssignmentConfigs(Map lastAssignmentConfigs)
             this.lastAssignmentConfigs.putAll(lastAssignmentConfigs);
         }
     }
+
+    /**
+     * Creates a validator that checks the received member epoch against the assignment epoch of each
+     * committed partition.
+     *
+     * @param member              The member whose assignments are being validated.
+     * @param receivedMemberEpoch The received member epoch.
+     * @return A validator for per-partition validation.
+     */
+    private CommitPartitionValidator createAssignmentEpochValidator(
+        final StreamsGroupMember member,
+        int receivedMemberEpoch
+    ) {
+        // Retrieve the topology once for the whole commit rather than once per partition.
+        final StreamsTopology streamsTopology = topology.get().orElseThrow(() ->
+            new StaleMemberEpochException("Topology is not available for offset commit validation."));
+
+        final TasksTupleWithEpochs assignedTasks = member.assignedTasks();
+        final TasksTupleWithEpochs tasksPendingRevocation = member.tasksPendingRevocation();
+
+        return (topicName, topicId, partitionId) -> {
+            final StreamsGroupTopologyValue.Subtopology subtopology = streamsTopology.sourceTopicMap().get(topicName);
+            if (subtopology == null) {
+                throw new StaleMemberEpochException("Topic " + topicName + " is not in the topology.");
+            }
+
+            final String subtopologyId = subtopology.subtopologyId();
+
+            // Search for the partition in assigned tasks first, then in tasks pending revocation.
+            Integer assignmentEpoch = assignedTasks.activeTasksWithEpochs()
+                .getOrDefault(subtopologyId, Collections.emptyMap())
+                .get(partitionId);
+            if (assignmentEpoch == null) {
+                assignmentEpoch = tasksPendingRevocation.activeTasksWithEpochs()
+                    .getOrDefault(subtopologyId, Collections.emptyMap())
+                    .get(partitionId);
+            }
+
+            if (assignmentEpoch == null) {
+                throw new StaleMemberEpochException(String.format(
+                    "Task %s-%d is not assigned or pending revocation for the member.",
+                    subtopologyId, partitionId));
+            }
+
+            if (receivedMemberEpoch < assignmentEpoch) {
+                throw new StaleMemberEpochException(String.format(
+                    "Received member epoch %d is older than assignment epoch %d for task %s-%d.",
+                    receivedMemberEpoch, assignmentEpoch, subtopologyId, partitionId));
+            }
+        };
+    }
 }
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index 672cc37c41411..275a3eeccc470 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -66,6 +66,7 @@
 import org.apache.kafka.coordinator.group.streams.StreamsGroup;
 import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
 import org.apache.kafka.coordinator.group.streams.StreamsTopology;
+import org.apache.kafka.coordinator.group.streams.TasksTupleWithEpochs;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.timeline.SnapshotRegistry;
@@ -3659,4 +3660,77 @@ private ClassicGroupMember mkGenericMember(
             )
         );
     }
+
+    @Test
+    public void testStreamsGroupOffsetCommitWithAssignmentEpochValid() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+        StreamsGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedStreamsGroup("foo", true);
+
+        // Setup: topology with topic "bar" in subtopology "0".
+        group.setTopology(new StreamsTopology(1, Map.of("0", new StreamsGroupTopologyValue.Subtopology()
+            .setSubtopologyId("0")
+            .setSourceTopics(List.of("bar")))));
+
+        // Member at epoch 10, with partitions assigned at epochs 4 and 5, respectively.
+        group.updateMember(StreamsGroupMember.Builder.withDefaults("member")
+            .setMemberEpoch(10)
+            .setAssignedTasks(new TasksTupleWithEpochs(
+                Map.of("0", Map.of(0, 4, 1, 5)),
+                Map.of(), Map.of()))
+            .build());
+
+        // Commit with member epoch 5 should succeed (5 >= assignment epochs 4 and 5).
+        CoordinatorResult result = context.commitOffset(
+            new OffsetCommitRequestData()
+                .setGroupId("foo")
+                .setMemberId("member")
+                .setGenerationIdOrMemberEpoch(5)
+                .setTopics(List.of(new OffsetCommitRequestData.OffsetCommitRequestTopic()
+                    .setName("bar")
+                    .setPartitions(List.of(
+                        new OffsetCommitRequestData.OffsetCommitRequestPartition()
+                            .setPartitionIndex(0)
+                            .setCommittedOffset(100L),
+                        new OffsetCommitRequestData.OffsetCommitRequestPartition()
+                            .setPartitionIndex(1)
+                            .setCommittedOffset(200L))))));
+
+        assertEquals(Errors.NONE.code(), result.response().topics().get(0).partitions().get(0).errorCode());
+        assertEquals(Errors.NONE.code(), result.response().topics().get(0).partitions().get(1).errorCode());
+        assertEquals(2, result.records().size());
+    }
+
+    @Test
+    public void testStreamsGroupOffsetCommitWithAssignmentEpochStale() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+        StreamsGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedStreamsGroup("foo", true);
+
+        group.setTopology(new StreamsTopology(1, Map.of("0", new StreamsGroupTopologyValue.Subtopology()
+            .setSubtopologyId("0")
+            .setSourceTopics(List.of("bar")))));
+
+        // Member at epoch 10, with partitions assigned at different epochs.
+        group.updateMember(StreamsGroupMember.Builder.withDefaults("member")
+            .setMemberEpoch(10)
+            .setAssignedTasks(new TasksTupleWithEpochs(
+                Map.of("0", Map.of(0, 5, 1, 8)),
+                Map.of(), Map.of()))
+            .build());
+
+        // Commit with member epoch 3 should fail (3 < assignment epochs 5 and 8).
+        assertThrows(StaleMemberEpochException.class, () -> context.commitOffset(
+            new OffsetCommitRequestData()
+                .setGroupId("foo")
+                .setMemberId("member")
+                .setGenerationIdOrMemberEpoch(3)
+                .setTopics(List.of(new OffsetCommitRequestData.OffsetCommitRequestTopic()
+                    .setName("bar")
+                    .setPartitions(List.of(
+                        new OffsetCommitRequestData.OffsetCommitRequestPartition()
+                            .setPartitionIndex(0)
+                            .setCommittedOffset(100L),
+                        new OffsetCommitRequestData.OffsetCommitRequestPartition()
+                            .setPartitionIndex(1)
+                            .setCommittedOffset(200L)))))));
+    }
 }
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index 6842c2aafa351..de1bf2d82c87f 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -31,6 +31,7 @@
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
 import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
+import org.apache.kafka.coordinator.group.CommitPartitionValidator;
 import org.apache.kafka.coordinator.group.OffsetAndMetadata;
 import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
 import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
@@ -660,7 +661,7 @@ public void testValidateOffsetCommit(short version) {
         assertThrows(UnknownMemberIdException.class, () -> group.validateOffsetCommit("", null, -1,
            isTransactional, version));

-        // The member epoch is stale.
+        // The member epoch is stale (newer than current).
         if (version >= 9) {
             assertThrows(StaleMemberEpochException.class, () -> group.validateOffsetCommit("new-protocol-member-id", "", 10, isTransactional, version));
         } else {
@@ -669,7 +670,7 @@
                 group.validateOffsetCommit("new-protocol-member-id", "", 10, isTransactional, version));
         }

-        // This should succeed.
+        // This should succeed (matching member epoch).
         if (version >= 9) {
             group.validateOffsetCommit("new-protocol-member-id", "", 0, isTransactional, version);
         } else {
@@ -678,6 +679,108 @@
         }
     }

+    @Test
+    public void testValidateOffsetCommitWithOlderEpoch() {
+        StreamsGroup group = createStreamsGroup("group-foo");
+
+        group.setTopology(new StreamsTopology(1, Map.of("0", new StreamsGroupTopologyValue.Subtopology()
+            .setSubtopologyId("0")
+            .setSourceTopics(List.of("input-topic")))));
+
+        group.updateMember(new StreamsGroupMember.Builder("member-1")
+            .setMemberEpoch(2)
+            .setAssignedTasks(new TasksTupleWithEpochs(
+                Map.of("0", Map.of(0, 2, 1, 1)),
+                Map.of(), Map.of()))
+            .build());
+
+        CommitPartitionValidator validator = group.validateOffsetCommit(
+            "member-1", "", 1, false, ApiKeys.OFFSET_COMMIT.latestVersion());
+
+        // Received epoch (1) < assignment epoch (2), so validation should throw.
+        assertThrows(StaleMemberEpochException.class, () ->
+            validator.validate("input-topic", Uuid.ZERO_UUID, 0));
+    }
+
+    @Test
+    public void testValidateOffsetCommitWithOlderEpochMissingTopology() {
+        StreamsGroup group = createStreamsGroup("group-foo");
+
+        group.updateMember(new StreamsGroupMember.Builder("member-1")
+            .setMemberEpoch(2)
+            .build());
+
+        // The topology is retrieved when the validator is created, so the exception is thrown here.
+        assertThrows(StaleMemberEpochException.class, () ->
+            group.validateOffsetCommit("member-1", "", 1, false, ApiKeys.OFFSET_COMMIT.latestVersion()));
+    }
+
+    @Test
+    public void testValidateOffsetCommitWithOlderEpochMissingSubtopology() {
+        StreamsGroup group = createStreamsGroup("group-foo");
+
+        group.setTopology(new StreamsTopology(1, Map.of("0", new StreamsGroupTopologyValue.Subtopology()
+            .setSubtopologyId("0")
+            .setSourceTopics(List.of("input-topic")))));
+
+        group.updateMember(new StreamsGroupMember.Builder("member-1")
+            .setMemberEpoch(2)
+            .build());
+
+        CommitPartitionValidator validator = group.validateOffsetCommit(
+            "member-1", "", 1, false, ApiKeys.OFFSET_COMMIT.latestVersion());
+
+        assertThrows(StaleMemberEpochException.class, () ->
+            validator.validate("unknown-topic", Uuid.ZERO_UUID, 0));
+    }
+
+    @Test
+    public void testValidateOffsetCommitWithOlderEpochUnassignedPartition() {
+        StreamsGroup group = createStreamsGroup("group-foo");
+
+        group.setTopology(new StreamsTopology(1, Map.of("0", new StreamsGroupTopologyValue.Subtopology()
+            .setSubtopologyId("0")
+            .setSourceTopics(List.of("input-topic")))));
+
+        group.updateMember(new StreamsGroupMember.Builder("member-1")
+            .setMemberEpoch(2)
+            .setAssignedTasks(new TasksTupleWithEpochs(
+                Map.of("0", Map.of(0, 1)),
+                Map.of(), Map.of()))
+            .setTasksPendingRevocation(TasksTupleWithEpochs.EMPTY)
+            .build());
+
+        CommitPartitionValidator validator = group.validateOffsetCommit(
+            "member-1", "", 1, false, ApiKeys.OFFSET_COMMIT.latestVersion());
+
+        // Partition 1 is not assigned, so validation should throw.
+        assertThrows(StaleMemberEpochException.class, () ->
+            validator.validate("input-topic", Uuid.ZERO_UUID, 1));
+    }
+
+    @Test
+    public void testValidateOffsetCommitWithOlderEpochValidAssignment() {
+        StreamsGroup group = createStreamsGroup("group-foo");
+
+        group.setTopology(new StreamsTopology(1, Map.of("0", new StreamsGroupTopologyValue.Subtopology()
+            .setSubtopologyId("0")
+            .setSourceTopics(List.of("input-topic")))));
+
+        group.updateMember(new StreamsGroupMember.Builder("member-1")
+            .setMemberEpoch(5)
+            .setAssignedTasks(new TasksTupleWithEpochs(
+                Map.of("0", Map.of(0, 2, 1, 2)),
+                Map.of(), Map.of()))
+            .build());
+
+        CommitPartitionValidator validator = group.validateOffsetCommit(
+            "member-1", "", 2, false, ApiKeys.OFFSET_COMMIT.latestVersion());
+
+        // Received epoch 2 == assignment epoch 2, so validation should succeed.
+        validator.validate("input-topic", Uuid.ZERO_UUID, 0);
+        validator.validate("input-topic", Uuid.ZERO_UUID, 1);
+    }
+
     @Test
     public void testAsListedGroup() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 194025dd4d025..46ace65cf04f8 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -35,7 +35,6 @@
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.test.api.Flaky;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
@@ -396,7 +395,6 @@ public void shouldBeAbleToPerformMultipleTransactions(final String groupProtocol
         }
     }

-    @Flaky("KAFKA-19816")
     @ParameterizedTest
     @MethodSource("groupProtocolAndProcessingThreadsParameters")
     public void shouldNotViolateEosIfOneTaskFails(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception {
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
index 4e59e9523c468..3a6b15e29b1ba 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -47,6 +47,7 @@
 import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
 import static org.apache.kafka.streams.tests.SmokeTestDriver.verify;
 import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -140,11 +141,14 @@ public void shouldWorkWithRebalance(
             throw new AssertionError("Test called halt(). code:" + statusCode + " message:" + message);
         });
         int numClientsCreated = 0;
+        int numDataRecordsProcessed = 0;
+        final int numKeys = 10;
+        final int maxRecordsPerKey = 1000;

         IntegrationTestUtils.cleanStateBeforeTest(cluster, SmokeTestDriver.topics());
         final String bootstrapServers = cluster.bootstrapServers();

-        final Driver driver = new Driver(bootstrapServers, 10, 1000);
+        final Driver driver = new Driver(bootstrapServers, numKeys, maxRecordsPerKey);
         driver.start();
         System.out.println("started driver");
@@ -183,6 +187,7 @@ public void shouldWorkWithRebalance(
                 assertFalse(client.error(), "The streams application seems to have crashed.");
                 Thread.sleep(100);
             }
+            numDataRecordsProcessed += client.totalDataRecordsProcessed();
         }
     }
@@ -201,6 +206,7 @@ public void shouldWorkWithRebalance(
                 assertFalse(client.error(), "The streams application seems to have crashed.");
                 Thread.sleep(100);
             }
+            numDataRecordsProcessed += client.totalDataRecordsProcessed();
         }
     }
@@ -210,5 +216,16 @@ public void shouldWorkWithRebalance(
             throw new AssertionError(driver.exception());
         }
         assertTrue(driver.result().passed(), driver.result().result());
+
+        // The one extra record is the flush record that the driver produces to flush the suppression buffers.
+        final int expectedRecords = numKeys * maxRecordsPerKey + 1;
+
+        // We check that we did not have to reprocess any records; reprocessing would indicate a bug since everything
+        // runs locally in this test.
+        assertEquals(expectedRecords, numDataRecordsProcessed,
+            String.format("It seems we had to reprocess records, expected %d records, processed %d records.",
+                expectedRecords,
+                numDataRecordsProcessed)
+        );
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index 7f8057c559787..b0012fa61b403 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -44,6 +44,7 @@
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;

 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
@@ -55,6 +56,7 @@ public class SmokeTestClient extends SmokeTestUtil {
     private boolean uncaughtException = false;
     private volatile boolean closed;
     private volatile boolean error;
+    private final AtomicInteger totalDataRecordsProcessed = new AtomicInteger(0);

     private static void addShutdownHook(final String name, final Runnable runnable) {
         if (name != null) {
@@ -76,6 +78,10 @@ public boolean error() {
         return error;
     }

+    public int totalDataRecordsProcessed() {
+        return totalDataRecordsProcessed.get();
+    }
+
     public void start(final Properties streamsProperties) {
         final Topology build = getTopology();
         streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
@@ -156,7 +162,7 @@ public Topology getTopology() {
         source.filterNot((k, v) -> k.equals("flush"))
               .to("echo", Produced.with(stringSerde, intSerde));
         final KStream data = source.filter((key, value) -> value == null || value != END);
-        data.process(SmokeTestUtil.printProcessorSupplier("data", name));
+        data.process(SmokeTestUtil.printProcessorSupplier("data", name, totalDataRecordsProcessed));

         // min
         final KGroupedStream groupedData = data.groupByKey(Grouped.with(stringSerde, intSerde));
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index 7e670802b93ad..d0ad6c8cabba8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -29,6 +29,7 @@
 import org.apache.kafka.streams.processor.api.Record;

 import java.time.Instant;
+import java.util.concurrent.atomic.AtomicInteger;

 public class SmokeTestUtil {
@@ -39,6 +40,10 @@ static ProcessorSupplier printProcessorSupplier(fina
     }

     static ProcessorSupplier printProcessorSupplier(final String topic, final String name) {
+        return printProcessorSupplier(topic, name, new AtomicInteger());
+    }
+
+    static ProcessorSupplier printProcessorSupplier(final String topic, final String name, final AtomicInteger totalRecordsProcessed) {
         return () -> new ContextualProcessor<>() {
             private int numRecordsProcessed = 0;
             private long smallestOffset = Long.MAX_VALUE;
@@ -84,6 +89,7 @@ public void close() {
             }
             System.out.println("offset " + smallestOffset + " to " + largestOffset + " -> processed " + processed);
             System.out.flush();
+            totalRecordsProcessed.addAndGet(numRecordsProcessed);
         }
     };
 }
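
Editor's note, not part of the patch: the new validation flow in StreamsGroup is split across several hunks above, so here is a minimal, self-contained Java sketch of the same decision rule. Every name in it (AssignmentEpochCheckSketch, StaleEpochException, check) is an illustrative stand-in, and plain Map<String, Map<Integer, Integer>> values stand in for TasksTupleWithEpochs; only the comparisons mirror validateOffsetCommit() and createAssignmentEpochValidator() in the diff.

import java.util.Collections;
import java.util.Map;

final class AssignmentEpochCheckSketch {

    // Stand-in for StaleMemberEpochException.
    static final class StaleEpochException extends RuntimeException {
        StaleEpochException(final String message) {
            super(message);
        }
    }

    /**
     * Decides whether an offset commit for one partition is acceptable.
     *
     * @param receivedEpoch  member epoch carried by the OffsetCommit request
     * @param memberEpoch    member epoch the coordinator currently knows
     * @param subtopologyId  subtopology that reads the committed topic
     * @param partition      committed partition
     * @param assignedEpochs subtopologyId -> (partition -> epoch at which the task was assigned)
     * @param pendingEpochs  same shape, for tasks pending revocation
     */
    static void check(final int receivedEpoch,
                      final int memberEpoch,
                      final String subtopologyId,
                      final int partition,
                      final Map<String, Map<Integer, Integer>> assignedEpochs,
                      final Map<String, Map<Integer, Integer>> pendingEpochs) {
        if (receivedEpoch == memberEpoch) {
            return; // fast path: epochs match, no per-partition check needed
        }
        if (receivedEpoch > memberEpoch) {
            throw new StaleEpochException("Received epoch is ahead of the coordinator's view.");
        }
        // Received epoch is older: accept only if this partition was assigned
        // (or is pending revocation) at an epoch no newer than the received one.
        Integer assignmentEpoch = assignedEpochs
            .getOrDefault(subtopologyId, Collections.emptyMap())
            .get(partition);
        if (assignmentEpoch == null) {
            assignmentEpoch = pendingEpochs
                .getOrDefault(subtopologyId, Collections.emptyMap())
                .get(partition);
        }
        if (assignmentEpoch == null || receivedEpoch < assignmentEpoch) {
            throw new StaleEpochException("Commit rejected for task " + subtopologyId + "-" + partition);
        }
    }

    public static void main(final String[] args) {
        // Mirrors testStreamsGroupOffsetCommitWithAssignmentEpochValid: member epoch 10,
        // partition 0 assigned at epoch 4 and partition 1 at epoch 5, commit at epoch 5.
        final Map<String, Map<Integer, Integer>> assigned = Map.of("0", Map.of(0, 4, 1, 5));
        check(5, 10, "0", 0, assigned, Map.of()); // passes: 5 >= 4
        check(5, 10, "0", 1, assigned, Map.of()); // passes: 5 >= 5
        try {
            check(3, 10, "0", 0, assigned, Map.of()); // rejected: 3 < 4
        } catch (final StaleEpochException expected) {
            System.out.println("rejected as expected: " + expected.getMessage());
        }
    }
}

The fast path keeps the common case (request epoch equals member epoch) free of per-partition work, and the older-epoch path accepts a commit only for partitions whose assignment epoch is not newer than the request's epoch — which is what lets a member commit offsets for tasks it still legitimately owns even though its epoch lags the coordinator's.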
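A second editorial sketch, for the smoke-test side of the patch: SmokeTestUtil/SmokeTestClient thread a shared AtomicInteger through the processor supplier so that each processor instance folds its local count into a client-wide total in close(). The stripped-down version below shows the same pattern without the Kafka Streams API; Processor and countingSupplier are hypothetical stand-ins for ContextualProcessor and printProcessorSupplier.

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class CountingProcessorSketch {

    interface Processor {
        void process(String key, Integer value);
        void close();
    }

    // The supplier is created once per client; the AtomicInteger is shared by all
    // processor instances it produces (one per task in the real topology).
    static Supplier<Processor> countingSupplier(final AtomicInteger total) {
        return () -> new Processor() {
            private int numRecordsProcessed = 0;

            @Override
            public void process(final String key, final Integer value) {
                numRecordsProcessed++;
            }

            @Override
            public void close() {
                // Fold the per-instance count into the shared total when the instance closes.
                total.addAndGet(numRecordsProcessed);
            }
        };
    }

    public static void main(final String[] args) {
        final AtomicInteger total = new AtomicInteger();
        final Supplier<Processor> supplier = countingSupplier(total);

        // Two "tasks", each with its own processor instance.
        for (final int records : List.of(3, 4)) {
            final Processor p = supplier.get();
            for (int i = 0; i < records; i++) {
                p.process("key", i);
            }
            p.close();
        }
        System.out.println(total.get()); // 7
    }
}

With the per-instance counts folded in on close, the integration test can sum client totals and assert that exactly numKeys * maxRecordsPerKey + 1 data records were processed, i.e. that nothing was reprocessed during the rebalances.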