Skip to content

Commit

Permalink
Allow configuring advertised.listeners for controllers with Kafka ver…
Browse files Browse the repository at this point in the history
…sion 3.9.0 (#10530)

Signed-off-by: Gantigmaa Selenge <[email protected]>
  • Loading branch information
tinaselenge authored Sep 9, 2024
1 parent 9d1ddf2 commit 3476068
Show file tree
Hide file tree
Showing 4 changed files with 322 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
public class KafkaBrokerConfigurationBuilder {
private final static String CONTROL_PLANE_LISTENER_NAME = "CONTROLPLANE-9090";
private final static String REPLICATION_LISTENER_NAME = "REPLICATION-9091";

// Names of environment variables placeholders replaced only in the running container
private final static String PLACEHOLDER_CERT_STORE_PASSWORD = "${CERTS_STORE_PASSWORD}";
private final static String PLACEHOLDER_RACK_ID = "${STRIMZI_RACK_ID}";
Expand Down Expand Up @@ -254,18 +253,19 @@ public KafkaBrokerConfigurationBuilder withKRaft(String clusterName, String name
* generate the per-broker configuration which uses actual broker IDs and addresses instead of just placeholders.
*
* @param clusterName Name of the cluster (important for the advertised hostnames)
* @param kafkaVersion Kafka version of the cluster
* @param namespace Namespace (important for generating the advertised hostname)
* @param kafkaListeners The listeners configuration from the Kafka CR
* @param advertisedHostnameProvider Lambda method which provides the advertised hostname for given listener and
* broker. This is used to configure the user-configurable listeners.
* @param advertisedPortProvider Lambda method which provides the advertised port for given listener and broker.
* This is used to configure the user-configurable listeners.
*
* @return Returns the builder instance
*/
@SuppressWarnings({"checkstyle:CyclomaticComplexity"})
public KafkaBrokerConfigurationBuilder withListeners(
String clusterName,
KafkaVersion kafkaVersion,
String namespace,
List<GenericKafkaListener> kafkaListeners,
Function<String, String> advertisedHostnameProvider,
Expand All @@ -281,8 +281,9 @@ public KafkaBrokerConfigurationBuilder withListeners(
if (node.controller() || (node.broker() && kafkaMetadataConfigState.isZooKeeperToMigration())) {
listeners.add(CONTROL_PLANE_LISTENER_NAME + "://0.0.0.0:9090");

// Control Plane listener to be advertised only with broker in ZooKeeper-based or migration but NOT when full KRaft only or mixed
if (node.broker() && kafkaMetadataConfigState.isZooKeeperToMigration()) {
// Control Plane listener to be advertised with broker in ZooKeeper-based or migration
// Kafka version 3.9.0 requires advertised.listeners configuration for controllers, however the previous versions forbids the configuration for controllers.
if ((node.broker() && kafkaMetadataConfigState.isZooKeeperToMigration()) || (KafkaVersion.compareDottedVersions(kafkaVersion.version(), "3.9.0") >= 0)) {
advertisedListeners.add(String.format("%s://%s:9090",
CONTROL_PLANE_LISTENER_NAME,
// Pod name constructed to be templatable for each individual ordinal
Expand Down Expand Up @@ -350,13 +351,18 @@ public KafkaBrokerConfigurationBuilder withListeners(
writer.println("listener.security.protocol.map=" + String.join(",", securityProtocol));
writer.println("listeners=" + String.join(",", listeners));

// Advertised listeners are not allowed on KRaft nodes with controller only role
if (!isKraftControllerOnly) {
writer.println("advertised.listeners=" + String.join(",", advertisedListeners));
writer.println("inter.broker.listener.name=" + REPLICATION_LISTENER_NAME);
} else if (node.controller() && kafkaMetadataConfigState.isZooKeeperToPostMigration()) {
// needed for KRaft controller only as well until post-migration because it needs to contact brokers
writer.println("inter.broker.listener.name=" + REPLICATION_LISTENER_NAME);
} else if (node.controller()) {
if (advertisedListeners.size() > 0) {
writer.println("advertised.listeners=" + String.join(",", advertisedListeners));
}

if (kafkaMetadataConfigState.isZooKeeperToPostMigration()) {
// needed for KRaft controller only as well until post-migration because it needs to contact brokers
writer.println("inter.broker.listener.name=" + REPLICATION_LISTENER_NAME);
}
}

// Control plane listener is on all ZooKeeper based brokers, needed during migration as well, when broker still using ZooKeeper but KRaft controllers are ready
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1775,6 +1775,7 @@ private String generatePerBrokerConfiguration(NodeRef node, KafkaPool pool, Map<
.withRackId(rack)
.withLogDirs(VolumeUtils.createVolumeMounts(pool.storage, false))
.withListeners(cluster,
kafkaVersion,
namespace,
listeners,
listenerId -> advertisedHostnames.get(node.nodeId()).get(listenerId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class KafkaVersionTestUtils {
public static final String LATEST_ZOOKEEPER_VERSION = "3.8.4";
public static final String LATEST_CHECKSUM = "ABCD1234";
public static final String LATEST_THIRD_PARTY_VERSION = "3.8.x";
public static final String KAFKA_390_VERSION = "3.9.0";
public static final String LATEST_KAFKA_IMAGE = KAFKA_IMAGE_STR + LATEST_KAFKA_VERSION;
public static final String LATEST_KAFKA_CONNECT_IMAGE = KAFKA_CONNECT_IMAGE_STR + LATEST_KAFKA_VERSION;
public static final String LATEST_KAFKA_MIRROR_MAKER_IMAGE = KAFKA_MIRROR_MAKER_IMAGE_STR + LATEST_KAFKA_VERSION;
Expand Down
Loading

0 comments on commit 3476068

Please sign in to comment.