diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala index adfb657b77603..b0fce6ab36ac5 100644 --- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala @@ -22,7 +22,9 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig} import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, PartitionInfo} import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.serialization.{Deserializer, Serializer} +import org.apache.kafka.server.config.ServerConfigs import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.{BeforeEach, TestInfo} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.MethodSource @@ -36,6 +38,25 @@ import scala.collection.Seq */ abstract class BaseConsumerTest extends AbstractConsumerTest { + private var currentTestName: String = _ + + @BeforeEach + override def setUp(testInfo: TestInfo): Unit = { + currentTestName = testInfo.getTestMethod.get().getName + super.setUp(testInfo) + } + + override protected def brokerPropertyOverrides(properties: Properties): Unit = { + super.brokerPropertyOverrides(properties) + + if (currentTestName != null && currentTestName.equals("testCoordinatorFailover")) { + // Enable controlled shutdown to allow the broker to notify the controller before shutting down. + // This speeds up the test by triggering an immediate failover instead of waiting for the + // broker session timeout (default: 9s) to expire. + properties.setProperty(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, "true") + } + } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) @MethodSource(Array("getTestGroupProtocolParametersAll")) def testSimpleConsumption(groupProtocol: String): Unit = {