Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18594 Cleanup BrokerLifecycleManager #18626

Merged
merged 2 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 7 additions & 16 deletions core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,13 @@ class BrokerLifecycleManager(
val config: KafkaConfig,
val time: Time,
val threadNamePrefix: String,
val isZkBroker: Boolean,
val logDirs: Set[Uuid],
val shutdownHook: () => Unit = () => {}
) extends Logging {

private def logPrefix(): String = {
val builder = new StringBuilder("[BrokerLifecycleManager")
builder.append(" id=").append(config.nodeId)
if (isZkBroker) {
builder.append(" isZkBroker=true")
}
builder.append("] ")
builder.toString()
}
Expand Down Expand Up @@ -158,7 +154,7 @@ class BrokerLifecycleManager(
private var offlineDirs = mutable.Map[Uuid, Boolean]()

/**
* True if we sent a event queue to the active controller requesting controlled
* True if we sent an event queue to the active controller requesting controlled
* shutdown. This variable can only be read or written from the event queue thread.
*/
private var gotControlledShutdownResponse = false
Expand Down Expand Up @@ -270,10 +266,8 @@ class BrokerLifecycleManager(

private class ResendBrokerRegistrationUnlessZkModeEvent extends EventQueue.Event {
override def run(): Unit = {
if (!isZkBroker) {
registered = false
scheduleNextCommunicationImmediately()
}
registered = false
scheduleNextCommunicationImmediately()
}
}

Expand Down Expand Up @@ -366,12 +360,9 @@ class BrokerLifecycleManager(
_clusterId = clusterId
_advertisedListeners = advertisedListeners.duplicate()
_supportedFeatures = new util.HashMap[String, VersionRange](supportedFeatures)
if (!isZkBroker) {
// Only KRaft brokers block on registration during startup
eventQueue.scheduleDeferred("initialRegistrationTimeout",
new DeadlineFunction(time.nanoseconds() + initialTimeoutNs),
new RegistrationTimeoutEvent())
}
eventQueue.scheduleDeferred("initialRegistrationTimeout",
new DeadlineFunction(time.nanoseconds() + initialTimeoutNs),
new RegistrationTimeoutEvent())
sendBrokerRegistration()
info(s"Incarnation $incarnationId of broker $nodeId in cluster $clusterId " +
"is now STARTING.")
Expand All @@ -393,7 +384,7 @@ class BrokerLifecycleManager(
})
val data = new BrokerRegistrationRequestData().
setBrokerId(nodeId).
setIsMigratingZkBroker(isZkBroker).
setIsMigratingZkBroker(false).
setClusterId(_clusterId).
setFeatures(features).
setIncarnationId(incarnationId).
Expand Down
1 change: 0 additions & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ class BrokerServer(
lifecycleManager = new BrokerLifecycleManager(config,
time,
s"broker-${config.nodeId}-",
isZkBroker = false,
logDirs = logManager.directoryIdsSet,
() => new Thread(() => shutdown(), "kafka-shutdown-thread").start())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ class BrokerLifecycleManagerTest {
@Test
def testCreateAndClose(): Unit = {
val context = new RegistrationTestContext(configProperties)
manager = new BrokerLifecycleManager(context.config, context.time, "create-and-close-", isZkBroker = false, Set(Uuid.fromString("oFoTeS9QT0aAyCyH41v45A")))
manager = new BrokerLifecycleManager(context.config, context.time, "create-and-close-", Set(Uuid.fromString("oFoTeS9QT0aAyCyH41v45A")))
manager.close()
}

@Test
def testCreateStartAndClose(): Unit = {
val context = new RegistrationTestContext(configProperties)
manager = new BrokerLifecycleManager(context.config, context.time, "create-start-and-close-", isZkBroker = false, Set(Uuid.fromString("uiUADXZWTPixVvp6UWFWnw")))
manager = new BrokerLifecycleManager(context.config, context.time, "create-start-and-close-", Set(Uuid.fromString("uiUADXZWTPixVvp6UWFWnw")))
assertEquals(BrokerState.NOT_RUNNING, manager.state)
manager.start(() => context.highestMetadataOffset.get(),
context.mockChannelManager, context.clusterId, context.advertisedListeners,
Expand All @@ -81,7 +81,7 @@ class BrokerLifecycleManagerTest {
@Test
def testSuccessfulRegistration(): Unit = {
val context = new RegistrationTestContext(configProperties)
manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
val controllerNode = new Node(3000, "localhost", 8021)
context.controllerNodeProvider.node.set(controllerNode)
manager.start(() => context.highestMetadataOffset.get(),
Expand All @@ -103,7 +103,7 @@ class BrokerLifecycleManagerTest {
def testRegistrationTimeout(): Unit = {
val context = new RegistrationTestContext(configProperties)
val controllerNode = new Node(3000, "localhost", 8021)
manager = new BrokerLifecycleManager(context.config, context.time, "registration-timeout-", isZkBroker = false, Set(Uuid.fromString("9XBOAtr4T0Wbx2sbiWh6xg")))
manager = new BrokerLifecycleManager(context.config, context.time, "registration-timeout-", Set(Uuid.fromString("9XBOAtr4T0Wbx2sbiWh6xg")))
context.controllerNodeProvider.node.set(controllerNode)
def newDuplicateRegistrationResponse(): Unit = {
context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
Expand Down Expand Up @@ -143,7 +143,7 @@ class BrokerLifecycleManagerTest {
@Test
def testControlledShutdown(): Unit = {
val context = new RegistrationTestContext(configProperties)
manager = new BrokerLifecycleManager(context.config, context.time, "controlled-shutdown-", isZkBroker = false, Set(Uuid.fromString("B4RtUz1ySGip3A7ZFYB2dg")))
manager = new BrokerLifecycleManager(context.config, context.time, "controlled-shutdown-", Set(Uuid.fromString("B4RtUz1ySGip3A7ZFYB2dg")))
val controllerNode = new Node(3000, "localhost", 8021)
context.controllerNodeProvider.node.set(controllerNode)
context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
Expand Down Expand Up @@ -224,7 +224,7 @@ class BrokerLifecycleManagerTest {
@Test
def testAlwaysSendsAccumulatedOfflineDirs(): Unit = {
val ctx = new RegistrationTestContext(configProperties)
manager = new BrokerLifecycleManager(ctx.config, ctx.time, "offline-dirs-sent-in-heartbeat-", isZkBroker = false, Set(Uuid.fromString("0IbF1sjhSGG6FNvnrPbqQg")))
manager = new BrokerLifecycleManager(ctx.config, ctx.time, "offline-dirs-sent-in-heartbeat-", Set(Uuid.fromString("0IbF1sjhSGG6FNvnrPbqQg")))
val controllerNode = new Node(3000, "localhost", 8021)
ctx.controllerNodeProvider.node.set(controllerNode)

Expand All @@ -250,8 +250,7 @@ class BrokerLifecycleManagerTest {
def testRegistrationIncludesDirs(): Unit = {
val logDirs = Set("ad5FLIeCTnaQdai5vOjeng", "ybdzUKmYSLK6oiIpI6CPlw").map(Uuid.fromString)
val ctx = new RegistrationTestContext(configProperties)
manager = new BrokerLifecycleManager(ctx.config, ctx.time, "registration-includes-dirs-",
isZkBroker = false, logDirs)
manager = new BrokerLifecycleManager(ctx.config, ctx.time, "registration-includes-dirs-", logDirs)
val controllerNode = new Node(3000, "localhost", 8021)
ctx.controllerNodeProvider.node.set(controllerNode)

Expand All @@ -268,7 +267,7 @@ class BrokerLifecycleManagerTest {
@Test
def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
val ctx = new RegistrationTestContext(configProperties)
manager = new BrokerLifecycleManager(ctx.config, ctx.time, "jbod-metadata-version-update", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
manager = new BrokerLifecycleManager(ctx.config, ctx.time, "jbod-metadata-version-update", Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))

val controllerNode = new Node(3000, "localhost", 8021)
ctx.controllerNodeProvider.node.set(controllerNode)
Expand Down
Loading