Skip to content

Commit

Permalink
KAFKA-18594: Cleanup BrokerLifecycleManager (apache#18626)
Browse files Browse the repository at this point in the history
Reviewers: Christo Lolov <[email protected]>
  • Loading branch information
m1a2st authored and mjsax committed Jan 27, 2025
1 parent a6727c8 commit b6d21e3
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 26 deletions.
23 changes: 7 additions & 16 deletions core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,13 @@ class BrokerLifecycleManager(
val config: KafkaConfig,
val time: Time,
val threadNamePrefix: String,
val isZkBroker: Boolean,
val logDirs: Set[Uuid],
val shutdownHook: () => Unit = () => {}
) extends Logging {

private def logPrefix(): String = {
val builder = new StringBuilder("[BrokerLifecycleManager")
builder.append(" id=").append(config.nodeId)
if (isZkBroker) {
builder.append(" isZkBroker=true")
}
builder.append("] ")
builder.toString()
}
Expand Down Expand Up @@ -158,7 +154,7 @@ class BrokerLifecycleManager(
private var offlineDirs = mutable.Map[Uuid, Boolean]()

/**
* True if we sent a event queue to the active controller requesting controlled
* True if we sent an event queue to the active controller requesting controlled
* shutdown. This variable can only be read or written from the event queue thread.
*/
private var gotControlledShutdownResponse = false
Expand Down Expand Up @@ -270,10 +266,8 @@ class BrokerLifecycleManager(

private class ResendBrokerRegistrationUnlessZkModeEvent extends EventQueue.Event {
override def run(): Unit = {
if (!isZkBroker) {
registered = false
scheduleNextCommunicationImmediately()
}
registered = false
scheduleNextCommunicationImmediately()
}
}

Expand Down Expand Up @@ -366,12 +360,9 @@ class BrokerLifecycleManager(
_clusterId = clusterId
_advertisedListeners = advertisedListeners.duplicate()
_supportedFeatures = new util.HashMap[String, VersionRange](supportedFeatures)
if (!isZkBroker) {
// Only KRaft brokers block on registration during startup
eventQueue.scheduleDeferred("initialRegistrationTimeout",
new DeadlineFunction(time.nanoseconds() + initialTimeoutNs),
new RegistrationTimeoutEvent())
}
eventQueue.scheduleDeferred("initialRegistrationTimeout",
new DeadlineFunction(time.nanoseconds() + initialTimeoutNs),
new RegistrationTimeoutEvent())
sendBrokerRegistration()
info(s"Incarnation $incarnationId of broker $nodeId in cluster $clusterId " +
"is now STARTING.")
Expand All @@ -393,7 +384,7 @@ class BrokerLifecycleManager(
})
val data = new BrokerRegistrationRequestData().
setBrokerId(nodeId).
setIsMigratingZkBroker(isZkBroker).
setIsMigratingZkBroker(false).
setClusterId(_clusterId).
setFeatures(features).
setIncarnationId(incarnationId).
Expand Down
1 change: 0 additions & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ class BrokerServer(
lifecycleManager = new BrokerLifecycleManager(config,
time,
s"broker-${config.nodeId}-",
isZkBroker = false,
logDirs = logManager.directoryIdsSet,
() => new Thread(() => shutdown(), "kafka-shutdown-thread").start())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ class BrokerLifecycleManagerTest {
@Test
def testCreateAndClose(): Unit = {
val context = new RegistrationTestContext(configProperties)
manager = new BrokerLifecycleManager(context.config, context.time, "create-and-close-", isZkBroker = false, Set(Uuid.fromString("oFoTeS9QT0aAyCyH41v45A")))
manager = new BrokerLifecycleManager(context.config, context.time, "create-and-close-", Set(Uuid.fromString("oFoTeS9QT0aAyCyH41v45A")))
manager.close()
}

@Test
def testCreateStartAndClose(): Unit = {
val context = new RegistrationTestContext(configProperties)
manager = new BrokerLifecycleManager(context.config, context.time, "create-start-and-close-", isZkBroker = false, Set(Uuid.fromString("uiUADXZWTPixVvp6UWFWnw")))
manager = new BrokerLifecycleManager(context.config, context.time, "create-start-and-close-", Set(Uuid.fromString("uiUADXZWTPixVvp6UWFWnw")))
assertEquals(BrokerState.NOT_RUNNING, manager.state)
manager.start(() => context.highestMetadataOffset.get(),
context.mockChannelManager, context.clusterId, context.advertisedListeners,
Expand All @@ -81,7 +81,7 @@ class BrokerLifecycleManagerTest {
@Test
def testSuccessfulRegistration(): Unit = {
val context = new RegistrationTestContext(configProperties)
manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
val controllerNode = new Node(3000, "localhost", 8021)
context.controllerNodeProvider.node.set(controllerNode)
manager.start(() => context.highestMetadataOffset.get(),
Expand All @@ -103,7 +103,7 @@ class BrokerLifecycleManagerTest {
def testRegistrationTimeout(): Unit = {
val context = new RegistrationTestContext(configProperties)
val controllerNode = new Node(3000, "localhost", 8021)
manager = new BrokerLifecycleManager(context.config, context.time, "registration-timeout-", isZkBroker = false, Set(Uuid.fromString("9XBOAtr4T0Wbx2sbiWh6xg")))
manager = new BrokerLifecycleManager(context.config, context.time, "registration-timeout-", Set(Uuid.fromString("9XBOAtr4T0Wbx2sbiWh6xg")))
context.controllerNodeProvider.node.set(controllerNode)
def newDuplicateRegistrationResponse(): Unit = {
context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
Expand Down Expand Up @@ -143,7 +143,7 @@ class BrokerLifecycleManagerTest {
@Test
def testControlledShutdown(): Unit = {
val context = new RegistrationTestContext(configProperties)
manager = new BrokerLifecycleManager(context.config, context.time, "controlled-shutdown-", isZkBroker = false, Set(Uuid.fromString("B4RtUz1ySGip3A7ZFYB2dg")))
manager = new BrokerLifecycleManager(context.config, context.time, "controlled-shutdown-", Set(Uuid.fromString("B4RtUz1ySGip3A7ZFYB2dg")))
val controllerNode = new Node(3000, "localhost", 8021)
context.controllerNodeProvider.node.set(controllerNode)
context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
Expand Down Expand Up @@ -224,7 +224,7 @@ class BrokerLifecycleManagerTest {
@Test
def testAlwaysSendsAccumulatedOfflineDirs(): Unit = {
val ctx = new RegistrationTestContext(configProperties)
manager = new BrokerLifecycleManager(ctx.config, ctx.time, "offline-dirs-sent-in-heartbeat-", isZkBroker = false, Set(Uuid.fromString("0IbF1sjhSGG6FNvnrPbqQg")))
manager = new BrokerLifecycleManager(ctx.config, ctx.time, "offline-dirs-sent-in-heartbeat-", Set(Uuid.fromString("0IbF1sjhSGG6FNvnrPbqQg")))
val controllerNode = new Node(3000, "localhost", 8021)
ctx.controllerNodeProvider.node.set(controllerNode)

Expand All @@ -250,8 +250,7 @@ class BrokerLifecycleManagerTest {
def testRegistrationIncludesDirs(): Unit = {
val logDirs = Set("ad5FLIeCTnaQdai5vOjeng", "ybdzUKmYSLK6oiIpI6CPlw").map(Uuid.fromString)
val ctx = new RegistrationTestContext(configProperties)
manager = new BrokerLifecycleManager(ctx.config, ctx.time, "registration-includes-dirs-",
isZkBroker = false, logDirs)
manager = new BrokerLifecycleManager(ctx.config, ctx.time, "registration-includes-dirs-", logDirs)
val controllerNode = new Node(3000, "localhost", 8021)
ctx.controllerNodeProvider.node.set(controllerNode)

Expand All @@ -268,7 +267,7 @@ class BrokerLifecycleManagerTest {
@Test
def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
val ctx = new RegistrationTestContext(configProperties)
manager = new BrokerLifecycleManager(ctx.config, ctx.time, "jbod-metadata-version-update", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
manager = new BrokerLifecycleManager(ctx.config, ctx.time, "jbod-metadata-version-update", Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))

val controllerNode = new Node(3000, "localhost", 8021)
ctx.controllerNodeProvider.node.set(controllerNode)
Expand Down

0 comments on commit b6d21e3

Please sign in to comment.