Skip to content

Commit 35c2c22

Browse files
showuonhachikuji
authored andcommitted
KAFKA-10401; Ensure currentStateTimeStamp is set correctly by group coordinator (#9202)
Fix the `currentStateTimeStamp` doesn't get set in `GROUP_METADATA_VALUE_SCHEMA_V3`, and did a small refactor to use the `GROUP_VALUE_SCHEMAS.size - 1` replace the default hard-coded max version number. Also add test for it. Reviewers: Jason Gustafson <[email protected]>, Ismael Juma <[email protected]>
1 parent 06484b5 commit 35c2c22

File tree

2 files changed

+48
-13
lines changed

2 files changed

+48
-13
lines changed

core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala

+7-11
Original file line numberDiff line numberDiff line change
@@ -1074,6 +1074,7 @@ object GroupMetadataManager {
10741074

10751075
private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
10761076
private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
1077+
private val CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION = GROUP_VALUE_SCHEMAS.keySet.max
10771078

10781079
private def schemaForKey(version: Int) = {
10791080
val schemaOpt = MESSAGE_TYPE_SCHEMAS.get(version)
@@ -1338,23 +1339,18 @@ object GroupMetadataManager {
13381339
val valueSchema = schemaForGroupValue(version)
13391340
val value = valueSchema.read(buffer)
13401341

1341-
if (version >= 0 && version <= 3) {
1342+
if (version >= 0 && version <= CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION) {
13421343
val generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
13431344
val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
13441345
val protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
13451346
val leaderId = value.get(LEADER_KEY).asInstanceOf[String]
13461347
val memberMetadataArray = value.getArray(MEMBERS_KEY)
13471348
val initialState = if (memberMetadataArray.isEmpty) Empty else Stable
1348-
val currentStateTimestamp: Option[Long] = version match {
1349-
case version if version == 2 =>
1350-
if (value.hasField(CURRENT_STATE_TIMESTAMP_KEY)) {
1351-
val timestamp = value.getLong(CURRENT_STATE_TIMESTAMP_KEY)
1352-
if (timestamp == -1) None else Some(timestamp)
1353-
} else
1354-
None
1355-
case _ =>
1356-
None
1357-
}
1349+
val currentStateTimestamp: Option[Long] =
1350+
if (version >= 2 && value.hasField(CURRENT_STATE_TIMESTAMP_KEY)) {
1351+
val timestamp = value.getLong(CURRENT_STATE_TIMESTAMP_KEY)
1352+
if (timestamp == -1) None else Some(timestamp)
1353+
} else None
13581354

13591355
val members = memberMetadataArray.map { memberMetadataObj =>
13601356
val memberMetadata = memberMetadataObj.asInstanceOf[Struct]

core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala

+41-2
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@ import org.apache.kafka.common.protocol.Errors
3030
import org.apache.kafka.common.record._
3131
import org.apache.kafka.common.requests.OffsetFetchResponse
3232
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
33+
import org.apache.kafka.common.KafkaException
3334
import org.easymock.{Capture, EasyMock, IAnswer}
34-
import org.junit.Assert.{assertEquals, assertFalse, assertNull, assertTrue}
35+
import org.junit.Assert.{assertEquals, assertFalse, assertNull, assertTrue, assertThrows}
3536
import org.junit.{Before, Test}
3637
import org.scalatest.Assertions.fail
3738
import java.nio.ByteBuffer
@@ -802,7 +803,45 @@ class GroupMetadataManagerTest {
802803
}
803804

804805
@Test
805-
def testReadFromOldGroupMetadata() {
806+
def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
807+
val generation = 1
808+
val protocol = "range"
809+
val memberId = "memberId"
810+
val unsupportedVersion = Short.MinValue
811+
812+
// put the unsupported version as the version value
813+
val groupMetadataRecordValue = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
814+
.value().putShort(unsupportedVersion)
815+
// reset the position to the starting position 0 so that it can read the data in correct order
816+
groupMetadataRecordValue.position(0)
817+
818+
val e = assertThrows(classOf[KafkaException],
819+
() => GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecordValue, time))
820+
assertEquals(s"Unknown group metadata version ${unsupportedVersion}", e.getMessage)
821+
}
822+
823+
@Test
824+
def testCurrentStateTimestampForAllGroupMetadataVersions(): Unit = {
825+
val generation = 1
826+
val protocol = "range"
827+
val memberId = "memberId"
828+
829+
for (apiVersion <- ApiVersion.allVersions) {
830+
val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, apiVersion = apiVersion)
831+
832+
val deserializedGroupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecord.value(), time)
833+
// GROUP_METADATA_VALUE_SCHEMA_V2 or higher should correctly set the currentStateTimestamp
834+
if (apiVersion >= KAFKA_2_1_IV0)
835+
assertEquals(s"the apiVersion $apiVersion doesn't set the currentStateTimestamp correctly.",
836+
Some(time.milliseconds()), deserializedGroupMetadata.currentStateTimestamp)
837+
else
838+
assertTrue(s"the apiVersion $apiVersion should not set the currentStateTimestamp.",
839+
deserializedGroupMetadata.currentStateTimestamp.isEmpty)
840+
}
841+
}
842+
843+
@Test
844+
def testReadFromOldGroupMetadata(): Unit = {
806845
val generation = 1
807846
val protocol = "range"
808847
val memberId = "memberId"

0 commit comments

Comments
 (0)