diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java index 1144b3e999c..0639bff751b 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java @@ -56,7 +56,6 @@ public class KafkaBrokerConfigurationBuilder { private final static String CONTROL_PLANE_LISTENER_NAME = "CONTROLPLANE-9090"; private final static String REPLICATION_LISTENER_NAME = "REPLICATION-9091"; - // Names of environment variables placeholders replaced only in the running container private final static String PLACEHOLDER_CERT_STORE_PASSWORD = "${CERTS_STORE_PASSWORD}"; private final static String PLACEHOLDER_RACK_ID = "${STRIMZI_RACK_ID}"; @@ -254,18 +253,19 @@ public KafkaBrokerConfigurationBuilder withKRaft(String clusterName, String name * generate the per-broker configuration which uses actual broker IDs and addresses instead of just placeholders. * * @param clusterName Name of the cluster (important for the advertised hostnames) + * @param kafkaVersion Kafka version of the cluster * @param namespace Namespace (important for generating the advertised hostname) * @param kafkaListeners The listeners configuration from the Kafka CR * @param advertisedHostnameProvider Lambda method which provides the advertised hostname for given listener and * broker. This is used to configure the user-configurable listeners. * @param advertisedPortProvider Lambda method which provides the advertised port for given listener and broker. * This is used to configure the user-configurable listeners. - * * @return Returns the builder instance */ @SuppressWarnings({"checkstyle:CyclomaticComplexity"}) public KafkaBrokerConfigurationBuilder withListeners( String clusterName, + KafkaVersion kafkaVersion, String namespace, List kafkaListeners, Function advertisedHostnameProvider, @@ -281,8 +281,9 @@ public KafkaBrokerConfigurationBuilder withListeners( if (node.controller() || (node.broker() && kafkaMetadataConfigState.isZooKeeperToMigration())) { listeners.add(CONTROL_PLANE_LISTENER_NAME + "://0.0.0.0:9090"); - // Control Plane listener to be advertised only with broker in ZooKeeper-based or migration but NOT when full KRaft only or mixed - if (node.broker() && kafkaMetadataConfigState.isZooKeeperToMigration()) { + // Control Plane listener to be advertised with broker in ZooKeeper-based or migration + // Kafka version 3.9.0 requires advertised.listeners configuration for controllers, however the previous versions forbids the configuration for controllers. + if ((node.broker() && kafkaMetadataConfigState.isZooKeeperToMigration()) || (KafkaVersion.compareDottedVersions(kafkaVersion.version(), "3.9.0") >= 0)) { advertisedListeners.add(String.format("%s://%s:9090", CONTROL_PLANE_LISTENER_NAME, // Pod name constructed to be templatable for each individual ordinal @@ -350,13 +351,18 @@ public KafkaBrokerConfigurationBuilder withListeners( writer.println("listener.security.protocol.map=" + String.join(",", securityProtocol)); writer.println("listeners=" + String.join(",", listeners)); - // Advertised listeners are not allowed on KRaft nodes with controller only role if (!isKraftControllerOnly) { writer.println("advertised.listeners=" + String.join(",", advertisedListeners)); writer.println("inter.broker.listener.name=" + REPLICATION_LISTENER_NAME); - } else if (node.controller() && kafkaMetadataConfigState.isZooKeeperToPostMigration()) { - // needed for KRaft controller only as well until post-migration because it needs to contact brokers - writer.println("inter.broker.listener.name=" + REPLICATION_LISTENER_NAME); + } else if (node.controller()) { + if (advertisedListeners.size() > 0) { + writer.println("advertised.listeners=" + String.join(",", advertisedListeners)); + } + + if (kafkaMetadataConfigState.isZooKeeperToPostMigration()) { + // needed for KRaft controller only as well until post-migration because it needs to contact brokers + writer.println("inter.broker.listener.name=" + REPLICATION_LISTENER_NAME); + } } // Control plane listener is on all ZooKeeper based brokers, needed during migration as well, when broker still using ZooKeeper but KRaft controllers are ready diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java index 08d0e76702a..e3e9257a053 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java @@ -1775,6 +1775,7 @@ private String generatePerBrokerConfiguration(NodeRef node, KafkaPool pool, Map< .withRackId(rack) .withLogDirs(VolumeUtils.createVolumeMounts(pool.storage, false)) .withListeners(cluster, + kafkaVersion, namespace, listeners, listenerId -> advertisedHostnames.get(node.nodeId()).get(listenerId), diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/KafkaVersionTestUtils.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/KafkaVersionTestUtils.java index a793d53b68b..8f6780bb8c7 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/KafkaVersionTestUtils.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/KafkaVersionTestUtils.java @@ -28,6 +28,7 @@ public class KafkaVersionTestUtils { public static final String LATEST_ZOOKEEPER_VERSION = "3.8.4"; public static final String LATEST_CHECKSUM = "ABCD1234"; public static final String LATEST_THIRD_PARTY_VERSION = "3.8.x"; + public static final String KAFKA_390_VERSION = "3.9.0"; public static final String LATEST_KAFKA_IMAGE = KAFKA_IMAGE_STR + LATEST_KAFKA_VERSION; public static final String LATEST_KAFKA_CONNECT_IMAGE = KAFKA_CONNECT_IMAGE_STR + LATEST_KAFKA_VERSION; public static final String LATEST_KAFKA_MIRROR_MAKER_IMAGE = KAFKA_MIRROR_MAKER_IMAGE_STR + LATEST_KAFKA_VERSION; diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java index ce70e38255c..a7a1317489a 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java @@ -31,6 +31,7 @@ import io.strimzi.api.kafka.model.kafka.tieredstorage.RemoteStorageManager; import io.strimzi.api.kafka.model.kafka.tieredstorage.TieredStorageCustom; import io.strimzi.kafka.oauth.server.ServerConfig; +import io.strimzi.operator.cluster.KafkaVersionTestUtils; import io.strimzi.operator.cluster.model.cruisecontrol.CruiseControlMetricsReporter; import io.strimzi.operator.common.Reconciliation; import io.strimzi.operator.common.model.cruisecontrol.CruiseControlConfigurationParameters; @@ -66,6 +67,9 @@ public class KafkaBrokerConfigurationBuilderTest { private final static NodeRef NODE_REF = new NodeRef("my-cluster-kafka-2", 2, "kafka", false, true); + private final static KafkaVersion KAFKA_3_8_0 = new KafkaVersion(KafkaVersionTestUtils.LATEST_KAFKA_VERSION, "", "", "", "", false, false, ""); + private final static KafkaVersion KAFKA_3_9_0 = new KafkaVersion(KafkaVersionTestUtils.KAFKA_390_VERSION, "", "", "", "", false, false, ""); + @ParallelTest public void testBrokerId() { String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) @@ -664,7 +668,7 @@ public void testJbodStorageLogDirs() { @ParallelTest public void testWithNoListeners() { String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", emptyList(), null, null) + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", emptyList(), null, null) .build(); assertThat(configuration, isEquivalent("broker.id=2", @@ -733,7 +737,7 @@ public void testConnectionLimits() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", asList(listener1, listener2, listener3, listener4), listenerId -> "dummy-advertised-address", listenerId -> "1919") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", asList(listener1, listener2, listener3, listener4), listenerId -> "dummy-advertised-address", listenerId -> "1919") .build(); assertThat(configuration, isEquivalent("broker.id=2", @@ -775,7 +779,7 @@ public void testWithPlainListenersWithoutAuth() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "my-cluster-kafka-2.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "my-cluster-kafka-2.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092") .build(); assertThat(configuration, isEquivalent("broker.id=2", @@ -821,7 +825,7 @@ public void testKraftListenersMixedNodes() { NodeRef nodeRef = nodes.stream().filter(nr -> nr.nodeId() == 2).findFirst().get(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, nodeRef, KafkaMetadataConfigurationState.KRAFT) .withKRaft("my-cluster", "my-namespace", nodes) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "my-cluster-kafka-2.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "my-cluster-kafka-2.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092") .build(); assertThat(configuration, isEquivalent("node.id=2", @@ -850,6 +854,54 @@ public void testKraftListenersMixedNodes() { "ssl.endpoint.identification.algorithm=HTTPS")); } + @ParallelTest + public void testKraftListenersMixedNodesWithVersion3_9() { + Set nodes = Set.of( + new NodeRef("my-cluster-kafka-0", 0, "kafka", true, true), + new NodeRef("my-cluster-kafka-1", 1, "kafka", true, true), + new NodeRef("my-cluster-kafka-2", 2, "kafka", true, true) + ); + + GenericKafkaListener listener = new GenericKafkaListenerBuilder() + .withName("plain") + .withPort(9092) + .withType(KafkaListenerType.INTERNAL) + .withTls(false) + .build(); + + NodeRef nodeRef = nodes.stream().filter(nr -> nr.nodeId() == 2).findFirst().get(); + String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, nodeRef, KafkaMetadataConfigurationState.KRAFT) + .withKRaft("my-cluster", "my-namespace", nodes) + .withListeners("my-cluster", KAFKA_3_9_0, "my-namespace", singletonList(listener), listenerId -> "my-cluster-kafka-2.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092") + .build(); + + // KRaft controller or mixed node with version 3.9 or later should have advertised listeners configured with controller listener + assertThat(configuration, isEquivalent("node.id=2", + "process.roles=broker,controller", + "controller.listener.names=CONTROLPLANE-9090", + "controller.quorum.voters=0@my-cluster-kafka-0.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,1@my-cluster-kafka-1.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,2@my-cluster-kafka-2.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090", + "listener.name.controlplane-9090.ssl.client.auth=required", + "listener.name.controlplane-9090.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12", + "listener.name.controlplane-9090.ssl.keystore.password=${CERTS_STORE_PASSWORD}", + "listener.name.controlplane-9090.ssl.keystore.type=PKCS12", + "listener.name.controlplane-9090.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", + "listener.name.controlplane-9090.ssl.truststore.password=${CERTS_STORE_PASSWORD}", + "listener.name.controlplane-9090.ssl.truststore.type=PKCS12", + "listener.name.replication-9091.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12", + "listener.name.replication-9091.ssl.keystore.password=${CERTS_STORE_PASSWORD}", + "listener.name.replication-9091.ssl.keystore.type=PKCS12", + "listener.name.replication-9091.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", + "listener.name.replication-9091.ssl.truststore.password=${CERTS_STORE_PASSWORD}", + "listener.name.replication-9091.ssl.truststore.type=PKCS12", + "listener.name.replication-9091.ssl.client.auth=required", + "listeners=CONTROLPLANE-9090://0.0.0.0:9090,REPLICATION-9091://0.0.0.0:9091,PLAIN-9092://0.0.0.0:9092", + "advertised.listeners=CONTROLPLANE-9090://my-cluster-kafka-2.my-cluster-kafka-brokers.my-namespace.svc:9090,REPLICATION-9091://my-cluster-kafka-2.my-cluster-kafka-brokers.my-namespace.svc:9091,PLAIN-9092://my-cluster-kafka-2.my-cluster-kafka-brokers.my-namespace.svc:9092", + "listener.security.protocol.map=CONTROLPLANE-9090:SSL,REPLICATION-9091:SSL,PLAIN-9092:PLAINTEXT", + "inter.broker.listener.name=REPLICATION-9091", + "sasl.enabled.mechanisms=", + "ssl.endpoint.identification.algorithm=HTTPS")); + } + @ParallelTest public void testKraftListenersBrokerAndControllerNodes() { Set nodes = Set.of( @@ -872,7 +924,7 @@ public void testKraftListenersBrokerAndControllerNodes() { NodeRef nodeRef = nodes.stream().filter(nr -> nr.nodeId() == 2).findFirst().get(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, nodeRef, KafkaMetadataConfigurationState.KRAFT) .withKRaft("my-cluster", "my-namespace", nodes) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "my-cluster-controllers-2.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "my-cluster-controllers-2.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092") .build(); assertThat(configuration, isEquivalent("node.id=2", @@ -895,7 +947,82 @@ public void testKraftListenersBrokerAndControllerNodes() { // Broker-only node configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, nodeRef, KafkaMetadataConfigurationState.KRAFT) .withKRaft("my-cluster", "my-namespace", nodes) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "my-cluster-brokers-11.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "my-cluster-brokers-11.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092") + .build(); + + assertThat(configuration, isEquivalent("node.id=11", + "process.roles=broker", + "controller.listener.names=CONTROLPLANE-9090", + "controller.quorum.voters=0@my-cluster-controllers-0.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,1@my-cluster-controllers-1.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,2@my-cluster-controllers-2.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090", + "listener.name.controlplane-9090.ssl.client.auth=required", + "listener.name.controlplane-9090.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12", + "listener.name.controlplane-9090.ssl.keystore.password=${CERTS_STORE_PASSWORD}", + "listener.name.controlplane-9090.ssl.keystore.type=PKCS12", + "listener.name.controlplane-9090.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", + "listener.name.controlplane-9090.ssl.truststore.password=${CERTS_STORE_PASSWORD}", + "listener.name.controlplane-9090.ssl.truststore.type=PKCS12", + "listener.name.replication-9091.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12", + "listener.name.replication-9091.ssl.keystore.password=${CERTS_STORE_PASSWORD}", + "listener.name.replication-9091.ssl.keystore.type=PKCS12", + "listener.name.replication-9091.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", + "listener.name.replication-9091.ssl.truststore.password=${CERTS_STORE_PASSWORD}", + "listener.name.replication-9091.ssl.truststore.type=PKCS12", + "listener.name.replication-9091.ssl.client.auth=required", + "listeners=REPLICATION-9091://0.0.0.0:9091,PLAIN-9092://0.0.0.0:9092", + "advertised.listeners=REPLICATION-9091://my-cluster-brokers-11.my-cluster-kafka-brokers.my-namespace.svc:9091,PLAIN-9092://my-cluster-brokers-11.my-cluster-kafka-brokers.my-namespace.svc:9092", + "listener.security.protocol.map=CONTROLPLANE-9090:SSL,REPLICATION-9091:SSL,PLAIN-9092:PLAINTEXT", + "inter.broker.listener.name=REPLICATION-9091", + "sasl.enabled.mechanisms=", + "ssl.endpoint.identification.algorithm=HTTPS")); + } + + @ParallelTest + public void testKraftListenersBrokerAndControllerNodesWithVersion3_9() { + Set nodes = Set.of( + new NodeRef("my-cluster-controllers-0", 0, "controllers", true, false), + new NodeRef("my-cluster-controllers-1", 1, "controllers", true, false), + new NodeRef("my-cluster-controllers-2", 2, "controllers", true, false), + new NodeRef("my-cluster-brokers-10", 10, "brokers", false, true), + new NodeRef("my-cluster-brokers-11", 11, "brokers", false, true), + new NodeRef("my-cluster-brokers-12", 12, "brokers", false, true) + ); + + GenericKafkaListener listener = new GenericKafkaListenerBuilder() + .withName("plain") + .withPort(9092) + .withType(KafkaListenerType.INTERNAL) + .withTls(false) + .build(); + + // Controller-only node + NodeRef nodeRef = nodes.stream().filter(nr -> nr.nodeId() == 2).findFirst().get(); + String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, nodeRef, KafkaMetadataConfigurationState.KRAFT) + .withKRaft("my-cluster", "my-namespace", nodes) + .withListeners("my-cluster", KAFKA_3_9_0, "my-namespace", singletonList(listener), listenerId -> "my-cluster-controllers-2.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092") + .build(); + + assertThat(configuration, isEquivalent("node.id=2", + "process.roles=controller", + "advertised.listeners=CONTROLPLANE-9090://my-cluster-controllers-2.my-cluster-kafka-brokers.my-namespace.svc:9090", + "controller.listener.names=CONTROLPLANE-9090", + "controller.quorum.voters=0@my-cluster-controllers-0.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,1@my-cluster-controllers-1.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,2@my-cluster-controllers-2.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090", + "listener.name.controlplane-9090.ssl.client.auth=required", + "listener.name.controlplane-9090.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12", + "listener.name.controlplane-9090.ssl.keystore.password=${CERTS_STORE_PASSWORD}", + "listener.name.controlplane-9090.ssl.keystore.type=PKCS12", + "listener.name.controlplane-9090.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", + "listener.name.controlplane-9090.ssl.truststore.password=${CERTS_STORE_PASSWORD}", + "listener.name.controlplane-9090.ssl.truststore.type=PKCS12", + "listeners=CONTROLPLANE-9090://0.0.0.0:9090", + "listener.security.protocol.map=CONTROLPLANE-9090:SSL", + "sasl.enabled.mechanisms=", + "ssl.endpoint.identification.algorithm=HTTPS")); + + nodeRef = nodes.stream().filter(nr -> nr.nodeId() == 11).findFirst().get(); + // Broker-only node + configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, nodeRef, KafkaMetadataConfigurationState.KRAFT) + .withKRaft("my-cluster", "my-namespace", nodes) + .withListeners("my-cluster", KAFKA_3_9_0, "my-namespace", singletonList(listener), listenerId -> "my-cluster-brokers-11.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092") .build(); assertThat(configuration, isEquivalent("node.id=11", @@ -965,7 +1092,7 @@ public void testKraftOauthBrokerControllerAndMixedNodes() { String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, nodes.stream().filter(nodeRef -> nodeRef.nodeId() == 2).findFirst().get(), KafkaMetadataConfigurationState.KRAFT) .withKRaft("my-cluster", "my-namespace", nodes) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "my-cluster-controllers-2.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "my-cluster-controllers-2.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092") .build(); assertThat(configuration, isEquivalent("node.id=2", @@ -989,7 +1116,7 @@ public void testKraftOauthBrokerControllerAndMixedNodes() { configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, nodes.stream().filter(nodeRef -> nodeRef.nodeId() == 11).findFirst().get(), KafkaMetadataConfigurationState.KRAFT) .withKRaft("my-cluster", "my-namespace", nodes) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "my-cluster-brokers-11.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "my-cluster-brokers-11.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092") .build(); assertThat(configuration, isEquivalent("node.id=11", @@ -1028,7 +1155,7 @@ public void testKraftOauthBrokerControllerAndMixedNodes() { configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, nodes.stream().filter(nodeRef -> nodeRef.nodeId() == 14).findFirst().get(), KafkaMetadataConfigurationState.KRAFT) .withKRaft("my-cluster", "my-namespace", nodes) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "my-cluster-kafka-14.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "my-cluster-kafka-14.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092") .build(); assertThat(configuration, isEquivalent("node.id=14", @@ -1064,6 +1191,147 @@ public void testKraftOauthBrokerControllerAndMixedNodes() { "listener.name.plain-9092.connections.max.reauth.ms=3600000")); } + @ParallelTest + public void testKraftOauthBrokerControllerAndMixedNodesWithVersion3_9() { + Set nodes = Set.of( + new NodeRef("my-cluster-controllers-0", 0, "controllers", true, false), + new NodeRef("my-cluster-controllers-1", 1, "controllers", true, false), + new NodeRef("my-cluster-controllers-2", 2, "controllers", true, false), + new NodeRef("my-cluster-brokers-10", 10, "brokers", false, true), + new NodeRef("my-cluster-brokers-11", 11, "brokers", false, true), + new NodeRef("my-cluster-brokers-12", 12, "brokers", false, true), + new NodeRef("my-cluster-kafka-13", 13, "kafka", true, true), + new NodeRef("my-cluster-kafka-14", 14, "kafka", true, true), + new NodeRef("my-cluster-kafka-15", 15, "kafka", true, true) + ); + + GenericKafkaListener listener = new GenericKafkaListenerBuilder() + .withName("plain") + .withPort(9092) + .withType(KafkaListenerType.INTERNAL) + .withTls(false) + .withNewKafkaListenerAuthenticationOAuth() + .withValidIssuerUri("http://valid-issuer") + .withJwksEndpointUri("http://jwks") + .withEnableECDSA(true) + .withUserNameClaim("preferred_username") + .withGroupsClaim("$.groups") + .withGroupsClaimDelimiter(";") + .withMaxSecondsWithoutReauthentication(3600) + .withJwksMinRefreshPauseSeconds(5) + .withEnablePlain(true) + .withTokenEndpointUri("http://token") + .withConnectTimeoutSeconds(30) + .withReadTimeoutSeconds(30) + .withEnableMetrics(true) + .withIncludeAcceptHeader(false) + .endKafkaListenerAuthenticationOAuth() + .build(); + + // Controller-only node + String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, + nodes.stream().filter(nodeRef -> nodeRef.nodeId() == 2).findFirst().get(), KafkaMetadataConfigurationState.KRAFT) + .withKRaft("my-cluster", "my-namespace", nodes) + .withListeners("my-cluster", KAFKA_3_9_0, "my-namespace", singletonList(listener), listenerId -> "my-cluster-controllers-2.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092") + .build(); + + assertThat(configuration, isEquivalent("node.id=2", + "process.roles=controller", + "advertised.listeners=CONTROLPLANE-9090://my-cluster-controllers-2.my-cluster-kafka-brokers.my-namespace.svc:9090", + "controller.listener.names=CONTROLPLANE-9090", + "controller.quorum.voters=0@my-cluster-controllers-0.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,1@my-cluster-controllers-1.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,2@my-cluster-controllers-2.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,13@my-cluster-kafka-13.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,14@my-cluster-kafka-14.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,15@my-cluster-kafka-15.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090", + "listener.name.controlplane-9090.ssl.client.auth=required", + "listener.name.controlplane-9090.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12", + "listener.name.controlplane-9090.ssl.keystore.password=${CERTS_STORE_PASSWORD}", + "listener.name.controlplane-9090.ssl.keystore.type=PKCS12", + "listener.name.controlplane-9090.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", + "listener.name.controlplane-9090.ssl.truststore.password=${CERTS_STORE_PASSWORD}", + "listener.name.controlplane-9090.ssl.truststore.type=PKCS12", + "listeners=CONTROLPLANE-9090://0.0.0.0:9090", + "listener.security.protocol.map=CONTROLPLANE-9090:SSL", + "sasl.enabled.mechanisms=", + "ssl.endpoint.identification.algorithm=HTTPS", + "principal.builder.class=io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder")); + + // Broker-only node + configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, + nodes.stream().filter(nodeRef -> nodeRef.nodeId() == 11).findFirst().get(), KafkaMetadataConfigurationState.KRAFT) + .withKRaft("my-cluster", "my-namespace", nodes) + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "my-cluster-brokers-11.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092") + .build(); + + assertThat(configuration, isEquivalent("node.id=11", + "process.roles=broker", + "controller.listener.names=CONTROLPLANE-9090", + "controller.quorum.voters=0@my-cluster-controllers-0.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,1@my-cluster-controllers-1.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,2@my-cluster-controllers-2.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,13@my-cluster-kafka-13.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,14@my-cluster-kafka-14.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,15@my-cluster-kafka-15.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090", + "listener.name.controlplane-9090.ssl.client.auth=required", + "listener.name.controlplane-9090.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12", + "listener.name.controlplane-9090.ssl.keystore.password=${CERTS_STORE_PASSWORD}", + "listener.name.controlplane-9090.ssl.keystore.type=PKCS12", + "listener.name.controlplane-9090.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", + "listener.name.controlplane-9090.ssl.truststore.password=${CERTS_STORE_PASSWORD}", + "listener.name.controlplane-9090.ssl.truststore.type=PKCS12", + "listener.name.replication-9091.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12", + "listener.name.replication-9091.ssl.keystore.password=${CERTS_STORE_PASSWORD}", + "listener.name.replication-9091.ssl.keystore.type=PKCS12", + "listener.name.replication-9091.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", + "listener.name.replication-9091.ssl.truststore.password=${CERTS_STORE_PASSWORD}", + "listener.name.replication-9091.ssl.truststore.type=PKCS12", + "listener.name.replication-9091.ssl.client.auth=required", + "listeners=REPLICATION-9091://0.0.0.0:9091,PLAIN-9092://0.0.0.0:9092", + "advertised.listeners=REPLICATION-9091://my-cluster-brokers-11.my-cluster-kafka-brokers.my-namespace.svc:9091,PLAIN-9092://my-cluster-brokers-11.my-cluster-kafka-brokers.my-namespace.svc:9092", + "listener.security.protocol.map=CONTROLPLANE-9090:SSL,REPLICATION-9091:SSL,PLAIN-9092:SASL_PLAINTEXT", + "inter.broker.listener.name=REPLICATION-9091", + "sasl.enabled.mechanisms=", + "ssl.endpoint.identification.algorithm=HTTPS", + "principal.builder.class=io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder", + "listener.name.plain-9092.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler", + "listener.name.plain-9092.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"thePrincipalName\" oauth.valid.issuer.uri=\"http://valid-issuer\" oauth.jwks.endpoint.uri=\"http://jwks\" oauth.jwks.refresh.min.pause.seconds=\"5\" oauth.username.claim=\"preferred_username\" oauth.groups.claim=\"$.groups\" oauth.groups.claim.delimiter=\";\" oauth.connect.timeout.seconds=\"30\" oauth.read.timeout.seconds=\"30\" oauth.enable.metrics=\"true\" oauth.include.accept.header=\"false\" oauth.config.id=\"PLAIN-9092\";", + "listener.name.plain-9092.plain.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler", + "listener.name.plain-9092.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required oauth.valid.issuer.uri=\"http://valid-issuer\" oauth.jwks.endpoint.uri=\"http://jwks\" oauth.jwks.refresh.min.pause.seconds=\"5\" oauth.username.claim=\"preferred_username\" oauth.groups.claim=\"$.groups\" oauth.groups.claim.delimiter=\";\" oauth.connect.timeout.seconds=\"30\" oauth.read.timeout.seconds=\"30\" oauth.enable.metrics=\"true\" oauth.include.accept.header=\"false\" oauth.config.id=\"PLAIN-9092\" oauth.token.endpoint.uri=\"http://token\";", + "listener.name.plain-9092.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN", + "listener.name.plain-9092.connections.max.reauth.ms=3600000")); + + // Mixed node + configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, + nodes.stream().filter(nodeRef -> nodeRef.nodeId() == 14).findFirst().get(), KafkaMetadataConfigurationState.KRAFT) + .withKRaft("my-cluster", "my-namespace", nodes) + .withListeners("my-cluster", KAFKA_3_9_0, "my-namespace", singletonList(listener), listenerId -> "my-cluster-kafka-14.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092") + .build(); + + assertThat(configuration, isEquivalent("node.id=14", + "process.roles=broker,controller", + "controller.listener.names=CONTROLPLANE-9090", + "controller.quorum.voters=0@my-cluster-controllers-0.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,1@my-cluster-controllers-1.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,2@my-cluster-controllers-2.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,13@my-cluster-kafka-13.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,14@my-cluster-kafka-14.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,15@my-cluster-kafka-15.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090", + "listener.name.controlplane-9090.ssl.client.auth=required", + "listener.name.controlplane-9090.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12", + "listener.name.controlplane-9090.ssl.keystore.password=${CERTS_STORE_PASSWORD}", + "listener.name.controlplane-9090.ssl.keystore.type=PKCS12", + "listener.name.controlplane-9090.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", + "listener.name.controlplane-9090.ssl.truststore.password=${CERTS_STORE_PASSWORD}", + "listener.name.controlplane-9090.ssl.truststore.type=PKCS12", + "listener.name.replication-9091.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12", + "listener.name.replication-9091.ssl.keystore.password=${CERTS_STORE_PASSWORD}", + "listener.name.replication-9091.ssl.keystore.type=PKCS12", + "listener.name.replication-9091.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", + "listener.name.replication-9091.ssl.truststore.password=${CERTS_STORE_PASSWORD}", + "listener.name.replication-9091.ssl.truststore.type=PKCS12", + "listener.name.replication-9091.ssl.client.auth=required", + "listeners=CONTROLPLANE-9090://0.0.0.0:9090,REPLICATION-9091://0.0.0.0:9091,PLAIN-9092://0.0.0.0:9092", + "advertised.listeners=CONTROLPLANE-9090://my-cluster-kafka-14.my-cluster-kafka-brokers.my-namespace.svc:9090,REPLICATION-9091://my-cluster-kafka-14.my-cluster-kafka-brokers.my-namespace.svc:9091,PLAIN-9092://my-cluster-kafka-14.my-cluster-kafka-brokers.my-namespace.svc:9092", + "listener.security.protocol.map=CONTROLPLANE-9090:SSL,REPLICATION-9091:SSL,PLAIN-9092:SASL_PLAINTEXT", + "inter.broker.listener.name=REPLICATION-9091", + "sasl.enabled.mechanisms=", + "ssl.endpoint.identification.algorithm=HTTPS", + "principal.builder.class=io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder", + "listener.name.plain-9092.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler", + "listener.name.plain-9092.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"thePrincipalName\" oauth.valid.issuer.uri=\"http://valid-issuer\" oauth.jwks.endpoint.uri=\"http://jwks\" oauth.jwks.refresh.min.pause.seconds=\"5\" oauth.username.claim=\"preferred_username\" oauth.groups.claim=\"$.groups\" oauth.groups.claim.delimiter=\";\" oauth.connect.timeout.seconds=\"30\" oauth.read.timeout.seconds=\"30\" oauth.enable.metrics=\"true\" oauth.include.accept.header=\"false\" oauth.config.id=\"PLAIN-9092\";", + "listener.name.plain-9092.plain.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler", + "listener.name.plain-9092.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required oauth.valid.issuer.uri=\"http://valid-issuer\" oauth.jwks.endpoint.uri=\"http://jwks\" oauth.jwks.refresh.min.pause.seconds=\"5\" oauth.username.claim=\"preferred_username\" oauth.groups.claim=\"$.groups\" oauth.groups.claim.delimiter=\";\" oauth.connect.timeout.seconds=\"30\" oauth.read.timeout.seconds=\"30\" oauth.enable.metrics=\"true\" oauth.include.accept.header=\"false\" oauth.config.id=\"PLAIN-9092\" oauth.token.endpoint.uri=\"http://token\";", + "listener.name.plain-9092.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN", + "listener.name.plain-9092.connections.max.reauth.ms=3600000")); + } + @ParallelTest public void testWithPlainListenersWithSaslAuth() { GenericKafkaListener listener = new GenericKafkaListenerBuilder() @@ -1076,7 +1344,7 @@ public void testWithPlainListenersWithSaslAuth() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") .build(); assertThat(configuration, isEquivalent("broker.id=2", @@ -1116,7 +1384,7 @@ public void testWithTlsListenersWithoutAuth() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") .build(); assertThat(configuration, isEquivalent("broker.id=2", @@ -1159,7 +1427,7 @@ public void testWithTlsListenersWithTlsAuth() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") .build(); assertThat(configuration, isEquivalent("broker.id=2", @@ -1211,7 +1479,7 @@ public void testWithTlsListenersWithCustomCerts() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") .build(); assertThat(configuration, isEquivalent("broker.id=2", @@ -1252,7 +1520,7 @@ public void testWithExternalRouteListenersWithoutAuth() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") .build(); assertThat(configuration, isEquivalent("broker.id=2", @@ -1295,7 +1563,7 @@ public void testWithExternalRouteListenersWithTlsAuth() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") .build(); assertThat(configuration, isEquivalent("broker.id=2", @@ -1342,7 +1610,7 @@ public void testWithExternalRouteListenersWithSaslAuth() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") .build(); assertThat(configuration, isEquivalent("broker.id=2", @@ -1392,7 +1660,7 @@ public void testWithExternalRouteListenersWithCustomCerts() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") .build(); assertThat(configuration, isEquivalent("broker.id=2", @@ -1433,7 +1701,7 @@ public void testWithExternalListenersLoadBalancerWithTls() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") .build(); assertThat(configuration, isEquivalent("broker.id=2", @@ -1474,7 +1742,7 @@ public void testPerBrokerWithExternalListeners() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "my-lb.com", listenerId -> "9094") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "my-lb.com", listenerId -> "9094") .build(); assertThat(configuration, isEquivalent("broker.id=2", @@ -1515,7 +1783,7 @@ public void testWithExternalListenersLoadBalancerWithoutTls() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") .build(); assertThat(configuration, isEquivalent("broker.id=2", @@ -1553,7 +1821,7 @@ public void testWithExternalListenersNodePortWithTls() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") .build(); assertThat(configuration, isEquivalent("broker.id=2", @@ -1594,7 +1862,7 @@ public void testWithExternalListenersNodePortWithoutTls() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") .build(); assertThat(configuration, isEquivalent("broker.id=2", @@ -1632,7 +1900,7 @@ public void testPerBrokerWithExternalListenersNodePortWithoutTls() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "${STRIMZI_NODEPORT_DEFAULT_ADDRESS}", listenerId -> "31234") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "${STRIMZI_NODEPORT_DEFAULT_ADDRESS}", listenerId -> "31234") .build(); assertThat(configuration, isEquivalent("broker.id=2", @@ -1682,7 +1950,7 @@ public void testWithExternalListenersIngress() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") .build(); assertThat(configuration, isEquivalent("broker.id=2", @@ -1732,7 +2000,7 @@ public void testWithExternalListenersClusterIPWithTLS() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") .build(); assertThat(configuration, isEquivalent("broker.id=2", @@ -1782,7 +2050,7 @@ public void testWithExternalListenersClusterIPWithoutTLS() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") .build(); assertThat(configuration, isEquivalent("broker.id=2", @@ -1840,7 +2108,7 @@ public void testOauthConfiguration() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") .build(); assertThat(configuration, isEquivalent("broker.id=2", @@ -1899,7 +2167,7 @@ public void testOauthConfigurationWithPlainOnly() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") .build(); assertThat(configuration, isEquivalent("broker.id=2", @@ -1944,7 +2212,7 @@ public void testOauthConfigurationWithoutOptions() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") .build(); assertThat(configuration, isEquivalent("broker.id=2", @@ -1999,7 +2267,7 @@ public void testOauthConfigurationWithTlsConfig() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") .build(); assertThat(configuration, isEquivalent("broker.id=2", @@ -2052,7 +2320,7 @@ public void testOauthConfigurationWithClientSecret() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") .build(); assertThat(configuration, isEquivalent("broker.id=2", @@ -2175,7 +2443,7 @@ public void testCustomAuthConfigSetProtocolMapCorrectlyForsSslSasl() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") .build(); assertThat(configuration, containsString("listener.security.protocol.map=CONTROLPLANE-9090:SSL,REPLICATION-9091:SSL,CUSTOM-LISTENER-9092:SASL_SSL")); @@ -2195,7 +2463,7 @@ public void testCustomAuthConfigSetProtocolMapCorrectlyForPlainSasl() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") .build(); assertThat(configuration, containsString("listener.security.protocol.map=CONTROLPLANE-9090:SSL,REPLICATION-9091:SSL,CUSTOM-LISTENER-9092:SASL_PLAINTEXT")); @@ -2216,7 +2484,7 @@ public void testCustomAuthConfigSetProtocolMapCorrectlyForPlain() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") .build(); assertThat(configuration, containsString("listener.security.protocol.map=CONTROLPLANE-9090:SSL,REPLICATION-9091:SSL,CUSTOM-LISTENER-9092:PLAINTEXT")); @@ -2236,7 +2504,7 @@ public void testCustomAuthConfigRemovesForbiddenPrefixes() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") .build(); assertThat(configuration, not(containsString("ssl.truststore.path"))); @@ -2261,7 +2529,7 @@ public void testCustomAuthConfigPrefixesUserProvidedConfig() { .build(); String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF, KafkaMetadataConfigurationState.ZK) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "dummy-advertised-address", listenerId -> "1919") .build(); assertThat(configuration, isEquivalent("broker.id=2", @@ -2393,7 +2661,7 @@ public void testListenersOnMigration() { if (state.isPreMigrationToKRaft()) { String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, controller, state) .withKRaft("my-cluster", "my-namespace", nodes) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "my-cluster-controllers-1.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "my-cluster-controllers-1.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092") .build(); // replication listener configured up to post-migration, before being full KRaft @@ -2415,7 +2683,7 @@ public void testListenersOnMigration() { String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, broker, state) .withKRaft("my-cluster", "my-namespace", nodes) - .withListeners("my-cluster", "my-namespace", singletonList(listener), listenerId -> "my-cluster-brokers-0.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092") + .withListeners("my-cluster", KAFKA_3_8_0, "my-namespace", singletonList(listener), listenerId -> "my-cluster-brokers-0.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092") .build(); if (state.isZooKeeperToMigration()) {