Skip to content

Commit

Permalink
Fixed scholzj and tinaselenge feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Paolo Patierno <[email protected]>
  • Loading branch information
ppatierno committed Jan 20, 2025
1 parent eacdd66 commit 8915e67
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -407,12 +407,8 @@ protected List<EnvVar> getEnvVars() {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_GC_LOG_ENABLED, String.valueOf(gcLoggingEnabled)));
JvmOptionUtils.javaOptions(varList, jvmOptions);

if (tls != null) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_TLS, "true"));

if (tls.getTrustedCertificates() != null && !tls.getTrustedCertificates().isEmpty()) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_TRUSTED_CERTS, CertUtils.trustedCertsEnvVar(tls.getTrustedCertificates())));
}
if (tls != null && tls.getTrustedCertificates() != null && !tls.getTrustedCertificates().isEmpty()) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_TRUSTED_CERTS, CertUtils.trustedCertsEnvVar(tls.getTrustedCertificates())));
}

AuthenticationUtils.configureClientAuthenticationEnvVars(authentication, varList, name -> ENV_VAR_PREFIX + name);
Expand Down Expand Up @@ -567,7 +563,7 @@ public ConfigMap generateBridgeConfigMap(MetricsAndLogging metricsAndLogging) {
// add the ConfigMap data entry for the bridge HTTP and Kafka clients related configuration
data.put(
BRIDGE_CONFIGURATION_FILENAME,
new KafkaBridgeConfigurationBuilder(cluster, bootstrapServers)
new KafkaBridgeConfigurationBuilder(reconciliation, cluster, bootstrapServers)
.withConfigProviders()
.withTracing(tracing)
.withTls(tls)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationScramSha512;
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationTls;
import io.strimzi.api.kafka.model.common.tracing.Tracing;
import io.strimzi.operator.common.Reconciliation;

import java.io.PrintWriter;
import java.io.StringWriter;
Expand All @@ -30,17 +31,18 @@
public class KafkaBridgeConfigurationBuilder {

// placeholders expanded through config providers inside the bridge node
private static final String PLACEHOLDER_CERT_STORE_PASSWORD = "${strimzienv:CERTS_STORE_PASSWORD}";
private static final String PLACEHOLDER_SASL_USERNAME = "${strimzienv:KAFKA_BRIDGE_SASL_USERNAME}";
private static final String PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:CERTS_STORE_PASSWORD}";
private static final String PLACEHOLDER_SASL_USERNAME_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:KAFKA_BRIDGE_SASL_USERNAME}";
private static final String PASSWORD_VOLUME_MOUNT = "/opt/strimzi/bridge-password/";
// the SASL password file template includes: <volume_mount>/<secret_name>/<password_file>
private static final String PLACEHOLDER_SASL_PASSWORD_FILE_TEMPLATE = "${strimzidir:%s%s:%s}";
private static final String PLACEHOLDER_OAUTH_CONFIG = "${strimzienv:KAFKA_BRIDGE_OAUTH_CONFIG}";
private static final String PLACEHOLDER_OAUTH_ACCESS_TOKEN = "${strimzienv:KAFKA_BRIDGE_OAUTH_ACCESS_TOKEN}";
private static final String PLACEHOLDER_OAUTH_REFRESH_TOKEN = "${strimzienv:KAFKA_BRIDGE_OAUTH_REFRESH_TOKEN}";
private static final String PLACEHOLDER_OAUTH_CLIENT_SECRET = "${strimzienv:KAFKA_BRIDGE_OAUTH_CLIENT_SECRET}";
private static final String PLACEHOLDER_OAUTH_PASSWORD_GRANT_PASSWORD = "${strimzienv:KAFKA_BRIDGE_OAUTH_PASSWORD_GRANT_PASSWORD}";

private static final String PLACEHOLDER_SASL_PASSWORD_FILE_TEMPLATE_CONFIG_PROVIDER_DIR = "${strimzidir:%s%s:%s}";
private static final String PLACEHOLDER_OAUTH_CONFIG_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:KAFKA_BRIDGE_OAUTH_CONFIG}";
private static final String PLACEHOLDER_OAUTH_ACCESS_TOKEN_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:KAFKA_BRIDGE_OAUTH_ACCESS_TOKEN}";
private static final String PLACEHOLDER_OAUTH_REFRESH_TOKEN_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:KAFKA_BRIDGE_OAUTH_REFRESH_TOKEN}";
private static final String PLACEHOLDER_OAUTH_CLIENT_SECRET_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:KAFKA_BRIDGE_OAUTH_CLIENT_SECRET}";
private static final String PLACEHOLDER_OAUTH_PASSWORD_GRANT_PASSWORD_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:KAFKA_BRIDGE_OAUTH_PASSWORD_GRANT_PASSWORD}";

private final Reconciliation reconciliation;
private final StringWriter stringWriter = new StringWriter();
private final PrintWriter writer = new PrintWriter(stringWriter);

Expand All @@ -49,10 +51,12 @@ public class KafkaBridgeConfigurationBuilder {
/**
* Bridge configuration template constructor
*
* @param reconciliation the reconciliation
* @param bridgeId the bridge ID
* @param bootstrapServers Kafka cluster bootstrap servers to connect to
*/
public KafkaBridgeConfigurationBuilder(String bridgeId, String bootstrapServers) {
public KafkaBridgeConfigurationBuilder(Reconciliation reconciliation, String bridgeId, String bootstrapServers) {
this.reconciliation = reconciliation;
printHeader();
configureBridgeId(bridgeId);
configureBootstrapServers(bootstrapServers);
Expand Down Expand Up @@ -126,8 +130,7 @@ public KafkaBridgeConfigurationBuilder withTracing(Tracing tracing) {

/**
* Adds the TLS/SSL configuration for the bridge to connect to the Kafka cluster.
* The configuration includes the trusted certificates store for TLS connection (server authentication)
* as well as the keystore configuration for mTLS (client authentication).
* The configuration includes the trusted certificates store for TLS connection (server authentication).
*
* @param tls client TLS configuration
* @return the builder instance
Expand All @@ -139,7 +142,7 @@ public KafkaBridgeConfigurationBuilder withTls(ClientTls tls) {
if (tls.getTrustedCertificates() != null && !tls.getTrustedCertificates().isEmpty()) {
printSectionHeader("TLS/SSL");
writer.println("kafka.ssl.truststore.location=/tmp/strimzi/bridge.truststore.p12");
writer.println("kafka.ssl.truststore.password=" + PLACEHOLDER_CERT_STORE_PASSWORD);
writer.println("kafka.ssl.truststore.password=" + PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR);
writer.println("kafka.ssl.truststore.type=PKCS12");
}
}
Expand All @@ -158,7 +161,7 @@ public KafkaBridgeConfigurationBuilder withAuthentication(KafkaClientAuthenticat
// configuring mTLS (client TLS authentication, together with server authentication already set)
if (authentication instanceof KafkaClientAuthenticationTls tlsAuth && tlsAuth.getCertificateAndKey() != null) {
writer.println("kafka.ssl.keystore.location=/tmp/strimzi/bridge.keystore.p12");
writer.println("kafka.ssl.keystore.password=" + PLACEHOLDER_CERT_STORE_PASSWORD);
writer.println("kafka.ssl.keystore.password=" + PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR);
writer.println("kafka.ssl.keystore.type=PKCS12");
// otherwise SASL or OAuth is going to be used for authentication
} else {
Expand All @@ -168,8 +171,8 @@ public KafkaBridgeConfigurationBuilder withAuthentication(KafkaClientAuthenticat

if (authentication instanceof KafkaClientAuthenticationPlain passwordAuth) {
saslMechanism = "PLAIN";
String passwordFilePath = String.format(PLACEHOLDER_SASL_PASSWORD_FILE_TEMPLATE, PASSWORD_VOLUME_MOUNT, passwordAuth.getPasswordSecret().getSecretName(), passwordAuth.getPasswordSecret().getPassword());
jaasConfig.append("org.apache.kafka.common.security.plain.PlainLoginModule required username=" + PLACEHOLDER_SASL_USERNAME + " password=" + passwordFilePath + ";");
String passwordFilePath = String.format(PLACEHOLDER_SASL_PASSWORD_FILE_TEMPLATE_CONFIG_PROVIDER_DIR, PASSWORD_VOLUME_MOUNT, passwordAuth.getPasswordSecret().getSecretName(), passwordAuth.getPasswordSecret().getPassword());
jaasConfig.append("org.apache.kafka.common.security.plain.PlainLoginModule required username=" + PLACEHOLDER_SASL_USERNAME_CONFIG_PROVIDER_ENV_VAR + " password=" + passwordFilePath + ";");
} else if (authentication instanceof KafkaClientAuthenticationScram scramAuth) {

if (scramAuth.getType().equals(KafkaClientAuthenticationScramSha256.TYPE_SCRAM_SHA_256)) {
Expand All @@ -178,30 +181,30 @@ public KafkaBridgeConfigurationBuilder withAuthentication(KafkaClientAuthenticat
saslMechanism = "SCRAM-SHA-512";
}

String passwordFilePath = String.format(PLACEHOLDER_SASL_PASSWORD_FILE_TEMPLATE, PASSWORD_VOLUME_MOUNT, scramAuth.getPasswordSecret().getSecretName(), scramAuth.getPasswordSecret().getPassword());
jaasConfig.append("org.apache.kafka.common.security.scram.ScramLoginModule required username=" + PLACEHOLDER_SASL_USERNAME + " password=" + passwordFilePath + ";");
String passwordFilePath = String.format(PLACEHOLDER_SASL_PASSWORD_FILE_TEMPLATE_CONFIG_PROVIDER_DIR, PASSWORD_VOLUME_MOUNT, scramAuth.getPasswordSecret().getSecretName(), scramAuth.getPasswordSecret().getPassword());
jaasConfig.append("org.apache.kafka.common.security.scram.ScramLoginModule required username=" + PLACEHOLDER_SASL_USERNAME_CONFIG_PROVIDER_ENV_VAR + " password=" + passwordFilePath + ";");
} else if (authentication instanceof KafkaClientAuthenticationOAuth oauth) {
saslMechanism = "OAUTHBEARER";
jaasConfig.append("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " + PLACEHOLDER_OAUTH_CONFIG);
jaasConfig.append("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " + PLACEHOLDER_OAUTH_CONFIG_CONFIG_PROVIDER_ENV_VAR);

if (oauth.getClientSecret() != null) {
jaasConfig.append(" oauth.client.secret=" + PLACEHOLDER_OAUTH_CLIENT_SECRET);
jaasConfig.append(" oauth.client.secret=" + PLACEHOLDER_OAUTH_CLIENT_SECRET_CONFIG_PROVIDER_ENV_VAR);
}

if (oauth.getRefreshToken() != null) {
jaasConfig.append(" oauth.refresh.token=" + PLACEHOLDER_OAUTH_REFRESH_TOKEN);
jaasConfig.append(" oauth.refresh.token=" + PLACEHOLDER_OAUTH_REFRESH_TOKEN_CONFIG_PROVIDER_ENV_VAR);
}

if (oauth.getAccessToken() != null) {
jaasConfig.append(" oauth.access.token=" + PLACEHOLDER_OAUTH_ACCESS_TOKEN);
jaasConfig.append(" oauth.access.token=" + PLACEHOLDER_OAUTH_ACCESS_TOKEN_CONFIG_PROVIDER_ENV_VAR);
}

if (oauth.getPasswordSecret() != null) {
jaasConfig.append(" oauth.password.grant.password=" + PLACEHOLDER_OAUTH_PASSWORD_GRANT_PASSWORD);
jaasConfig.append(" oauth.password.grant.password=" + PLACEHOLDER_OAUTH_PASSWORD_GRANT_PASSWORD_CONFIG_PROVIDER_ENV_VAR);
}

if (oauth.getTlsTrustedCertificates() != null && !oauth.getTlsTrustedCertificates().isEmpty()) {
jaasConfig.append(" oauth.ssl.truststore.location=\"/tmp/strimzi/oauth.truststore.p12\" oauth.ssl.truststore.password=" + PLACEHOLDER_CERT_STORE_PASSWORD + " oauth.ssl.truststore.type=\"PKCS12\"");
jaasConfig.append(" oauth.ssl.truststore.location=\"/tmp/strimzi/oauth.truststore.p12\" oauth.ssl.truststore.password=" + PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR + " oauth.ssl.truststore.type=\"PKCS12\"");
}

jaasConfig.append(";");
Expand All @@ -223,8 +226,9 @@ public KafkaBridgeConfigurationBuilder withAuthentication(KafkaClientAuthenticat
*/
public KafkaBridgeConfigurationBuilder withKafkaAdminClient(KafkaBridgeAdminClientSpec kafkaBridgeAdminClient) {
if (kafkaBridgeAdminClient != null) {
KafkaBridgeAdminClientConfiguration config = new KafkaBridgeAdminClientConfiguration(reconciliation, kafkaBridgeAdminClient.getConfig().entrySet());
printSectionHeader("Apache Kafka AdminClient");
kafkaBridgeAdminClient.getConfig().forEach((key, value) -> writer.println("kafka.admin." + key + "=" + value));
config.asOrderedProperties().asMap().forEach((key, value) -> writer.println("kafka.admin." + key + "=" + value));
writer.println();
}
return this;
Expand All @@ -238,8 +242,9 @@ public KafkaBridgeConfigurationBuilder withKafkaAdminClient(KafkaBridgeAdminClie
*/
public KafkaBridgeConfigurationBuilder withKafkaProducer(KafkaBridgeProducerSpec kafkaBridgeProducer) {
if (kafkaBridgeProducer != null) {
KafkaBridgeProducerConfiguration config = new KafkaBridgeProducerConfiguration(reconciliation, kafkaBridgeProducer.getConfig().entrySet());
printSectionHeader("Apache Kafka Producer");
kafkaBridgeProducer.getConfig().forEach((key, value) -> writer.println("kafka.producer." + key + "=" + value));
config.asOrderedProperties().asMap().forEach((key, value) -> writer.println("kafka.producer." + key + "=" + value));
writer.println();
}
return this;
Expand All @@ -253,8 +258,9 @@ public KafkaBridgeConfigurationBuilder withKafkaProducer(KafkaBridgeProducerSpec
*/
public KafkaBridgeConfigurationBuilder withKafkaConsumer(KafkaBridgeConsumerSpec kafkaBridgeConsumer) {
if (kafkaBridgeConsumer != null) {
KafkaBridgeConsumerConfiguration config = new KafkaBridgeConsumerConfiguration(reconciliation, kafkaBridgeConsumer.getConfig().entrySet());
printSectionHeader("Apache Kafka Consumer");
kafkaBridgeConsumer.getConfig().forEach((key, value) -> writer.println("kafka.consumer." + key + "=" + value));
config.asOrderedProperties().asMap().forEach((key, value) -> writer.println("kafka.consumer." + key + "=" + value));
writer.println("kafka.consumer.client.rack=${strimzidir:/opt/strimzi/init:rack.id}");
writer.println();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,6 @@ public void testGenerateDeploymentWithTls() {
assertThat(containers.get(0).getVolumeMounts().get(3).getMountPath(), is(KafkaBridgeCluster.TLS_CERTS_BASE_VOLUME_MOUNT + "my-another-secret"));

assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)).get(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_TRUSTED_CERTS), is("my-secret/cert.crt;my-secret/new-cert.crt;my-another-secret/another-cert.crt"));
assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)).get(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_TLS), is("true"));

ConfigMap configMap = kbc.generateBridgeConfigMap(metricsAndLogging);
assertThat(configMap.getData().get(KafkaBridgeCluster.BRIDGE_CONFIGURATION_FILENAME), containsString("kafka.ssl.truststore."));
Expand Down Expand Up @@ -308,7 +307,6 @@ public void testGenerateDeploymentWithTlsAuth() {

assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)).get(ENV_VAR_KAFKA_BRIDGE_TLS_AUTH_CERT), is("user-secret/user.crt"));
assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)).get(ENV_VAR_KAFKA_BRIDGE_TLS_AUTH_KEY), is("user-secret/user.key"));
assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)).get(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_TLS), is("true"));

ConfigMap configMap = kbc.generateBridgeConfigMap(metricsAndLogging);
assertThat(configMap.getData().get(KafkaBridgeCluster.BRIDGE_CONFIGURATION_FILENAME), containsString("kafka.ssl.truststore."));
Expand Down
Loading

0 comments on commit 8915e67

Please sign in to comment.