Skip to content

Commit

Permalink
Clean KafkaBrokerConfigurationBuilder class after ZooKeeper removal (
Browse files Browse the repository at this point in the history
…#10998)

Signed-off-by: Jakub Scholz <[email protected]>
  • Loading branch information
scholzj authored Dec 30, 2024
1 parent abed0e5 commit d91e404
Showing 1 changed file with 47 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ public KafkaBrokerConfigurationBuilder(Reconciliation reconciliation, NodeRef no
}

/**
* Renders the broker.id or node.id configurations
* Renders the node ID configurations
*/
private void configureNodeOrBrokerId() {
printSectionHeader("Node / Broker ID");
printSectionHeader("Node ID");
writer.println("node.id=" + node.nodeId());
writer.println();
}
Expand Down Expand Up @@ -204,7 +204,6 @@ public KafkaBrokerConfigurationBuilder withKRaft(String clusterName, String name
* This is used to configure the user-configurable listeners.
* @return Returns the builder instance
*/
@SuppressWarnings({"checkstyle:CyclomaticComplexity"})
public KafkaBrokerConfigurationBuilder withListeners(
String clusterName,
KafkaVersion kafkaVersion,
Expand All @@ -217,9 +216,19 @@ public KafkaBrokerConfigurationBuilder withListeners(
List<String> advertisedListeners = new ArrayList<>();
List<String> securityProtocol = new ArrayList<>();

boolean isKraftControllerOnly = node.controller() && !node.broker();
////////////////////
// Listeners that are on all nodes
////////////////////

// Control plane listener is configured for all nodes. Even brokers need to connect and talk to controllers, so
// they need to know what is the security protocol and security configuration
securityProtocol.add(CONTROL_PLANE_LISTENER_NAME + ":SSL");
configureControlPlaneListener();

////////////////////
// Listeners for nodes with controller role
////////////////////

// Control Plane listener is set for pure KRaft controller or combined node
if (node.controller()) {
listeners.add(CONTROL_PLANE_LISTENER_NAME + "://0.0.0.0:9090");

Expand All @@ -233,33 +242,22 @@ public KafkaBrokerConfigurationBuilder withListeners(
}
}

// Security protocol and Control Plane Listener are configured everywhere

// Brokers need to know how to connect to the controllers on the Control Plane listener and what security (encryption/authentication) they should use.
// For that reason, we have to configure the Control Plane listener in the broker-only configuration as well,
// even though they do not listen at the Control Plane listener port.
// The brokers use this configuration to detect how to connect to the controllers, what certificates to use etc.
securityProtocol.add(CONTROL_PLANE_LISTENER_NAME + ":SSL");
// Control Plane listener is configured on KRaft broker only nodes as well for allowing TLS certificates keystore generation
// so that brokers are able to connect to controllers as TLS clients
configureControlPlaneListener();
////////////////////
// Listeners for nodes with broker role
////////////////////

// Replication Listener to be configured on brokers
if (node.broker()) {
// Replication Listener to be configured only on brokers
securityProtocol.add(REPLICATION_LISTENER_NAME + ":SSL");
configureReplicationListener();
}

// Non-controller listeners are used only on brokers (including mixed nodes)
if (!isKraftControllerOnly) {
// Replication listener
listeners.add(REPLICATION_LISTENER_NAME + "://0.0.0.0:9091");
advertisedListeners.add(String.format("%s://%s:9091",
REPLICATION_LISTENER_NAME,
// Pod name constructed to be templatable for each individual ordinal
DnsNameGenerator.podDnsNameWithoutClusterDomain(namespace, KafkaResources.brokersServiceName(clusterName), node.podName())
));
configureReplicationListener();

// User-configured listeners
for (GenericKafkaListener listener : kafkaListeners) {
int port = listener.getPort();
String listenerName = ListenersUtils.identifier(listener).toUpperCase(Locale.ENGLISH);
Expand All @@ -285,20 +283,25 @@ public KafkaBrokerConfigurationBuilder withListeners(
}
}

////////////////////
// Shared configurations with values dependent on all listeners
////////////////////

// configure OAuth principal builder for all the nodes - brokers, controllers, and mixed
configureOAuthPrincipalBuilderIfNeeded(writer, kafkaListeners);

printSectionHeader("Common listener configuration");
writer.println("listener.security.protocol.map=" + String.join(",", securityProtocol));
writer.println("listeners=" + String.join(",", listeners));

if (!isKraftControllerOnly) {
writer.println("advertised.listeners=" + String.join(",", advertisedListeners));
if (node.broker()) {
// Inter-broker listener is configured only for nodes with broker role
writer.println("inter.broker.listener.name=" + REPLICATION_LISTENER_NAME);
} else if (node.controller()) {
if (advertisedListeners.size() > 0) {
writer.println("advertised.listeners=" + String.join(",", advertisedListeners));
}
}

if (!advertisedListeners.isEmpty()) {
// Advertised listeners might be empty for controller-only nodes with Kafka versions older than 3.9.0
writer.println("advertised.listeners=" + String.join(",", advertisedListeners));
}

writer.println("sasl.enabled.mechanisms=");
Expand All @@ -323,21 +326,17 @@ private void configureOAuthPrincipalBuilderIfNeeded(PrintWriter writer, List<Gen
* rather static, it always uses TLS with TLS client auth.
*/
private void configureControlPlaneListener() {
final String controlPlaneListenerName = CONTROL_PLANE_LISTENER_NAME.toLowerCase(Locale.ENGLISH);

printSectionHeader("Control Plane listener");
configureListener(controlPlaneListenerName);
configureListener(CONTROL_PLANE_LISTENER_NAME.toLowerCase(Locale.ENGLISH));
}

/**
* Internal method which configures the replication listener. The replication listener configuration is currently
* rather static, it always uses TLS with TLS client auth.
*/
private void configureReplicationListener() {
final String replicationListenerName = REPLICATION_LISTENER_NAME.toLowerCase(Locale.ENGLISH);

printSectionHeader("Replication listener");
configureListener(replicationListenerName);
configureListener(REPLICATION_LISTENER_NAME.toLowerCase(Locale.ENGLISH));
}

/**
Expand Down Expand Up @@ -365,7 +364,7 @@ private void configureListener(String listenerName) {
*/
private void configureListener(String listenerName, GenericKafkaListenerConfiguration configuration) {
if (configuration != null) {
String listenerNameInProperty = listenerName.toLowerCase(Locale.ENGLISH);
final String listenerNameInProperty = listenerName.toLowerCase(Locale.ENGLISH);

if (configuration.getMaxConnections() != null) {
writer.println(String.format("listener.name.%s.max.connections=%d", listenerNameInProperty, configuration.getMaxConnections()));
Expand All @@ -384,7 +383,7 @@ private void configureListener(String listenerName, GenericKafkaListenerConfigur
* @param serverCertificate The custom certificate configuration (null if not specified by the user in the Kafka CR)
*/
private void configureTls(String listenerName, CertAndKeySecretSource serverCertificate) {
String listenerNameInProperty = listenerName.toLowerCase(Locale.ENGLISH);
final String listenerNameInProperty = listenerName.toLowerCase(Locale.ENGLISH);

if (serverCertificate != null) {
writer.println(String.format("listener.name.%s.ssl.keystore.location=/tmp/kafka/custom-%s.keystore.p12", listenerNameInProperty, listenerNameInProperty));
Expand All @@ -408,8 +407,8 @@ private void configureTls(String listenerName, CertAndKeySecretSource serverCert
* @param auth The authentication configuration from the Kafka CR
*/
private void configureAuthentication(String listenerName, List<String> securityProtocol, boolean tls, KafkaListenerAuthentication auth) {
String listenerNameInProperty = listenerName.toLowerCase(Locale.ENGLISH);
String listenerNameInEnvVar = listenerName.replace("-", "_");
final String listenerNameInProperty = listenerName.toLowerCase(Locale.ENGLISH);
final String listenerNameInEnvVar = listenerName.replace("-", "_");

if (auth instanceof KafkaListenerAuthenticationOAuth oauth) {
securityProtocol.add(String.format("%s:%s", listenerName, getSecurityProtocol(tls, true)));
Expand All @@ -421,7 +420,7 @@ private void configureAuthentication(String listenerName, List<String> securityP
addOptionIfNotNull(jaasOptions, "oauth.client.secret", String.format(PLACEHOLDER_OAUTH_CLIENT_SECRET, listenerNameInEnvVar));
}

if (oauth.getTlsTrustedCertificates() != null && oauth.getTlsTrustedCertificates().size() > 0) {
if (oauth.getTlsTrustedCertificates() != null && !oauth.getTlsTrustedCertificates().isEmpty()) {
addOptionIfNotNull(jaasOptions, "oauth.ssl.truststore.location", String.format("/tmp/kafka/oauth-%s.truststore.p12", listenerNameInProperty));
addOptionIfNotNull(jaasOptions, "oauth.ssl.truststore.password", PLACEHOLDER_CERT_STORE_PASSWORD);
addOptionIfNotNull(jaasOptions, "oauth.ssl.truststore.type", "PKCS12");
Expand All @@ -443,7 +442,7 @@ private void configureAuthentication(String listenerName, List<String> securityP
writer.println(String.format("listener.name.%s.plain.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler", listenerNameInProperty));
writer.println(String.format("listener.name.%s.plain.sasl.jaas.config=%s", listenerNameInProperty,
AuthenticationUtils.jaasConfig("org.apache.kafka.common.security.plain.PlainLoginModule", jaasOptions)));
if (enabledMechanisms.length() > 0) {
if (!enabledMechanisms.isEmpty()) {
enabledMechanisms.append(",");
}
enabledMechanisms.append("PLAIN");
Expand Down Expand Up @@ -474,7 +473,7 @@ private void configureAuthentication(String listenerName, List<String> securityP
securityProtocol.add(String.format("%s:%s", listenerName, getSecurityProtocol(tls, customAuth.isSasl())));
Map<String, Object> listenerConfig = customAuth.getListenerConfig();
if (listenerConfig == null) {
listenerConfig = new HashMap<String, Object>();
listenerConfig = new HashMap<>();
}
KafkaListenerCustomAuthConfiguration config = new KafkaListenerCustomAuthConfiguration(reconciliation, listenerConfig.entrySet());
config.asOrderedProperties().asMap().forEach((key, value) -> writer.println(String.format("listener.name.%s.%s=%s", listenerNameInProperty, key, value)));
Expand Down Expand Up @@ -603,7 +602,7 @@ public KafkaBrokerConfigurationBuilder withAuthorization(String clusterName, Kaf
if (authorization != null) {
List<String> superUsers = new ArrayList<>();

// Broker super users
// Broker superusers
superUsers.add(String.format("User:CN=%s,O=io.strimzi", KafkaResources.kafkaComponentName(clusterName)));
superUsers.add(String.format("User:CN=%s-%s,O=io.strimzi", clusterName, "entity-topic-operator"));
superUsers.add(String.format("User:CN=%s-%s,O=io.strimzi", clusterName, "entity-user-operator"));
Expand Down Expand Up @@ -650,7 +649,7 @@ private void configureSimpleAuthorization(KafkaAuthorizationSimple authorization
writer.println("authorizer.class.name=" + KafkaAuthorizationSimple.KRAFT_AUTHORIZER_CLASS_NAME);

// User configured super-users
if (authorization.getSuperUsers() != null && authorization.getSuperUsers().size() > 0) {
if (authorization.getSuperUsers() != null && !authorization.getSuperUsers().isEmpty()) {
superUsers.addAll(authorization.getSuperUsers().stream().map(e -> String.format("User:%s", e)).toList());
}
}
Expand All @@ -671,14 +670,14 @@ private void configureOpaAuthorization(KafkaAuthorizationOpa authorization, List
writer.println(String.format("%s=%d", "opa.authorizer.cache.maximum.size", authorization.getMaximumCacheSize()));
writer.println(String.format("%s=%d", "opa.authorizer.cache.expire.after.seconds", Duration.ofMillis(authorization.getExpireAfterMs()).getSeconds()));

if (authorization.getTlsTrustedCertificates() != null && authorization.getTlsTrustedCertificates().size() > 0) {
if (authorization.getTlsTrustedCertificates() != null && !authorization.getTlsTrustedCertificates().isEmpty()) {
writer.println("opa.authorizer.truststore.path=/tmp/kafka/authz-opa.truststore.p12");
writer.println("opa.authorizer.truststore.password=" + PLACEHOLDER_CERT_STORE_PASSWORD);
writer.println("opa.authorizer.truststore.type=PKCS12");
}

// User configured super-users
if (authorization.getSuperUsers() != null && authorization.getSuperUsers().size() > 0) {
if (authorization.getSuperUsers() != null && !authorization.getSuperUsers().isEmpty()) {
superUsers.addAll(authorization.getSuperUsers().stream().map(e -> String.format("User:%s", e)).toList());
}
}
Expand Down Expand Up @@ -714,7 +713,7 @@ private void configureKeycloakAuthorization(String clusterName, KafkaAuthorizati

writer.println("strimzi.authorization.kafka.cluster.name=" + clusterName);

if (authorization.getTlsTrustedCertificates() != null && authorization.getTlsTrustedCertificates().size() > 0) {
if (authorization.getTlsTrustedCertificates() != null && !authorization.getTlsTrustedCertificates().isEmpty()) {
writer.println("strimzi.authorization.ssl.truststore.location=/tmp/kafka/authz-keycloak.truststore.p12");
writer.println("strimzi.authorization.ssl.truststore.password=" + PLACEHOLDER_CERT_STORE_PASSWORD);
writer.println("strimzi.authorization.ssl.truststore.type=PKCS12");
Expand All @@ -723,7 +722,7 @@ private void configureKeycloakAuthorization(String clusterName, KafkaAuthorizati
}

// User configured super-users
if (authorization.getSuperUsers() != null && authorization.getSuperUsers().size() > 0) {
if (authorization.getSuperUsers() != null && !authorization.getSuperUsers().isEmpty()) {
superUsers.addAll(authorization.getSuperUsers().stream().map(e -> String.format("User:%s", e)).toList());
}
}
Expand All @@ -738,7 +737,7 @@ private void configureCustomAuthorization(KafkaAuthorizationCustom authorization
writer.println("authorizer.class.name=" + authorization.getAuthorizerClass());

// User configured super-users
if (authorization.getSuperUsers() != null && authorization.getSuperUsers().size() > 0) {
if (authorization.getSuperUsers() != null && !authorization.getSuperUsers().isEmpty()) {
superUsers.addAll(authorization.getSuperUsers().stream().map(e -> String.format("User:%s", e)).toList());
}
}
Expand Down

0 comments on commit d91e404

Please sign in to comment.