From ebe3537a9ba572c0c23fe31be495f66bfc6a5e05 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Sun, 19 Jan 2025 21:22:25 +0800 Subject: [PATCH] clean up --- .../kafka/server/BrokerLifecycleManager.scala | 23 ++++++------------- .../scala/kafka/server/BrokerServer.scala | 1 - 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala index de8f82ddb3021..bd01311feddc4 100644 --- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala +++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala @@ -58,7 +58,6 @@ class BrokerLifecycleManager( val config: KafkaConfig, val time: Time, val threadNamePrefix: String, - val isZkBroker: Boolean, val logDirs: Set[Uuid], val shutdownHook: () => Unit = () => {} ) extends Logging { @@ -66,9 +65,6 @@ class BrokerLifecycleManager( private def logPrefix(): String = { val builder = new StringBuilder("[BrokerLifecycleManager") builder.append(" id=").append(config.nodeId) - if (isZkBroker) { - builder.append(" isZkBroker=true") - } builder.append("] ") builder.toString() } @@ -158,7 +154,7 @@ class BrokerLifecycleManager( private var offlineDirs = mutable.Map[Uuid, Boolean]() /** - * True if we sent a event queue to the active controller requesting controlled + * True if we sent an event queue to the active controller requesting controlled * shutdown. This variable can only be read or written from the event queue thread. */ private var gotControlledShutdownResponse = false @@ -270,10 +266,8 @@ class BrokerLifecycleManager( private class ResendBrokerRegistrationUnlessZkModeEvent extends EventQueue.Event { override def run(): Unit = { - if (!isZkBroker) { - registered = false - scheduleNextCommunicationImmediately() - } + registered = false + scheduleNextCommunicationImmediately() } } @@ -366,12 +360,9 @@ class BrokerLifecycleManager( _clusterId = clusterId _advertisedListeners = advertisedListeners.duplicate() _supportedFeatures = new util.HashMap[String, VersionRange](supportedFeatures) - if (!isZkBroker) { - // Only KRaft brokers block on registration during startup - eventQueue.scheduleDeferred("initialRegistrationTimeout", - new DeadlineFunction(time.nanoseconds() + initialTimeoutNs), - new RegistrationTimeoutEvent()) - } + eventQueue.scheduleDeferred("initialRegistrationTimeout", + new DeadlineFunction(time.nanoseconds() + initialTimeoutNs), + new RegistrationTimeoutEvent()) sendBrokerRegistration() info(s"Incarnation $incarnationId of broker $nodeId in cluster $clusterId " + "is now STARTING.") @@ -393,7 +384,7 @@ class BrokerLifecycleManager( }) val data = new BrokerRegistrationRequestData(). setBrokerId(nodeId). - setIsMigratingZkBroker(isZkBroker). + setIsMigratingZkBroker(false). setClusterId(_clusterId). setFeatures(features). setIncarnationId(incarnationId). diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 47b89dd1de18f..442e6a403d107 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -223,7 +223,6 @@ class BrokerServer( lifecycleManager = new BrokerLifecycleManager(config, time, s"broker-${config.nodeId}-", - isZkBroker = false, logDirs = logManager.directoryIdsSet, () => new Thread(() => shutdown(), "kafka-shutdown-thread").start())