Skip to content

Commit

Permalink
clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
m1a2st committed Jan 19, 2025
1 parent 516d524 commit ebe3537
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 17 deletions.
23 changes: 7 additions & 16 deletions core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,13 @@ class BrokerLifecycleManager(
val config: KafkaConfig,
val time: Time,
val threadNamePrefix: String,
val isZkBroker: Boolean,
val logDirs: Set[Uuid],
val shutdownHook: () => Unit = () => {}
) extends Logging {

private def logPrefix(): String = {
val builder = new StringBuilder("[BrokerLifecycleManager")
builder.append(" id=").append(config.nodeId)
if (isZkBroker) {
builder.append(" isZkBroker=true")
}
builder.append("] ")
builder.toString()
}
Expand Down Expand Up @@ -158,7 +154,7 @@ class BrokerLifecycleManager(
private var offlineDirs = mutable.Map[Uuid, Boolean]()

/**
* True if we sent a event queue to the active controller requesting controlled
* True if we sent an event queue to the active controller requesting controlled
* shutdown. This variable can only be read or written from the event queue thread.
*/
private var gotControlledShutdownResponse = false
Expand Down Expand Up @@ -270,10 +266,8 @@ class BrokerLifecycleManager(

private class ResendBrokerRegistrationUnlessZkModeEvent extends EventQueue.Event {
override def run(): Unit = {
if (!isZkBroker) {
registered = false
scheduleNextCommunicationImmediately()
}
registered = false
scheduleNextCommunicationImmediately()
}
}

Expand Down Expand Up @@ -366,12 +360,9 @@ class BrokerLifecycleManager(
_clusterId = clusterId
_advertisedListeners = advertisedListeners.duplicate()
_supportedFeatures = new util.HashMap[String, VersionRange](supportedFeatures)
if (!isZkBroker) {
// Only KRaft brokers block on registration during startup
eventQueue.scheduleDeferred("initialRegistrationTimeout",
new DeadlineFunction(time.nanoseconds() + initialTimeoutNs),
new RegistrationTimeoutEvent())
}
eventQueue.scheduleDeferred("initialRegistrationTimeout",
new DeadlineFunction(time.nanoseconds() + initialTimeoutNs),
new RegistrationTimeoutEvent())
sendBrokerRegistration()
info(s"Incarnation $incarnationId of broker $nodeId in cluster $clusterId " +
"is now STARTING.")
Expand All @@ -393,7 +384,7 @@ class BrokerLifecycleManager(
})
val data = new BrokerRegistrationRequestData().
setBrokerId(nodeId).
setIsMigratingZkBroker(isZkBroker).
setIsMigratingZkBroker(false).
setClusterId(_clusterId).
setFeatures(features).
setIncarnationId(incarnationId).
Expand Down
1 change: 0 additions & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ class BrokerServer(
lifecycleManager = new BrokerLifecycleManager(config,
time,
s"broker-${config.nodeId}-",
isZkBroker = false,
logDirs = logManager.directoryIdsSet,
() => new Thread(() => shutdown(), "kafka-shutdown-thread").start())

Expand Down

0 comments on commit ebe3537

Please sign in to comment.