
Merge branch 'trunk' into KAFKA-17668
frankvicky committed Jan 19, 2025
2 parents 0c95a3b + 6eddaeb commit 8283d31
Showing 3 changed files with 112 additions and 19 deletions.
19 changes: 2 additions & 17 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -27,7 +27,6 @@ import kafka.server.metadata.KRaftMetadataCache
 import kafka.server.share.DelayedShareFetch
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
-import kafka.zookeeper.ZooKeeperClientException
 import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState
@@ -767,14 +766,7 @@ class Partition(val topicPartition: TopicPartition,
         LeaderRecoveryState.RECOVERED
       )
 
-      try {
-        createLogInAssignedDirectoryId(partitionState, highWatermarkCheckpoints, topicId, targetDirectoryId)
-      } catch {
-        case e: ZooKeeperClientException =>
-          stateChangeLogger.error(s"A ZooKeeper client exception has occurred and makeLeader will be skipping the " +
-            s"state change for the partition $topicPartition with leader epoch: $leaderEpoch.", e)
-          return false
-      }
+      createLogInAssignedDirectoryId(partitionState, highWatermarkCheckpoints, topicId, targetDirectoryId)
 
       val leaderLog = localLogOrException
 
@@ -868,14 +860,7 @@ class Partition(val topicPartition: TopicPartition,
         LeaderRecoveryState.of(partitionState.leaderRecoveryState)
       )
 
-      try {
-        createLogInAssignedDirectoryId(partitionState, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
-      } catch {
-        case e: ZooKeeperClientException =>
-          stateChangeLogger.error(s"A ZooKeeper client exception has occurred. makeFollower will be skipping the " +
-            s"state change for the partition $topicPartition with leader epoch: $leaderEpoch.", e)
-          return false
-      }
+      createLogInAssignedDirectoryId(partitionState, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
 
       val followerLog = localLogOrException
       if (isNewLeaderEpoch) {
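Both hunks above make the same change: the ZooKeeper-specific guard around createLogInAssignedDirectoryId is dropped and the call is made directly, so makeLeader and makeFollower no longer log and return false on ZooKeeperClientException (which presumably can no longer be thrown now that the ZooKeeper client path is gone). A self-contained, illustrative-only sketch of that before/after shape, using hypothetical stand-in names rather than the real Kafka classes:

    object GuardRemovalSketch {
      // Hypothetical stand-in for kafka.zookeeper.ZooKeeperClientException.
      final class LegacyClientException(msg: String) extends RuntimeException(msg)

      // Stand-in for createLogInAssignedDirectoryId; in the old code path this
      // could surface a LegacyClientException, in the new one it cannot.
      def createLogInAssignedDirectoryId(): Unit = ()

      // Before: the leader/follower state change was skipped on the legacy error.
      def makeLeaderOld(): Boolean = {
        try {
          createLogInAssignedDirectoryId()
        } catch {
          case e: LegacyClientException =>
            println(s"skipping state change: ${e.getMessage}")
            return false
        }
        true
      }

      // After: the call is unguarded; any failure propagates to the caller.
      def makeLeaderNew(): Boolean = {
        createLogInAssignedDirectoryId()
        true
      }

      def main(args: Array[String]): Unit =
        println((makeLeaderOld(), makeLeaderNew())) // (true,true)
    }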
3 changes: 1 addition & 2 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -2833,7 +2833,7 @@ class ReplicaManager(val config: KafkaConfig,
     val leaderChangedPartitions = new mutable.HashSet[Partition]
     val followerChangedPartitions = new mutable.HashSet[Partition]
     if (!localChanges.leaders.isEmpty) {
-      applyLocalLeadersDelta(leaderChangedPartitions, newImage, delta, lazyOffsetCheckpoints, localChanges.leaders.asScala, localChanges.directoryIds.asScala)
+      applyLocalLeadersDelta(leaderChangedPartitions, delta, lazyOffsetCheckpoints, localChanges.leaders.asScala, localChanges.directoryIds.asScala)
     }
     if (!localChanges.followers.isEmpty) {
       applyLocalFollowersDelta(followerChangedPartitions, newImage, delta, lazyOffsetCheckpoints, localChanges.followers.asScala, localChanges.directoryIds.asScala)
@@ -2857,7 +2857,6 @@ class ReplicaManager(val config: KafkaConfig,
 
   private def applyLocalLeadersDelta(
     changedPartitions: mutable.Set[Partition],
-    newImage: MetadataImage,
     delta: TopicsDelta,
     offsetCheckpoints: OffsetCheckpoints,
     localLeaders: mutable.Map[TopicPartition, LocalReplicaChanges.PartitionInfo],
109 changes: 109 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -3560,6 +3560,115 @@ class KafkaApisTest extends Logging {
     assertEquals(Set(0), response.brokers.asScala.map(_.id).toSet)
   }
 
+  @Test
+  def testUnauthorizedTopicMetadataRequest(): Unit = {
+    // 1. Set up broker information
+    val plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+    val broker = new UpdateMetadataBroker()
+      .setId(0)
+      .setRack("rack")
+      .setEndpoints(Seq(
+        new UpdateMetadataEndpoint()
+          .setHost("broker0")
+          .setPort(9092)
+          .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
+          .setListener(plaintextListener.value)
+      ).asJava)
+
+    // 2. Set up authorizer
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    val unauthorizedTopic = "unauthorized-topic"
+    val authorizedTopic = "authorized-topic"
+
+    val expectedActions = Seq(
+      new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, unauthorizedTopic, PatternType.LITERAL), 1, true, true),
+      new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, authorizedTopic, PatternType.LITERAL), 1, true, true)
+    )
+
+    when(authorizer.authorize(any[RequestContext], argThat((t: java.util.List[Action]) => t.containsAll(expectedActions.asJava))))
+      .thenAnswer { invocation =>
+        val actions = invocation.getArgument(1).asInstanceOf[util.List[Action]].asScala
+        actions.map { action =>
+          if (action.resourcePattern().name().equals(authorizedTopic))
+            AuthorizationResult.ALLOWED
+          else
+            AuthorizationResult.DENIED
+        }.asJava
+      }
+
+    // 3. Set up MetadataCache
+    val authorizedTopicId = Uuid.randomUuid()
+    val unauthorizedTopicId = Uuid.randomUuid()
+
+    val topicIds = new util.HashMap[String, Uuid]()
+    topicIds.put(authorizedTopic, authorizedTopicId)
+    topicIds.put(unauthorizedTopic, unauthorizedTopicId)
+
+    def createDummyPartitionStates(topic: String) = {
+      new UpdateMetadataPartitionState()
+        .setTopicName(topic)
+        .setPartitionIndex(0)
+        .setControllerEpoch(0)
+        .setLeader(0)
+        .setLeaderEpoch(0)
+        .setReplicas(Collections.singletonList(0))
+        .setZkVersion(0)
+        .setIsr(Collections.singletonList(0))
+    }
+
+    // Send UpdateMetadataReq to update MetadataCache
+    val partitionStates = Seq(unauthorizedTopic, authorizedTopic).map(createDummyPartitionStates)
+
+    val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
+      0, 0, partitionStates.asJava, Seq(broker).asJava, topicIds).build()
+    MetadataCacheTest.updateCache(metadataCache, updateMetadataRequest)
+
+    // 4. Send TopicMetadataReq using topicId
+    val metadataReqByTopicId = new MetadataRequest.Builder(util.Arrays.asList(authorizedTopicId, unauthorizedTopicId)).build()
+    val repByTopicId = buildRequest(metadataReqByTopicId, plaintextListener)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
+    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
+    kafkaApis.handleTopicMetadataRequest(repByTopicId)
+    val metadataByTopicIdResp = verifyNoThrottling[MetadataResponse](repByTopicId)
+
+    val metadataByTopicId = metadataByTopicIdResp.data().topics().asScala.groupBy(_.topicId()).map(kv => (kv._1, kv._2.head))
+
+    metadataByTopicId.foreach { case (topicId, metadataResponseTopic) =>
+      if (topicId == unauthorizedTopicId) {
+        // Return TOPIC_AUTHORIZATION_FAILED for the unauthorized topic without leaking whether the topic id exists
+        assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), metadataResponseTopic.errorCode())
+        // Do not return topic information on unauthorized error
+        assertNull(metadataResponseTopic.name())
+      } else {
+        assertEquals(Errors.NONE.code(), metadataResponseTopic.errorCode())
+        assertEquals(authorizedTopic, metadataResponseTopic.name())
+      }
+    }
+    kafkaApis.close()
+
+    // 5. Send TopicMetadataReq using topic name
+    reset(clientRequestQuotaManager, requestChannel)
+    val metadataReqByTopicName = new MetadataRequest.Builder(util.Arrays.asList(authorizedTopic, unauthorizedTopic), false).build()
+    val repByTopicName = buildRequest(metadataReqByTopicName, plaintextListener)
+    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
+    kafkaApis.handleTopicMetadataRequest(repByTopicName)
+    val metadataByTopicNameResp = verifyNoThrottling[MetadataResponse](repByTopicName)
+
+    val metadataByTopicName = metadataByTopicNameResp.data().topics().asScala.groupBy(_.name()).map(kv => (kv._1, kv._2.head))
+
+    metadataByTopicName.foreach { case (topicName, metadataResponseTopic) =>
+      if (topicName == unauthorizedTopic) {
+        assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), metadataResponseTopic.errorCode())
+        // Do not return topic Id on unauthorized error
+        assertEquals(Uuid.ZERO_UUID, metadataResponseTopic.topicId())
+      } else {
+        assertEquals(Errors.NONE.code(), metadataResponseTopic.errorCode())
+        assertEquals(authorizedTopicId, metadataResponseTopic.topicId())
+      }
+    }
+  }
+
   /**
    * Verifies that sending a fetch request with version 9 works correctly when
    * ReplicaManager.getLogConfig returns None.
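The mocked Authorizer in the new test reduces to a simple per-action mapping: DESCRIBE on the authorized topic name is ALLOWED and everything else is DENIED, and the assertions then check that KafkaApis hides both the name (for id-based lookups) and the id (for name-based lookups) of the denied topic. A minimal, standalone sketch of that mapping, using the same Kafka authorizer classes the test itself uses; the object name and main method are illustrative additions, not part of the change:

    import org.apache.kafka.common.acl.AclOperation
    import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
    import org.apache.kafka.server.authorizer.{Action, AuthorizationResult}

    object DescribeTopicAuthSketch {
      val authorizedTopic = "authorized-topic"

      // Mirrors the thenAnswer body in the test: allow DESCRIBE on the authorized
      // topic, deny every other resource.
      def authorize(actions: Seq[Action]): Seq[AuthorizationResult] =
        actions.map { action =>
          if (action.resourcePattern().name() == authorizedTopic) AuthorizationResult.ALLOWED
          else AuthorizationResult.DENIED
        }

      def main(args: Array[String]): Unit = {
        val actions = Seq("authorized-topic", "unauthorized-topic").map { topic =>
          new Action(AclOperation.DESCRIBE,
            new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL), 1, true, true)
        }
        println(authorize(actions)) // prints List(ALLOWED, DENIED)
      }
    }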
