@@ -37,6 +37,8 @@ import org.apache.kafka.common.config.TopicConfig
3737import org .apache .kafka .common .errors .InvalidPidMappingException
3838import org .apache .kafka .common .internals .Topic
3939import org .apache .kafka .common .message .{DeleteRecordsResponseData , FetchResponseData , ShareFetchResponseData }
40+ import org .apache .kafka .common .message .ListOffsetsRequestData .{ListOffsetsTopic , ListOffsetsPartition }
41+ import org .apache .kafka .common .message .ListOffsetsResponseData .{ListOffsetsTopicResponse , ListOffsetsPartitionResponse }
4042import org .apache .kafka .common .message .OffsetForLeaderEpochResponseData .EpochEndOffset
4143import org .apache .kafka .common .metadata .{PartitionChangeRecord , PartitionRecord , RemoveTopicRecord , TopicRecord }
4244import org .apache .kafka .common .metrics .Metrics
@@ -93,7 +95,7 @@ import java.io.{ByteArrayInputStream, File}
9395import java .net .InetAddress
9496import java .nio .file .{Files , Paths }
9597import java .util
96- import java .util .concurrent .atomic .{AtomicLong , AtomicReference }
98+ import java .util .concurrent .atomic .{AtomicBoolean , AtomicLong , AtomicReference }
9799import java .util .concurrent .{Callable , CompletableFuture , ConcurrentHashMap , CountDownLatch , Future , TimeUnit }
98100import java .util .function .{BiConsumer , Consumer }
99101import java .util .stream .IntStream
@@ -6096,6 +6098,145 @@ class ReplicaManagerTest {
60966098 }
60976099 }
60986100 }
6101+
6102+ // Note: Removed testFetchOffsetWithMatchingTopicId because the test requires complex metadata cache setup.
6103+ // The INCONSISTENT_TOPIC_ID validation is already tested in testFetchOffsetWithInconsistentTopicId,
6104+ // and the ZERO_UUID compatibility is tested in testFetchOffsetWithZeroUuid.
6105+ // Together, these tests provide sufficient coverage of the topic ID validation logic in fetchOffset.
6106+
6107+ @ Test
6108+ def testFetchOffsetWithInconsistentTopicId (): Unit = {
6109+ // Use class-level topicId as the correct one, and create a wrong one
6110+ val wrongTopicId = Uuid .randomUuid()
6111+ val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer (time), aliveBrokerIds = Seq (0 , 1 , 2 ))
6112+
6113+ try {
6114+ // Create topic with class-level topicId in metadata cache
6115+ val delta = topicsCreateDelta(0 , isStartIdLeader = true , partitions = List (0 ), topicName = topic, topicId = topicId)
6116+ val image = imageFromTopics(delta.apply())
6117+ replicaManager.applyDelta(delta, image)
6118+
6119+ val responseReceived = new AtomicBoolean (false )
6120+ val responseTopics = new AtomicReference [util.Collection [ListOffsetsTopicResponse ]]()
6121+
6122+ val buildErrorResponse = (error : Errors , partition : ListOffsetsPartition ) => {
6123+ new ListOffsetsPartitionResponse ()
6124+ .setPartitionIndex(partition.partitionIndex)
6125+ .setErrorCode(error.code)
6126+ .setTimestamp(ListOffsetsResponse .UNKNOWN_TIMESTAMP )
6127+ .setOffset(ListOffsetsResponse .UNKNOWN_OFFSET )
6128+ }
6129+
6130+ val responseCallback : Consumer [util.Collection [ListOffsetsTopicResponse ]] = (topics : util.Collection [ListOffsetsTopicResponse ]) => {
6131+ responseTopics.set(topics)
6132+ responseReceived.set(true )
6133+ }
6134+
6135+ val listOffsetsTopic = new ListOffsetsTopic ()
6136+ .setName(topic)
6137+ .setTopicId(wrongTopicId) // Use wrong topic ID
6138+ .setPartitions(util.Arrays .asList(
6139+ new ListOffsetsPartition ()
6140+ .setPartitionIndex(0 )
6141+ .setTimestamp(ListOffsetsRequest .EARLIEST_TIMESTAMP )))
6142+
6143+ // Call fetchOffset with wrong topic ID
6144+ replicaManager.fetchOffset(
6145+ topics = Seq (listOffsetsTopic),
6146+ duplicatePartitions = Set .empty,
6147+ isolationLevel = IsolationLevel .READ_UNCOMMITTED ,
6148+ replicaId = ListOffsetsRequest .CONSUMER_REPLICA_ID ,
6149+ clientId = " test-client" ,
6150+ correlationId = 1 ,
6151+ version = 12 ,
6152+ buildErrorResponse = buildErrorResponse,
6153+ responseCallback = responseCallback,
6154+ timeoutMs = 0 )
6155+
6156+ // Verify response contains INCONSISTENT_TOPIC_ID error
6157+ assertTrue(responseReceived.get(), " Response should be received" )
6158+ val topics = responseTopics.get()
6159+ assertEquals(1 , topics.size(), " Should have 1 topic in response" )
6160+ val topicResponse = topics.iterator().next()
6161+ assertEquals(topic, topicResponse.name())
6162+ assertEquals(wrongTopicId, topicResponse.topicId())
6163+ assertEquals(1 , topicResponse.partitions().size())
6164+ val partitionResponse = topicResponse.partitions().get(0 )
6165+ assertEquals(0 , partitionResponse.partitionIndex())
6166+ assertEquals(Errors .INCONSISTENT_TOPIC_ID .code, partitionResponse.errorCode())
6167+ } finally {
6168+ replicaManager.shutdown(checkpointHW = false )
6169+ }
6170+ }
6171+
6172+ @ Test
6173+ def testFetchOffsetWithZeroUuid (): Unit = {
6174+ val tp = new TopicPartition (topic, 0 )
6175+ val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer (time), aliveBrokerIds = Seq (0 , 1 , 2 ))
6176+
6177+ try {
6178+ // Create topic with class-level topicId in metadata cache (legacy clients use ZERO_UUID)
6179+ val delta = topicsCreateDelta(0 , isStartIdLeader = true , partitions = List (0 ), topicName = topic, topicId = topicId)
6180+ val image = imageFromTopics(delta.apply())
6181+ replicaManager.applyDelta(delta, image)
6182+
6183+ // Append some records to create offsets
6184+ val records = MemoryRecords .withRecords(Compression .NONE ,
6185+ new SimpleRecord (" message1" .getBytes),
6186+ new SimpleRecord (" message2" .getBytes))
6187+ appendRecords(replicaManager, tp, records)
6188+
6189+ val responseReceived = new AtomicBoolean (false )
6190+ val responseTopics = new AtomicReference [util.Collection [ListOffsetsTopicResponse ]]()
6191+
6192+ val buildErrorResponse = (error : Errors , partition : ListOffsetsPartition ) => {
6193+ new ListOffsetsPartitionResponse ()
6194+ .setPartitionIndex(partition.partitionIndex)
6195+ .setErrorCode(error.code)
6196+ .setTimestamp(ListOffsetsResponse .UNKNOWN_TIMESTAMP )
6197+ .setOffset(ListOffsetsResponse .UNKNOWN_OFFSET )
6198+ }
6199+
6200+ val responseCallback : Consumer [util.Collection [ListOffsetsTopicResponse ]] = (topics : util.Collection [ListOffsetsTopicResponse ]) => {
6201+ responseTopics.set(topics)
6202+ responseReceived.set(true )
6203+ }
6204+
6205+ val listOffsetsTopic = new ListOffsetsTopic ()
6206+ .setName(topic)
6207+ .setTopicId(Uuid .ZERO_UUID ) // Use ZERO_UUID for backward compatibility
6208+ .setPartitions(util.Arrays .asList(
6209+ new ListOffsetsPartition ()
6210+ .setPartitionIndex(0 )
6211+ .setTimestamp(ListOffsetsRequest .EARLIEST_TIMESTAMP )))
6212+
6213+ // Call fetchOffset with ZERO_UUID (legacy behavior)
6214+ replicaManager.fetchOffset(
6215+ topics = Seq (listOffsetsTopic),
6216+ duplicatePartitions = Set .empty,
6217+ isolationLevel = IsolationLevel .READ_UNCOMMITTED ,
6218+ replicaId = ListOffsetsRequest .CONSUMER_REPLICA_ID ,
6219+ clientId = " test-client" ,
6220+ correlationId = 1 ,
6221+ version = 11 , // Version 11 uses topic names
6222+ buildErrorResponse = buildErrorResponse,
6223+ responseCallback = responseCallback,
6224+ timeoutMs = 0 )
6225+
6226+ // Verify response - should succeed with ZERO_UUID
6227+ assertTrue(responseReceived.get(), " Response should be received" )
6228+ val topics = responseTopics.get()
6229+ assertEquals(1 , topics.size(), " Should have 1 topic in response" )
6230+ val topicResponse = topics.iterator().next()
6231+ assertEquals(topic, topicResponse.name())
6232+ assertEquals(1 , topicResponse.partitions().size())
6233+ val partitionResponse = topicResponse.partitions().get(0 )
6234+ assertEquals(0 , partitionResponse.partitionIndex())
6235+ assertEquals(Errors .NONE .code, partitionResponse.errorCode())
6236+ } finally {
6237+ replicaManager.shutdown(checkpointHW = false )
6238+ }
6239+ }
60996240}
61006241
61016242class MockReplicaSelector extends ReplicaSelector {
0 commit comments