From de427e5079df4d8d5b2e2264b1b2dcc9391f5355 Mon Sep 17 00:00:00 2001
From: Paolo Patierno
Date: Tue, 14 Jan 2025 15:39:45 +0100
Subject: [PATCH 1/2] Moved bridge configuration setup within the operator

Signed-off-by: Paolo Patierno

Reverted back original operator Deployment

Set different bridge image for STs

Signed-off-by: Paolo Patierno

Fixed missing env vars to set up the truststore

Signed-off-by: Paolo Patierno

Removed ST using env vars not in place anymore

Signed-off-by: Paolo Patierno

Fixed checkstyle errors

Signed-off-by: Paolo Patierno

CHANGELOG update

Signed-off-by: Paolo Patierno

Fixed updated systemtests bridge MD

Signed-off-by: Paolo Patierno

Fixed scholzj comments

Signed-off-by: Paolo Patierno

Factored out a dedicated withConfigProviders method for the bridge configuration builder

Refactored bridge configuration builder to use isEquivalent

Signed-off-by: Paolo Patierno

Fixed tinaselenge comments

Signed-off-by: Paolo Patierno

Fixed checkstyle issues

Signed-off-by: Paolo Patierno

Fixed rolling bridge pod on configuration change

Signed-off-by: Paolo Patierno

Fixed scholzj and tinaselenge feedback

Signed-off-by: Paolo Patierno

Fixed KafkaBridgeAssemblyOperator tests about the missing new hash annotation

Signed-off-by: Paolo Patierno

Reverted back some deleted tests

Signed-off-by: Paolo Patierno

Adapted system tests to changes in configuration

Signed-off-by: Paolo Patierno

Fixed scholzj and fvaleri feedback

Signed-off-by: Paolo Patierno
---
 .../templates/steps/system_test_general.yaml  |   2 +-
 CHANGELOG.md                                  |   3 +
 .../cluster/model/KafkaBridgeCluster.java     | 106 ++--
 .../KafkaBridgeConfigurationBuilder.java      | 348 +++++++++++++
 .../assembly/KafkaBridgeAssemblyOperator.java |  17 +-
 .../strimzi/operator/cluster/TestUtils.java   |  87 ++++
 .../cluster/model/KafkaBridgeClusterTest.java |  72 +--
 .../KafkaBridgeConfigurationBuilderTest.java  | 457 ++++++++++++++++++
 .../KafkaBrokerConfigurationBuilderTest.java  |  84 +---
 .../model/KafkaConnectDockerfileTest.java     |   2 +-
 .../KafkaBridgeAssemblyOperatorTest.java      |  46 +-
 .../systemtest/bridge/HttpBridgeST.java       |  22 +-
 12 files changed, 1018 insertions(+), 228 deletions(-)
 create mode 100644 cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBridgeConfigurationBuilder.java
 create mode 100644 cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBridgeConfigurationBuilderTest.java

diff --git a/.azure/templates/steps/system_test_general.yaml b/.azure/templates/steps/system_test_general.yaml
index 70770b1c2bb..4b415cb3ab6 100644
--- a/.azure/templates/steps/system_test_general.yaml
+++ b/.azure/templates/steps/system_test_general.yaml
@@ -145,7 +145,7 @@ jobs:
     env:
       DOCKER_TAG: $(docker_tag)
-      BRIDGE_IMAGE: "latest-released"
+      BRIDGE_IMAGE: "quay.io/ppatierno/kafka-bridge:bridge-config"
       STRIMZI_RBAC_SCOPE: '${{ parameters.strimzi_rbac_scope }}'
       DOCKER_REGISTRY: registry.minikube
      CLUSTER_OPERATOR_INSTALL_TYPE: '${{ parameters.cluster_operator_install_type }}'
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f43e80e8443..cde3d5d0b68 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,7 @@
 * Support for storage class overrides has been removed
 * Added support to configure `dnsPolicy` and `dnsConfig` using the `template` sections.
 * Store Kafka node certificates in separate Secrets, one Secret per pod.
+* Moved HTTP bridge configuration to the ConfigMap set up by the operator.
 ### Major changes, deprecations and removals

@@ -21,6 +22,8 @@
 * The storage overrides for configuring per-broker storage class are not supported anymore. If you are using the storage overrides, you should instead use multiple KafkaNodePool resources with a different storage class each. For more details about migrating from storage overrides, please follow the [documentation](https://strimzi.io/docs/operators/0.45.0/full/deploying.html#con-config-storage-zookeeper-str).
+* Because of the corresponding configuration changes, 0.32.0 is the minimum HTTP bridge release that works with Strimzi Cluster Operator 0.46.0.
+  * If you have an Apache Kafka cluster running together with an HTTP bridge instance, please upgrade the bridge after upgrading the operator.

 ## 0.45.0
diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBridgeCluster.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBridgeCluster.java
index 2db39b75041..c2b77f28288 100644
--- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBridgeCluster.java
+++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBridgeCluster.java
@@ -48,6 +48,7 @@
 import io.strimzi.operator.cluster.model.logging.SupportsLogging;
 import io.strimzi.operator.cluster.model.securityprofiles.ContainerSecurityProviderContextImpl;
 import io.strimzi.operator.cluster.model.securityprofiles.PodSecurityProviderContextImpl;
+import io.strimzi.operator.common.Annotations;
 import io.strimzi.operator.common.Reconciliation;
 import io.strimzi.operator.common.Util;
 import io.strimzi.operator.common.model.InvalidResourceException;
@@ -86,29 +87,23 @@ public class KafkaBridgeCluster extends AbstractModel implements SupportsLogging
     // Kafka Bridge configuration keys (EnvVariables)
     protected static final String ENV_VAR_PREFIX = "KAFKA_BRIDGE_";
     protected static final String ENV_VAR_KAFKA_BRIDGE_METRICS_ENABLED = "KAFKA_BRIDGE_METRICS_ENABLED";
-    protected static final String ENV_VAR_KAFKA_BRIDGE_BOOTSTRAP_SERVERS = "KAFKA_BRIDGE_BOOTSTRAP_SERVERS";
-    protected static final String ENV_VAR_KAFKA_BRIDGE_TLS = "KAFKA_BRIDGE_TLS";
     protected static final String ENV_VAR_KAFKA_BRIDGE_TRUSTED_CERTS = "KAFKA_BRIDGE_TRUSTED_CERTS";
     protected static final String OAUTH_TLS_CERTS_BASE_VOLUME_MOUNT = "/opt/strimzi/oauth-certs/";
-    protected static final String ENV_VAR_STRIMZI_TRACING = "STRIMZI_TRACING";
-
-    protected static final String ENV_VAR_KAFKA_BRIDGE_ADMIN_CLIENT_CONFIG = "KAFKA_BRIDGE_ADMIN_CLIENT_CONFIG";
-    protected static final String ENV_VAR_KAFKA_BRIDGE_PRODUCER_CONFIG = "KAFKA_BRIDGE_PRODUCER_CONFIG";
-    protected static final String ENV_VAR_KAFKA_BRIDGE_CONSUMER_CONFIG = "KAFKA_BRIDGE_CONSUMER_CONFIG";
-    protected static final String ENV_VAR_KAFKA_BRIDGE_ID = "KAFKA_BRIDGE_ID";
-
-    protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_HOST = "KAFKA_BRIDGE_HTTP_HOST";
-    protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_PORT = "KAFKA_BRIDGE_HTTP_PORT";
-    protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT = "KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT";
-    protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_ENABLED = "KAFKA_BRIDGE_HTTP_CONSUMER_ENABLED";
-    protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_PRODUCER_ENABLED = "KAFKA_BRIDGE_HTTP_PRODUCER_ENABLED";
-    protected static final String ENV_VAR_KAFKA_BRIDGE_CORS_ENABLED = "KAFKA_BRIDGE_CORS_ENABLED";
-    protected static final String ENV_VAR_KAFKA_BRIDGE_CORS_ALLOWED_ORIGINS =
"KAFKA_BRIDGE_CORS_ALLOWED_ORIGINS"; - protected static final String ENV_VAR_KAFKA_BRIDGE_CORS_ALLOWED_METHODS = "KAFKA_BRIDGE_CORS_ALLOWED_METHODS"; protected static final String CO_ENV_VAR_CUSTOM_BRIDGE_POD_LABELS = "STRIMZI_CUSTOM_KAFKA_BRIDGE_LABELS"; protected static final String INIT_VOLUME_MOUNT = "/opt/strimzi/init"; + /** + * Key under which the bridge configuration is stored in ConfigMap + */ + public static final String BRIDGE_CONFIGURATION_FILENAME = "application.properties"; + + /** + * Annotation for rolling the bridge whenever the configuration within the application.properties file is changed. + * When the configuration hash annotation change is detected, we force a pod restart. + */ + public static final String ANNO_STRIMZI_IO_CONFIGURATION_HASH = Annotations.STRIMZI_DOMAIN + "configuration-hash"; + private int replicas; private ClientTls tls; private KafkaClientAuthentication authentication; @@ -411,59 +406,12 @@ protected List getEnvVars() { varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_GC_LOG_ENABLED, String.valueOf(gcLoggingEnabled))); JvmOptionUtils.javaOptions(varList, jvmOptions); - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_BOOTSTRAP_SERVERS, bootstrapServers)); - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_ADMIN_CLIENT_CONFIG, kafkaBridgeAdminClient == null ? "" : new KafkaBridgeAdminClientConfiguration(reconciliation, kafkaBridgeAdminClient.getConfig().entrySet()).getConfiguration())); - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_ID, cluster)); - - if (kafkaBridgeConsumer != null) { - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CONSUMER_CONFIG, new KafkaBridgeConsumerConfiguration(reconciliation, kafkaBridgeConsumer.getConfig().entrySet()).getConfiguration())); - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_ENABLED, String.valueOf(kafkaBridgeConsumer.isEnabled()))); - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT, String.valueOf(kafkaBridgeConsumer.getTimeoutSeconds()))); - } else { - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CONSUMER_CONFIG, "")); - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_ENABLED, "true")); - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT, String.valueOf(KafkaBridgeConsumerSpec.HTTP_DEFAULT_TIMEOUT))); - } - - if (kafkaBridgeProducer != null) { - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_PRODUCER_CONFIG, new KafkaBridgeProducerConfiguration(reconciliation, kafkaBridgeProducer.getConfig().entrySet()).getConfiguration())); - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_PRODUCER_ENABLED, String.valueOf(kafkaBridgeProducer.isEnabled()))); - } else { - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_PRODUCER_CONFIG, "")); - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_PRODUCER_ENABLED, "true")); - } - - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_HOST, KafkaBridgeHttpConfig.HTTP_DEFAULT_HOST)); - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_PORT, String.valueOf(http != null ? 
http.getPort() : KafkaBridgeHttpConfig.HTTP_DEFAULT_PORT))); - - if (http != null && http.getCors() != null) { - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CORS_ENABLED, "true")); - - if (http.getCors().getAllowedOrigins() != null) { - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CORS_ALLOWED_ORIGINS, String.join(",", http.getCors().getAllowedOrigins()))); - } - - if (http.getCors().getAllowedMethods() != null) { - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CORS_ALLOWED_METHODS, String.join(",", http.getCors().getAllowedMethods()))); - } - } else { - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CORS_ENABLED, "false")); - } - - if (tls != null) { - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_TLS, "true")); - - if (tls.getTrustedCertificates() != null && !tls.getTrustedCertificates().isEmpty()) { - varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_TRUSTED_CERTS, CertUtils.trustedCertsEnvVar(tls.getTrustedCertificates()))); - } + if (tls != null && tls.getTrustedCertificates() != null && !tls.getTrustedCertificates().isEmpty()) { + varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_TRUSTED_CERTS, CertUtils.trustedCertsEnvVar(tls.getTrustedCertificates()))); } AuthenticationUtils.configureClientAuthenticationEnvVars(authentication, varList, name -> ENV_VAR_PREFIX + name); - if (tracing != null) { - varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_TRACING, tracing.getType())); - } - // Add shared environment variables used for all containers varList.addAll(sharedEnvironmentProvider.variables()); @@ -600,21 +548,39 @@ protected List getInitContainerEnvVars() { } /** - * Generates a metrics and logging ConfigMap according to the configuration. If this operand doesn't support logging - * or metrics, they will nto be set. + * Generates a ConfigMap containing the bridge configuration related to HTTP and Kafka clients. + * It also generates the metrics and logging configuration. If this operand doesn't support logging + * or metrics, they will not be set. 
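+     * For example, with the default logging and no metrics configured (matching
+     * {@code testConfigurationConfigMap} in the tests below), the ConfigMap data contains entries
+     * along these lines (a sketch; the literal key names are assumptions based on the model constants):
+     * <pre>
+     * application.properties  - BRIDGE_CONFIGURATION_FILENAME, rendered by KafkaBridgeConfigurationBuilder
+     * log4j2.properties       - LoggingModel.LOG4J2_CONFIG_MAP_KEY
+     * </pre>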
* * @param metricsAndLogging The external CMs with logging and metrics configuration * * @return The generated ConfigMap */ - public ConfigMap generateMetricsAndLogConfigMap(MetricsAndLogging metricsAndLogging) { + public ConfigMap generateBridgeConfigMap(MetricsAndLogging metricsAndLogging) { + // generate the ConfigMap data entries for the metrics and logging configuration + Map data = ConfigMapUtils.generateMetricsAndLogConfigMapData(reconciliation, this, metricsAndLogging); + // add the ConfigMap data entry for the bridge HTTP and Kafka clients related configuration + data.put( + BRIDGE_CONFIGURATION_FILENAME, + new KafkaBridgeConfigurationBuilder(reconciliation, cluster, bootstrapServers) + .withConfigProviders() + .withTracing(tracing) + .withTls(tls) + .withAuthentication(authentication) + .withKafkaAdminClient(kafkaBridgeAdminClient) + .withKafkaProducer(kafkaBridgeProducer) + .withKafkaConsumer(kafkaBridgeConsumer) + .withHttp(http, kafkaBridgeProducer, kafkaBridgeConsumer) + .build() + ); + return ConfigMapUtils .createConfigMap( KafkaBridgeResources.metricsAndLogConfigMapName(cluster), namespace, labels, ownerReference, - ConfigMapUtils.generateMetricsAndLogConfigMapData(reconciliation, this, metricsAndLogging) + data ); } diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBridgeConfigurationBuilder.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBridgeConfigurationBuilder.java new file mode 100644 index 00000000000..90f4c20579f --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBridgeConfigurationBuilder.java @@ -0,0 +1,348 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.model; + +import io.strimzi.api.kafka.model.bridge.KafkaBridgeAdminClientSpec; +import io.strimzi.api.kafka.model.bridge.KafkaBridgeConsumerSpec; +import io.strimzi.api.kafka.model.bridge.KafkaBridgeHttpConfig; +import io.strimzi.api.kafka.model.bridge.KafkaBridgeProducerSpec; +import io.strimzi.api.kafka.model.common.ClientTls; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthentication; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationOAuth; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationPlain; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationScram; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationScramSha256; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationScramSha512; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationTls; +import io.strimzi.api.kafka.model.common.tracing.Tracing; +import io.strimzi.operator.common.Reconciliation; + +import java.io.PrintWriter; +import java.io.StringWriter; + +/** + * This class is used to generate the bridge configuration template. The template is later passed using a ConfigMap to + * the bridge pod. The script in the container image will fill in the variables in the template and use the + * configuration file. This class is using the builder pattern to make it easy to test the different parts etc. To + * generate the configuration file, it is using the PrintWriter. 
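+ * <p>
+ * As an illustration only (sample values, mirroring the unit tests added in this patch), the generated
+ * file for a bridge named {@code my-bridge} with no TLS or authentication boils down to:
+ * <pre>
+ * bridge.id=my-bridge
+ * kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092
+ * kafka.security.protocol=PLAINTEXT
+ * </pre>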
+ */
+public class KafkaBridgeConfigurationBuilder {
+
+    // placeholders expanded through config providers inside the bridge node
+    private static final String PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:CERTS_STORE_PASSWORD}";
+    private static final String PLACEHOLDER_SASL_USERNAME_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:KAFKA_BRIDGE_SASL_USERNAME}";
+    private static final String PASSWORD_VOLUME_MOUNT = "/opt/strimzi/bridge-password/";
+    // the SASL password file template expands to: <PASSWORD_VOLUME_MOUNT><secret name>:<password key>
+    private static final String PLACEHOLDER_SASL_PASSWORD_FILE_TEMPLATE_CONFIG_PROVIDER_DIR = "${strimzidir:%s%s:%s}";
+    private static final String PLACEHOLDER_OAUTH_CONFIG_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:KAFKA_BRIDGE_OAUTH_CONFIG}";
+    private static final String PLACEHOLDER_OAUTH_ACCESS_TOKEN_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:KAFKA_BRIDGE_OAUTH_ACCESS_TOKEN}";
+    private static final String PLACEHOLDER_OAUTH_REFRESH_TOKEN_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:KAFKA_BRIDGE_OAUTH_REFRESH_TOKEN}";
+    private static final String PLACEHOLDER_OAUTH_CLIENT_SECRET_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:KAFKA_BRIDGE_OAUTH_CLIENT_SECRET}";
+    private static final String PLACEHOLDER_OAUTH_PASSWORD_GRANT_PASSWORD_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:KAFKA_BRIDGE_OAUTH_PASSWORD_GRANT_PASSWORD}";
+
+    private final Reconciliation reconciliation;
+    private final StringWriter stringWriter = new StringWriter();
+    private final PrintWriter writer = new PrintWriter(stringWriter);
+
+    private String securityProtocol = "PLAINTEXT";
+
+    /**
+     * Bridge configuration template constructor
+     *
+     * @param reconciliation    the reconciliation
+     * @param bridgeId  the bridge ID
+     * @param bootstrapServers  Kafka cluster bootstrap servers to connect to
+     */
+    public KafkaBridgeConfigurationBuilder(Reconciliation reconciliation, String bridgeId, String bootstrapServers) {
+        this.reconciliation = reconciliation;
+        printHeader();
+        configureBridgeId(bridgeId);
+        configureBootstrapServers(bootstrapServers);
+    }
+
+    /**
+     * Renders the bridge ID configuration
+     *
+     * @param bridgeId  the bridge ID
+     */
+    private void configureBridgeId(String bridgeId) {
+        printSectionHeader("Bridge ID");
+        writer.println("bridge.id=" + bridgeId);
+        writer.println();
+    }
+
+    /**
+     * Renders the Apache Kafka bootstrap servers configuration
+     *
+     * @param bootstrapServers  Kafka cluster bootstrap servers to connect to
+     */
+    private void configureBootstrapServers(String bootstrapServers) {
+        printSectionHeader("Kafka bootstrap servers");
+        writer.println("kafka.bootstrap.servers=" + bootstrapServers);
+        writer.println();
+    }
+
+    /**
+     * Configures the Kafka security protocol to be used.
+     * This internal method is called when the configuration is built, because the security protocol
+     * depends on whether the TLS and SASL authentication configurations are set.
+     */
+    private void configureSecurityProtocol() {
+        printSectionHeader("Kafka Security protocol");
+        writer.println("kafka.security.protocol=" + securityProtocol);
+    }
+
+    /**
+     * Configures the Kafka config providers used for loading some parameters from env vars and files
+     * (e.g.
user and password for authentication) + * + * @return the builder instance + */ + public KafkaBridgeConfigurationBuilder withConfigProviders() { + printSectionHeader("Config providers"); + writer.println("kafka.config.providers=strimzienv,strimzifile,strimzidir"); + writer.println("kafka.config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider"); + writer.println("kafka.config.providers.strimzienv.param.allowlist.pattern=.*"); + writer.println("kafka.config.providers.strimzifile.class=org.apache.kafka.common.config.provider.FileConfigProvider"); + writer.println("kafka.config.providers.strimzifile.param.allowed.paths=/opt/strimzi"); + writer.println("kafka.config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider"); + writer.println("kafka.config.providers.strimzidir.param.allowed.paths=/opt/strimzi"); + writer.println(); + return this; + } + + /** + * Adds the tracing type + * + * @param tracing the tracing configuration + * @return the builder instance + */ + public KafkaBridgeConfigurationBuilder withTracing(Tracing tracing) { + if (tracing != null) { + printSectionHeader("Tracing configuration"); + writer.println("bridge.tracing=" + tracing.getType()); + writer.println(); + } + return this; + } + + /** + * Adds the TLS/SSL configuration for the bridge to connect to the Kafka cluster. + * The configuration includes the trusted certificates store for TLS connection (server authentication). + * + * @param tls client TLS configuration + * @return the builder instance + */ + public KafkaBridgeConfigurationBuilder withTls(ClientTls tls) { + if (tls != null) { + securityProtocol = "SSL"; + + if (tls.getTrustedCertificates() != null && !tls.getTrustedCertificates().isEmpty()) { + printSectionHeader("TLS/SSL"); + writer.println("kafka.ssl.truststore.location=/tmp/strimzi/bridge.truststore.p12"); + writer.println("kafka.ssl.truststore.password=" + PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR); + writer.println("kafka.ssl.truststore.type=PKCS12"); + } + } + return this; + } + + /** + * Add the SASL configuration for client authentication to the Kafka cluster + * + * @param authentication authentication configuration + * @return the builder instance + */ + public KafkaBridgeConfigurationBuilder withAuthentication(KafkaClientAuthentication authentication) { + if (authentication != null) { + printSectionHeader("Authentication configuration"); + // configuring mTLS (client TLS authentication, together with server authentication already set) + if (authentication instanceof KafkaClientAuthenticationTls tlsAuth && tlsAuth.getCertificateAndKey() != null) { + writer.println("kafka.ssl.keystore.location=/tmp/strimzi/bridge.keystore.p12"); + writer.println("kafka.ssl.keystore.password=" + PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR); + writer.println("kafka.ssl.keystore.type=PKCS12"); + // otherwise SASL or OAuth is going to be used for authentication + } else { + securityProtocol = securityProtocol.equals("SSL") ? 
"SASL_SSL" : "SASL_PLAINTEXT"; + String saslMechanism = null; + StringBuilder jaasConfig = new StringBuilder(); + + if (authentication instanceof KafkaClientAuthenticationPlain passwordAuth) { + saslMechanism = "PLAIN"; + String passwordFilePath = String.format(PLACEHOLDER_SASL_PASSWORD_FILE_TEMPLATE_CONFIG_PROVIDER_DIR, PASSWORD_VOLUME_MOUNT, passwordAuth.getPasswordSecret().getSecretName(), passwordAuth.getPasswordSecret().getPassword()); + jaasConfig.append("org.apache.kafka.common.security.plain.PlainLoginModule required username=" + PLACEHOLDER_SASL_USERNAME_CONFIG_PROVIDER_ENV_VAR + " password=" + passwordFilePath + ";"); + } else if (authentication instanceof KafkaClientAuthenticationScram scramAuth) { + + if (scramAuth.getType().equals(KafkaClientAuthenticationScramSha256.TYPE_SCRAM_SHA_256)) { + saslMechanism = "SCRAM-SHA-256"; + } else if (scramAuth.getType().equals(KafkaClientAuthenticationScramSha512.TYPE_SCRAM_SHA_512)) { + saslMechanism = "SCRAM-SHA-512"; + } + + String passwordFilePath = String.format(PLACEHOLDER_SASL_PASSWORD_FILE_TEMPLATE_CONFIG_PROVIDER_DIR, PASSWORD_VOLUME_MOUNT, scramAuth.getPasswordSecret().getSecretName(), scramAuth.getPasswordSecret().getPassword()); + jaasConfig.append("org.apache.kafka.common.security.scram.ScramLoginModule required username=" + PLACEHOLDER_SASL_USERNAME_CONFIG_PROVIDER_ENV_VAR + " password=" + passwordFilePath + ";"); + } else if (authentication instanceof KafkaClientAuthenticationOAuth oauth) { + saslMechanism = "OAUTHBEARER"; + jaasConfig.append("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " + PLACEHOLDER_OAUTH_CONFIG_CONFIG_PROVIDER_ENV_VAR); + + if (oauth.getClientSecret() != null) { + jaasConfig.append(" oauth.client.secret=" + PLACEHOLDER_OAUTH_CLIENT_SECRET_CONFIG_PROVIDER_ENV_VAR); + } + + if (oauth.getRefreshToken() != null) { + jaasConfig.append(" oauth.refresh.token=" + PLACEHOLDER_OAUTH_REFRESH_TOKEN_CONFIG_PROVIDER_ENV_VAR); + } + + if (oauth.getAccessToken() != null) { + jaasConfig.append(" oauth.access.token=" + PLACEHOLDER_OAUTH_ACCESS_TOKEN_CONFIG_PROVIDER_ENV_VAR); + } + + if (oauth.getPasswordSecret() != null) { + jaasConfig.append(" oauth.password.grant.password=" + PLACEHOLDER_OAUTH_PASSWORD_GRANT_PASSWORD_CONFIG_PROVIDER_ENV_VAR); + } + + if (oauth.getTlsTrustedCertificates() != null && !oauth.getTlsTrustedCertificates().isEmpty()) { + jaasConfig.append(" oauth.ssl.truststore.location=\"/tmp/strimzi/oauth.truststore.p12\" oauth.ssl.truststore.password=" + PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR + " oauth.ssl.truststore.type=\"PKCS12\""); + } + + jaasConfig.append(";"); + writer.println("kafka.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler"); + } + writer.println("kafka.sasl.mechanism=" + saslMechanism); + writer.println("kafka.sasl.jaas.config=" + jaasConfig); + writer.println(); + } + } + return this; + } + + /** + * Adds the bridge Kafka admin client specific configuration + * + * @param kafkaBridgeAdminClient the Kafka admin client configuration + * @return the builder instance + */ + public KafkaBridgeConfigurationBuilder withKafkaAdminClient(KafkaBridgeAdminClientSpec kafkaBridgeAdminClient) { + if (kafkaBridgeAdminClient != null) { + KafkaBridgeAdminClientConfiguration config = new KafkaBridgeAdminClientConfiguration(reconciliation, kafkaBridgeAdminClient.getConfig().entrySet()); + printSectionHeader("Apache Kafka AdminClient"); + config.asOrderedProperties().asMap().forEach((key, value) -> 
writer.println("kafka.admin." + key + "=" + value)); + writer.println(); + } + return this; + } + + /** + * Adds the bridge Kafka producer specific configuration + * + * @param kafkaBridgeProducer the Kafka producer configuration + * @return the builder instance + */ + public KafkaBridgeConfigurationBuilder withKafkaProducer(KafkaBridgeProducerSpec kafkaBridgeProducer) { + if (kafkaBridgeProducer != null) { + KafkaBridgeProducerConfiguration config = new KafkaBridgeProducerConfiguration(reconciliation, kafkaBridgeProducer.getConfig().entrySet()); + printSectionHeader("Apache Kafka Producer"); + config.asOrderedProperties().asMap().forEach((key, value) -> writer.println("kafka.producer." + key + "=" + value)); + writer.println(); + } + return this; + } + + /** + * Adds the bridge Kafka consumer specific configuration + * + * @param kafkaBridgeConsumer the Kafka consumer configuration + * @return the builder instance + */ + public KafkaBridgeConfigurationBuilder withKafkaConsumer(KafkaBridgeConsumerSpec kafkaBridgeConsumer) { + if (kafkaBridgeConsumer != null) { + KafkaBridgeConsumerConfiguration config = new KafkaBridgeConsumerConfiguration(reconciliation, kafkaBridgeConsumer.getConfig().entrySet()); + printSectionHeader("Apache Kafka Consumer"); + config.asOrderedProperties().asMap().forEach((key, value) -> writer.println("kafka.consumer." + key + "=" + value)); + writer.println("kafka.consumer.client.rack=${strimzidir:/opt/strimzi/init:rack.id}"); + writer.println(); + } + return this; + } + + /** + * Adds the HTTP configuration which includes HTTP specific parameters (i.e. host, port, CORS, ...) as well as + * configuration for the HTTP related part of the producer and consumer (i.e. timeout, enable status, ...) + * + * @param http the HTTP configuration + * @param kafkaBridgeProducer the Kafka producer configuration + * @param kafkaBridgeConsumer the Kafka consumer configuration + * @return the builder instance + */ + public KafkaBridgeConfigurationBuilder withHttp(KafkaBridgeHttpConfig http, KafkaBridgeProducerSpec kafkaBridgeProducer, KafkaBridgeConsumerSpec kafkaBridgeConsumer) { + printSectionHeader("HTTP configuration"); + writer.println("http.host=" + KafkaBridgeHttpConfig.HTTP_DEFAULT_HOST); + writer.println("http.port=" + (http != null ? http.getPort() : KafkaBridgeHttpConfig.HTTP_DEFAULT_PORT)); + if (http != null && http.getCors() != null) { + writer.println("http.cors.enabled=true"); + + if (http.getCors().getAllowedOrigins() != null) { + writer.println("http.cors.allowedOrigins=" + String.join(",", http.getCors().getAllowedOrigins())); + } + + if (http.getCors().getAllowedMethods() != null) { + writer.println("http.cors.allowedMethods=" + String.join(",", http.getCors().getAllowedMethods())); + } + } else { + writer.println("http.cors.enabled=false"); + } + + if (kafkaBridgeConsumer != null) { + writer.println("http.consumer.enabled=" + kafkaBridgeConsumer.isEnabled()); + writer.println("http.timeoutSeconds=" + kafkaBridgeConsumer.getTimeoutSeconds()); + } else { + writer.println("http.consumer.enabled=true"); + writer.println("http.timeoutSeconds=" + KafkaBridgeConsumerSpec.HTTP_DEFAULT_TIMEOUT); + } + + if (kafkaBridgeProducer != null) { + writer.println("http.producer.enabled=" + kafkaBridgeProducer.isEnabled()); + } else { + writer.println("http.producer.enabled=true"); + } + + return this; + } + + /** + * Prints the file header which is on the beginning of the configuration file. 
+ */ + private void printHeader() { + writer.println("##############################"); + writer.println("##############################"); + writer.println("# This file is automatically generated by the Strimzi Cluster Operator"); + writer.println("# Any changes to this file will be ignored and overwritten!"); + writer.println("##############################"); + writer.println("##############################"); + writer.println(); + } + + /** + * Internal method which prints the section header into the configuration file. This makes it more human-readable + * when looking for issues in running pods etc. + * + * @param sectionName Name of the section for which is this header printed + */ + private void printSectionHeader(String sectionName) { + writer.println("##########"); + writer.println("# " + sectionName); + writer.println("##########"); + } + + /** + * Generates the configuration template as String + * + * @return String with the Kafka bridge configuration template + */ + public String build() { + configureSecurityProtocol(); + return stringWriter.toString(); + } +} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperator.java index 789826373d9..90af04b512a 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperator.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperator.java @@ -4,6 +4,7 @@ */ package io.strimzi.operator.cluster.operator.assembly; +import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.ServiceAccount; import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBinding; import io.fabric8.kubernetes.client.KubernetesClient; @@ -27,6 +28,7 @@ import io.strimzi.operator.common.Reconciliation; import io.strimzi.operator.common.ReconciliationException; import io.strimzi.operator.common.ReconciliationLogger; +import io.strimzi.operator.common.Util; import io.strimzi.operator.common.model.PasswordGenerator; import io.strimzi.operator.common.model.StatusUtils; import io.strimzi.operator.common.operator.resource.ReconcileResult; @@ -35,7 +37,9 @@ import io.vertx.core.Vertx; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** *

Assembly operator for a "Kafka Bridge" assembly, which manages:

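The hunk below threads the generated configuration into the reconcile chain: the operator stores a stub hash of the application.properties content as a pod annotation, so any configuration change rolls the bridge pods. A minimal sketch of that mechanism, using only names introduced by this patch (the Deployment wiring itself is elided):

    import io.fabric8.kubernetes.api.model.ConfigMap;
    import io.strimzi.operator.cluster.model.KafkaBridgeCluster;
    import io.strimzi.operator.common.Util;

    import java.util.HashMap;
    import java.util.Map;

    class ConfigurationHashSketch {
        // Builds the pod annotations for the bridge Deployment: when the hash of the
        // application.properties entry changes, the changed annotation value makes the
        // Deployment controller replace the pods, which then mount the new ConfigMap.
        static Map<String, String> podAnnotations(ConfigMap bridgeConfigMap) {
            Map<String, String> annotations = new HashMap<>();
            String bridgeConfiguration = bridgeConfigMap.getData().get(KafkaBridgeCluster.BRIDGE_CONFIGURATION_FILENAME);
            annotations.put(KafkaBridgeCluster.ANNO_STRIMZI_IO_CONFIGURATION_HASH, Util.hashStub(bridgeConfiguration));
            return annotations;
        }
    }
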
@@ -75,6 +79,8 @@ protected Future createOrUpdate(Reconciliation reconciliation String namespace = reconciliation.namespace(); KafkaBridgeCluster bridge; + Map podAnnotations = new HashMap<>(); + try { bridge = KafkaBridgeCluster.fromCrd(reconciliation, assemblyResource, sharedEnvironmentProvider); } catch (Exception e) { @@ -98,10 +104,17 @@ protected Future createOrUpdate(Reconciliation reconciliation .compose(i -> deploymentOperations.scaleDown(reconciliation, namespace, bridge.getComponentName(), bridge.getReplicas(), operationTimeoutMs)) .compose(scale -> serviceOperations.reconcile(reconciliation, namespace, KafkaBridgeResources.serviceName(bridge.getCluster()), bridge.generateService())) .compose(i -> MetricsAndLoggingUtils.metricsAndLogging(reconciliation, configMapOperations, bridge.logging(), null)) - .compose(metricsAndLogging -> configMapOperations.reconcile(reconciliation, namespace, KafkaBridgeResources.metricsAndLogConfigMapName(reconciliation.name()), bridge.generateMetricsAndLogConfigMap(metricsAndLogging))) + .compose(metricsAndLogging -> { + ConfigMap configMap = bridge.generateBridgeConfigMap(metricsAndLogging); + podAnnotations.put(KafkaBridgeCluster.ANNO_STRIMZI_IO_CONFIGURATION_HASH, Util.hashStub(configMap.getData().get(KafkaBridgeCluster.BRIDGE_CONFIGURATION_FILENAME))); + return configMapOperations.reconcile(reconciliation, namespace, KafkaBridgeResources.metricsAndLogConfigMapName(reconciliation.name()), configMap); + }) .compose(i -> isPodDisruptionBudgetGeneration ? podDisruptionBudgetOperator.reconcile(reconciliation, namespace, bridge.getComponentName(), bridge.generatePodDisruptionBudget()) : Future.succeededFuture()) .compose(i -> VertxUtil.authTlsHash(secretOperations, namespace, auth, trustedCertificates)) - .compose(hash -> deploymentOperations.reconcile(reconciliation, namespace, bridge.getComponentName(), bridge.generateDeployment(Collections.singletonMap(Annotations.ANNO_STRIMZI_AUTH_HASH, Integer.toString(hash)), pfa.isOpenshift(), imagePullPolicy, imagePullSecrets))) + .compose(authTlsHash -> { + podAnnotations.put(Annotations.ANNO_STRIMZI_AUTH_HASH, Integer.toString(authTlsHash)); + return deploymentOperations.reconcile(reconciliation, namespace, bridge.getComponentName(), bridge.generateDeployment(podAnnotations, pfa.isOpenshift(), imagePullPolicy, imagePullSecrets)); + }) .compose(i -> deploymentOperations.scaleUp(reconciliation, namespace, bridge.getComponentName(), bridge.getReplicas(), operationTimeoutMs)) .compose(i -> deploymentOperations.waitForObserved(reconciliation, namespace, bridge.getComponentName(), 1_000, operationTimeoutMs)) .compose(i -> bridgeHasZeroReplicas ? 
Future.succeededFuture() : deploymentOperations.readiness(reconciliation, namespace, bridge.getComponentName(), 1_000, operationTimeoutMs)) diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/TestUtils.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/TestUtils.java index 0a6700bc22c..e1b19478df4 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/TestUtils.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/TestUtils.java @@ -11,10 +11,21 @@ import io.fabric8.kubernetes.api.model.EnvVar; import io.strimzi.api.kafka.model.common.metrics.JmxPrometheusExporterMetrics; import io.strimzi.api.kafka.model.common.metrics.JmxPrometheusExporterMetricsBuilder; +import io.strimzi.operator.cluster.model.ModelUtils; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.Map; +import java.util.TreeSet; import java.util.stream.Collectors; +import static java.util.Arrays.asList; import static java.util.Collections.singletonMap; public class TestUtils { @@ -54,4 +65,80 @@ public static Map containerEnvVars(Container container) { // On duplicates, last-in wins (u, v) -> v)); } + + /** + * Class used to compare collection of lines (current against expected) + * It can be used, for example, to compare configurations made by key=value pair (properties) + * to check that the current one is equivalent to the expected (not taking declaration order into account) + */ + public static class IsEquivalent extends TypeSafeMatcher { + private final List expectedLines; + + public IsEquivalent(String expectedConfig) { + super(); + this.expectedLines = ModelUtils.getLinesWithoutCommentsAndEmptyLines(expectedConfig); + } + + public IsEquivalent(List expectedLines) { + super(); + this.expectedLines = expectedLines; + } + + @Override + protected boolean matchesSafely(String config) { + List actualLines = ModelUtils.getLinesWithoutCommentsAndEmptyLines(config); + + return expectedLines.containsAll(actualLines) && actualLines.containsAll(expectedLines); + } + + private String getLinesAsString(Collection configLines) { + StringWriter stringWriter = new StringWriter(); + PrintWriter writer = new PrintWriter(stringWriter); + + for (String line : configLines) { + writer.println(line); + } + + return stringWriter.toString(); + } + + @Override + public void describeTo(Description description) { + description.appendText(getLinesAsString(new TreeSet<>(expectedLines))); + } + + @Override + protected void describeMismatchSafely(String item, Description mismatchDescription) { + printDiff(item, mismatchDescription); + } + + private void printDiff(String item, Description mismatchDescription) { + List actualLines = ModelUtils.getLinesWithoutCommentsAndEmptyLines(item); + List actualLinesDiff = new ArrayList<>(actualLines); + actualLinesDiff.removeAll(expectedLines); + List expectedLinesDiff = new ArrayList<>(expectedLines); + expectedLinesDiff.removeAll(actualLines); + + mismatchDescription + .appendText(" was: \n") + .appendText(getLinesAsString(new TreeSet<>(ModelUtils.getLinesWithoutCommentsAndEmptyLines(item)))) + .appendText("\n\n") + .appendText(" wrong lines in expected:\n") + .appendText(getLinesAsString(expectedLinesDiff)) + .appendText("\n\n") + .appendText(" Wrong lines in actual:\n") + .appendText(getLinesAsString(actualLinesDiff)) + 
.appendText("\n\nOriginal value: \n") + .appendText(item) + .appendText("\n\n"); + } + + public static Matcher isEquivalent(String expectedConfig) { + return new IsEquivalent(expectedConfig); + } + + public static Matcher isEquivalent(String... expectedLines) { + return new IsEquivalent(asList(expectedLines)); + } + } } diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBridgeClusterTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBridgeClusterTest.java index df6d6d83f75..37e3ecdd51d 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBridgeClusterTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBridgeClusterTest.java @@ -6,6 +6,7 @@ import io.fabric8.kubernetes.api.model.Affinity; import io.fabric8.kubernetes.api.model.AffinityBuilder; +import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.ConfigMapVolumeSource; import io.fabric8.kubernetes.api.model.ConfigMapVolumeSourceBuilder; import io.fabric8.kubernetes.api.model.Container; @@ -35,9 +36,7 @@ import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBinding; import io.strimzi.api.kafka.model.bridge.KafkaBridge; import io.strimzi.api.kafka.model.bridge.KafkaBridgeBuilder; -import io.strimzi.api.kafka.model.bridge.KafkaBridgeConsumerSpec; import io.strimzi.api.kafka.model.bridge.KafkaBridgeConsumerSpecBuilder; -import io.strimzi.api.kafka.model.bridge.KafkaBridgeHttpConfig; import io.strimzi.api.kafka.model.bridge.KafkaBridgeProducerSpecBuilder; import io.strimzi.api.kafka.model.bridge.KafkaBridgeResources; import io.strimzi.api.kafka.model.common.CertSecretSource; @@ -59,6 +58,8 @@ import io.strimzi.kafka.oauth.server.ServerConfig; import io.strimzi.operator.cluster.PlatformFeaturesAvailability; import io.strimzi.operator.cluster.ResourceUtils; +import io.strimzi.operator.cluster.model.logging.LoggingModel; +import io.strimzi.operator.cluster.model.metrics.MetricsModel; import io.strimzi.operator.common.Reconciliation; import io.strimzi.operator.common.model.InvalidResourceException; import io.strimzi.operator.common.model.Labels; @@ -115,9 +116,6 @@ public class KafkaBridgeClusterTest { private final int healthDelay = 15; private final int healthTimeout = 5; private final String bootstrapServers = "foo-kafka:9092"; - private final String defaultAdminclientConfiguration = ""; - private final String defaultProducerConfiguration = ""; - private final String defaultConsumerConfiguration = ""; private final KafkaBridge resource = new KafkaBridgeBuilder(ResourceUtils.createEmptyKafkaBridge(namespace, cluster)) .withNewSpec() @@ -130,6 +128,8 @@ public class KafkaBridgeClusterTest { .build(); private final KafkaBridgeCluster kbc = KafkaBridgeCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, SHARED_ENV_PROVIDER); + private final MetricsAndLogging metricsAndLogging = new MetricsAndLogging(null, null); + private Map expectedLabels(String name) { return TestUtils.modifiableMap(Labels.STRIMZI_CLUSTER_LABEL, this.cluster, "my-user-label", "cromulent", @@ -157,17 +157,6 @@ protected List getExpectedEnvVars() { List expected = new ArrayList<>(); expected.add(new EnvVarBuilder().withName(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_METRICS_ENABLED).withValue(String.valueOf(true)).build()); expected.add(new EnvVarBuilder().withName(KafkaBridgeCluster.ENV_VAR_STRIMZI_GC_LOG_ENABLED).withValue(String.valueOf(JvmOptions.DEFAULT_GC_LOGGING_ENABLED)).build()); - expected.add(new 
EnvVarBuilder().withName(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_BOOTSTRAP_SERVERS).withValue(bootstrapServers).build()); - expected.add(new EnvVarBuilder().withName(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_ADMIN_CLIENT_CONFIG).withValue(defaultAdminclientConfiguration).build()); - expected.add(new EnvVarBuilder().withName(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_ID).withValue(cluster).build()); - expected.add(new EnvVarBuilder().withName(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_CONSUMER_CONFIG).withValue(defaultConsumerConfiguration).build()); - expected.add(new EnvVarBuilder().withName(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_ENABLED).withValue(String.valueOf(true)).build()); - expected.add(new EnvVarBuilder().withName(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT).withValue(String.valueOf(KafkaBridgeConsumerSpec.HTTP_DEFAULT_TIMEOUT)).build()); - expected.add(new EnvVarBuilder().withName(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_PRODUCER_CONFIG).withValue(defaultProducerConfiguration).build()); - expected.add(new EnvVarBuilder().withName(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_HTTP_PRODUCER_ENABLED).withValue(String.valueOf(true)).build()); - expected.add(new EnvVarBuilder().withName(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_HTTP_HOST).withValue(KafkaBridgeHttpConfig.HTTP_DEFAULT_HOST).build()); - expected.add(new EnvVarBuilder().withName(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_HTTP_PORT).withValue(String.valueOf(KafkaBridgeHttpConfig.HTTP_DEFAULT_PORT)).build()); - expected.add(new EnvVarBuilder().withName(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_CORS_ENABLED).withValue(String.valueOf(false)).build()); return expected; } @@ -251,7 +240,6 @@ public void testGenerateDeployment() { assertThat(dep.getSpec().getStrategy().getType(), is("RollingUpdate")); assertThat(dep.getSpec().getStrategy().getRollingUpdate().getMaxSurge().getIntVal(), is(1)); assertThat(dep.getSpec().getStrategy().getRollingUpdate().getMaxUnavailable().getIntVal(), is(0)); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(dep.getSpec().getTemplate().getSpec().getContainers().get(0)).get(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_TLS), is(nullValue())); assertThat(dep.getSpec().getTemplate().getSpec().getVolumes().stream() .filter(volume -> volume.getName().equalsIgnoreCase("strimzi-tmp")) .findFirst().get().getEmptyDir().getSizeLimit(), is(new Quantity(VolumeUtils.STRIMZI_TMP_DIRECTORY_DEFAULT_SIZE))); @@ -282,7 +270,6 @@ public void testGenerateDeploymentWithTls() { assertThat(containers.get(0).getVolumeMounts().get(3).getMountPath(), is(KafkaBridgeCluster.TLS_CERTS_BASE_VOLUME_MOUNT + "my-another-secret")); assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)).get(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_TRUSTED_CERTS), is("my-secret/cert.crt;my-secret/new-cert.crt;my-another-secret/another-cert.crt")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)).get(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_TLS), is("true")); } @ParallelTest @@ -313,7 +300,6 @@ public void testGenerateDeploymentWithTlsAuth() { assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)).get(ENV_VAR_KAFKA_BRIDGE_TLS_AUTH_CERT), is("user-secret/user.crt")); assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)).get(ENV_VAR_KAFKA_BRIDGE_TLS_AUTH_KEY), is("user-secret/user.key")); - 
assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(containers.get(0)).get(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_TLS), is("true")); } @ParallelTest @@ -973,14 +959,14 @@ public void testNullClusterRoleBinding() { @ParallelTest public void testKafkaBridgeContainerEnvVarsConflict() { ContainerEnvVar envVar1 = new ContainerEnvVar(); - String testEnvOneKey = KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_BOOTSTRAP_SERVERS; - String testEnvOneValue = "test.env.one"; + String testEnvOneKey = KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_METRICS_ENABLED; + String testEnvOneValue = "false"; envVar1.setName(testEnvOneKey); envVar1.setValue(testEnvOneValue); ContainerEnvVar envVar2 = new ContainerEnvVar(); - String testEnvTwoKey = KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_CONSUMER_CONFIG; - String testEnvTwoValue = "test.env.two"; + String testEnvTwoKey = KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_TRUSTED_CERTS; + String testEnvTwoValue = "PEM certs"; envVar2.setName(testEnvTwoKey); envVar2.setValue(testEnvTwoValue); @@ -992,8 +978,14 @@ public void testKafkaBridgeContainerEnvVarsConflict() { KafkaBridge resource = new KafkaBridgeBuilder(this.resource) .editSpec() + .withNewTls() + .addNewTrustedCertificate() + .withSecretName("tls-trusted-certificate") + .withCertificate("pem-content") + .endTrustedCertificate() + .endTls() .withNewTemplate() - .withBridgeContainer(kafkaBridgeContainer) + .withBridgeContainer(kafkaBridgeContainer) .endTemplate() .endSpec() .build(); @@ -1405,9 +1397,9 @@ public void testOpenTelemetryTracingConfiguration() { KafkaBridgeCluster kb = KafkaBridgeCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, SHARED_ENV_PROVIDER); Deployment deployment = kb.generateDeployment(new HashMap<>(), true, null, null); - Container container = deployment.getSpec().getTemplate().getSpec().getContainers().get(0); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(container).get(KafkaBridgeCluster.ENV_VAR_STRIMZI_TRACING), is(OpenTelemetryTracing.TYPE_OPENTELEMETRY)); + ConfigMap configMap = kb.generateBridgeConfigMap(metricsAndLogging); + assertThat(configMap.getData().get(KafkaBridgeCluster.BRIDGE_CONFIGURATION_FILENAME), containsString("bridge.tracing=" + OpenTelemetryTracing.TYPE_OPENTELEMETRY)); } @ParallelTest @@ -1425,11 +1417,11 @@ public void testCorsConfiguration() { KafkaBridgeCluster kb = KafkaBridgeCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, SHARED_ENV_PROVIDER); Deployment deployment = kb.generateDeployment(new HashMap<>(), true, null, null); - Container container = deployment.getSpec().getTemplate().getSpec().getContainers().get(0); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(container).get(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_CORS_ENABLED), is("true")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(container).get(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_CORS_ALLOWED_ORIGINS), is("https://strimzi.io,https://cncf.io")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(container).get(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_CORS_ALLOWED_METHODS), is("GET,POST,PUT,DELETE,PATCH")); + ConfigMap configMap = kb.generateBridgeConfigMap(metricsAndLogging); + assertThat(configMap.getData().get(KafkaBridgeCluster.BRIDGE_CONFIGURATION_FILENAME), containsString("http.cors.enabled=true")); + assertThat(configMap.getData().get(KafkaBridgeCluster.BRIDGE_CONFIGURATION_FILENAME), containsString("http.cors.allowedOrigins=https://strimzi.io,https://cncf.io")); + 
assertThat(configMap.getData().get(KafkaBridgeCluster.BRIDGE_CONFIGURATION_FILENAME), containsString("http.cors.allowedMethods=GET,POST,PUT,DELETE,PATCH")); } @ParallelTest @@ -1474,10 +1466,22 @@ public void testConsumerProducerOptions() { KafkaBridgeCluster kb = KafkaBridgeCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, resource, SHARED_ENV_PROVIDER); Deployment deployment = kb.generateDeployment(new HashMap<>(), true, null, null); - Container container = deployment.getSpec().getTemplate().getSpec().getContainers().get(0); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(container).get(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT), is("60")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(container).get(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_ENABLED), is("true")); - assertThat(io.strimzi.operator.cluster.TestUtils.containerEnvVars(container).get(KafkaBridgeCluster.ENV_VAR_KAFKA_BRIDGE_HTTP_PRODUCER_ENABLED), is("true")); + ConfigMap configMap = kb.generateBridgeConfigMap(metricsAndLogging); + assertThat(configMap.getData().get(KafkaBridgeCluster.BRIDGE_CONFIGURATION_FILENAME), containsString("http.timeoutSeconds=60")); + assertThat(configMap.getData().get(KafkaBridgeCluster.BRIDGE_CONFIGURATION_FILENAME), containsString("http.consumer.enabled=true")); + assertThat(configMap.getData().get(KafkaBridgeCluster.BRIDGE_CONFIGURATION_FILENAME), containsString("http.producer.enabled=true")); + } + + @ParallelTest + public void testConfigurationConfigMap() { + KafkaBridgeCluster kb = KafkaBridgeCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, this.resource, SHARED_ENV_PROVIDER); + Deployment deployment = kb.generateDeployment(new HashMap<>(), true, null, null); + ConfigMap configMap = kb.generateBridgeConfigMap(metricsAndLogging); + + assertThat(configMap, is(notNullValue())); + assertThat(configMap.getData().get(LoggingModel.LOG4J2_CONFIG_MAP_KEY), is(notNullValue())); + assertThat(configMap.getData().get(MetricsModel.CONFIG_MAP_KEY), is(nullValue())); + assertThat(configMap.getData().get(KafkaBridgeCluster.BRIDGE_CONFIGURATION_FILENAME), is(notNullValue())); } } diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBridgeConfigurationBuilderTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBridgeConfigurationBuilderTest.java new file mode 100644 index 00000000000..0d720adb3a7 --- /dev/null +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBridgeConfigurationBuilderTest.java @@ -0,0 +1,457 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). 
+ */ +package io.strimzi.operator.cluster.model; + +import io.strimzi.api.kafka.model.bridge.KafkaBridgeAdminClientSpec; +import io.strimzi.api.kafka.model.bridge.KafkaBridgeAdminClientSpecBuilder; +import io.strimzi.api.kafka.model.bridge.KafkaBridgeConsumerSpec; +import io.strimzi.api.kafka.model.bridge.KafkaBridgeConsumerSpecBuilder; +import io.strimzi.api.kafka.model.bridge.KafkaBridgeHttpConfig; +import io.strimzi.api.kafka.model.bridge.KafkaBridgeHttpConfigBuilder; +import io.strimzi.api.kafka.model.bridge.KafkaBridgeProducerSpec; +import io.strimzi.api.kafka.model.bridge.KafkaBridgeProducerSpecBuilder; +import io.strimzi.api.kafka.model.common.ClientTls; +import io.strimzi.api.kafka.model.common.ClientTlsBuilder; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationOAuth; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationOAuthBuilder; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationPlain; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationPlainBuilder; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationScramSha256; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationScramSha256Builder; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationScramSha512; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationScramSha512Builder; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationTls; +import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationTlsBuilder; +import io.strimzi.api.kafka.model.common.tracing.OpenTelemetryTracing; +import io.strimzi.operator.common.Reconciliation; +import io.strimzi.test.annotations.ParallelSuite; +import io.strimzi.test.annotations.ParallelTest; + +import java.util.Map; + +import static io.strimzi.operator.cluster.TestUtils.IsEquivalent.isEquivalent; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; + +@ParallelSuite +public class KafkaBridgeConfigurationBuilderTest { + + private static final String BRIDGE_CLUSTER = "my-bridge"; + private static final String BRIDGE_BOOTSTRAP_SERVERS = "my-cluster-kafka-bootstrap:9092"; + + @ParallelTest + public void testBaseConfiguration() { + // test base/default bridge configuration + String configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS).build(); + assertThat(configuration, isEquivalent( + "bridge.id=my-bridge", + "kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "kafka.security.protocol=PLAINTEXT" + )); + } + + @ParallelTest + public void testConfigProviders() { + // test config providers setting + String configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS) + .withConfigProviders() + .build(); + assertThat(configuration, isEquivalent( + "bridge.id=my-bridge", + "kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "kafka.security.protocol=PLAINTEXT", + "kafka.config.providers=strimzienv,strimzifile,strimzidir", + "kafka.config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider", + "kafka.config.providers.strimzienv.param.allowlist.pattern=.*", + 
"kafka.config.providers.strimzifile.class=org.apache.kafka.common.config.provider.FileConfigProvider", + "kafka.config.providers.strimzifile.param.allowed.paths=/opt/strimzi", + "kafka.config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider", + "kafka.config.providers.strimzidir.param.allowed.paths=/opt/strimzi" + )); + } + + @ParallelTest + public void testTracing() { + // test no tracing configured + String configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS).build(); + assertThat(configuration, not(containsString("bridge.tracing"))); + + // test opentelemetry tracing enabled + configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS) + .withTracing(new OpenTelemetryTracing()) + .build(); + assertThat(configuration, isEquivalent( + "bridge.id=my-bridge", + "kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "kafka.security.protocol=PLAINTEXT", + "bridge.tracing=opentelemetry" + )); + } + + @ParallelTest + public void testTls() { + // test TLS configuration (only server authentication, encryption) + ClientTls clientTls = new ClientTlsBuilder() + .addNewTrustedCertificate() + .withSecretName("tls-trusted-certificate") + .withCertificate("pem-content") + .endTrustedCertificate() + .build(); + + String configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS) + .withTls(clientTls) + .build(); + assertThat(configuration, isEquivalent( + "bridge.id=my-bridge", + "kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "kafka.security.protocol=SSL", + "kafka.ssl.truststore.location=/tmp/strimzi/bridge.truststore.p12", + "kafka.ssl.truststore.password=${strimzienv:CERTS_STORE_PASSWORD}", + "kafka.ssl.truststore.type=PKCS12" + )); + + // test TLS with mutual authentication (mTLS, server and client authentication) + KafkaClientAuthenticationTls tlsAuth = new KafkaClientAuthenticationTlsBuilder() + .withNewCertificateAndKey() + .withSecretName("tls-keystore") + .withCertificate("pem-content") + .endCertificateAndKey() + .build(); + + configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS) + .withTls(clientTls) + .withAuthentication(tlsAuth) + .build(); + assertThat(configuration, isEquivalent( + "bridge.id=my-bridge", + "kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "kafka.security.protocol=SSL", + "kafka.ssl.truststore.location=/tmp/strimzi/bridge.truststore.p12", + "kafka.ssl.truststore.password=${strimzienv:CERTS_STORE_PASSWORD}", + "kafka.ssl.truststore.type=PKCS12", + "kafka.ssl.keystore.location=/tmp/strimzi/bridge.keystore.p12", + "kafka.ssl.keystore.password=${strimzienv:CERTS_STORE_PASSWORD}", + "kafka.ssl.keystore.type=PKCS12" + )); + } + + @ParallelTest + public void testSaslMechanism() { + // test plain authentication + KafkaClientAuthenticationPlain authPlain = new KafkaClientAuthenticationPlainBuilder() + .withNewPasswordSecret() + .withSecretName("my-auth-secret") + .withPassword("my-password-key") + .endPasswordSecret() + .build(); + String configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS) + .withAuthentication(authPlain) + .build(); + assertThat(configuration, isEquivalent( + "bridge.id=my-bridge", + 
"kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "kafka.security.protocol=SASL_PLAINTEXT", + "kafka.sasl.mechanism=PLAIN", + "kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=${strimzienv:KAFKA_BRIDGE_SASL_USERNAME} password=${strimzidir:/opt/strimzi/bridge-password/my-auth-secret:my-password-key};" + )); + + // test plain authentication but with TLS as well (server authentication only, encryption) + ClientTls clientTls = new ClientTlsBuilder() + .addNewTrustedCertificate() + .withSecretName("tls-trusted-certificate") + .withCertificate("pem-content") + .endTrustedCertificate() + .build(); + configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS) + .withTls(clientTls) + .withAuthentication(authPlain) + .build(); + assertThat(configuration, isEquivalent( + "bridge.id=my-bridge", + "kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "kafka.security.protocol=SASL_SSL", + "kafka.sasl.mechanism=PLAIN", + "kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=${strimzienv:KAFKA_BRIDGE_SASL_USERNAME} password=${strimzidir:/opt/strimzi/bridge-password/my-auth-secret:my-password-key};", + "kafka.ssl.truststore.location=/tmp/strimzi/bridge.truststore.p12", + "kafka.ssl.truststore.password=${strimzienv:CERTS_STORE_PASSWORD}", + "kafka.ssl.truststore.type=PKCS12" + )); + + // test scram-sha-256 authentication + KafkaClientAuthenticationScramSha256 authScramSha256 = new KafkaClientAuthenticationScramSha256Builder() + .withNewPasswordSecret() + .withSecretName("my-auth-secret") + .withPassword("my-password-key") + .endPasswordSecret() + .build(); + configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS) + .withAuthentication(authScramSha256) + .build(); + assertThat(configuration, isEquivalent( + "bridge.id=my-bridge", + "kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "kafka.security.protocol=SASL_PLAINTEXT", + "kafka.sasl.mechanism=SCRAM-SHA-256", + "kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=${strimzienv:KAFKA_BRIDGE_SASL_USERNAME} password=${strimzidir:/opt/strimzi/bridge-password/my-auth-secret:my-password-key};" + )); + + // test scram-sha-512 authentication + KafkaClientAuthenticationScramSha512 authScramSha512 = new KafkaClientAuthenticationScramSha512Builder() + .withNewPasswordSecret() + .withSecretName("my-auth-secret") + .withPassword("my-password-key") + .endPasswordSecret() + .build(); + configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS) + .withAuthentication(authScramSha512) + .build(); + assertThat(configuration, isEquivalent( + "bridge.id=my-bridge", + "kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "kafka.security.protocol=SASL_PLAINTEXT", + "kafka.sasl.mechanism=SCRAM-SHA-512", + "kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=${strimzienv:KAFKA_BRIDGE_SASL_USERNAME} password=${strimzidir:/opt/strimzi/bridge-password/my-auth-secret:my-password-key};" + )); + + // test oauth authentication + KafkaClientAuthenticationOAuth authOAuth = new KafkaClientAuthenticationOAuthBuilder() + .withClientId("oauth-client-id") + .withTokenEndpointUri("http://token-endpoint-uri") + .withUsername("oauth-username") + .withNewClientSecret() + 
.withSecretName("my-client-secret-secret") + .withKey("my-client-secret-key") + .endClientSecret() + .withNewRefreshToken() + .withSecretName("my-refresh-token-secret") + .withKey("my-refresh-token-key") + .endRefreshToken() + .withNewAccessToken() + .withSecretName("my-access-token-secret") + .withKey("my-access-token-key") + .endAccessToken() + .withNewPasswordSecret() + .withSecretName("my-password-secret-secret") + .withPassword("my-password-key") + .endPasswordSecret() + .addNewTlsTrustedCertificate() + .withSecretName("my-tls-trusted-certificate") + .withCertificate("pem-content") + .endTlsTrustedCertificate() + .build(); + configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS) + .withAuthentication(authOAuth) + .build(); + assertThat(configuration, isEquivalent( + "bridge.id=my-bridge", + "kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "kafka.security.protocol=SASL_PLAINTEXT", + "kafka.sasl.mechanism=OAUTHBEARER", + "kafka.sasl.jaas.config=" + + "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ${strimzienv:KAFKA_BRIDGE_OAUTH_CONFIG}" + + " oauth.client.secret=${strimzienv:KAFKA_BRIDGE_OAUTH_CLIENT_SECRET}" + + " oauth.refresh.token=${strimzienv:KAFKA_BRIDGE_OAUTH_REFRESH_TOKEN}" + + " oauth.access.token=${strimzienv:KAFKA_BRIDGE_OAUTH_ACCESS_TOKEN}" + + " oauth.password.grant.password=${strimzienv:KAFKA_BRIDGE_OAUTH_PASSWORD_GRANT_PASSWORD}" + + " oauth.ssl.truststore.location=\"/tmp/strimzi/oauth.truststore.p12\" oauth.ssl.truststore.password=${strimzienv:CERTS_STORE_PASSWORD} oauth.ssl.truststore.type=\"PKCS12\";", + "kafka.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler" + )); + } + + @ParallelTest + public void testKafkaProducer() { + // test missing Kafka Producer configuration + String configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS) + .build(); + assertThat(configuration, not(containsString("kafka.producer."))); + + // test some Kafka Producer parameters + KafkaBridgeProducerSpec kafkaBridgeProducer = new KafkaBridgeProducerSpecBuilder() + .withConfig( + Map.of( + "acks", 1, + "linger.ms", 100, + "key.serializer", "my-producer-key-serializer", + "value.serializer", "my-producer-value-serializer" + )) + .build(); + configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS) + .withKafkaProducer(kafkaBridgeProducer) + .build(); + assertThat(configuration, isEquivalent( + "bridge.id=my-bridge", + "kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "kafka.security.protocol=PLAINTEXT", + "kafka.producer.acks=1", + "kafka.producer.linger.ms=100", + "kafka.producer.key.serializer=my-producer-key-serializer", + "kafka.producer.value.serializer=my-producer-value-serializer" + )); + } + + @ParallelTest + public void testKafkaConsumer() { + // test missing Kafka Consumer configuration + String configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS) + .build(); + assertThat(configuration, not(containsString("kafka.consumer."))); + + // test some Kafka Consumer parameters + KafkaBridgeConsumerSpec kafkaBridgeConsumer = new KafkaBridgeConsumerSpecBuilder() + .withConfig( + Map.of( + "auto.offset.reset", "earliest", + "key.deserializer", "my-consumer-key-deserializer", + 
"value.deserializer", "my-consumer-value-deserializer" + )) + .build(); + configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS) + .withKafkaConsumer(kafkaBridgeConsumer) + .build(); + assertThat(configuration, isEquivalent( + "bridge.id=my-bridge", + "kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "kafka.security.protocol=PLAINTEXT", + "kafka.consumer.auto.offset.reset=earliest", + "kafka.consumer.key.deserializer=my-consumer-key-deserializer", + "kafka.consumer.value.deserializer=my-consumer-value-deserializer", + "kafka.consumer.client.rack=${strimzidir:/opt/strimzi/init:rack.id}" + )); + } + + @ParallelTest + public void testKafkaAdminClient() { + // test missing Kafka Admin configuration + String configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS) + .build(); + assertThat(configuration, not(containsString("kafka.admin."))); + + // test some Kafka Admin parameters + KafkaBridgeAdminClientSpec kafkaBridgeAdminClient = new KafkaBridgeAdminClientSpecBuilder() + .withConfig( + Map.of( + "client.id", "my-admin-client", + "bootstrap.controllers", "my-bootstrap-controllers" + )) + .build(); + configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS) + .withKafkaAdminClient(kafkaBridgeAdminClient) + .build(); + assertThat(configuration, isEquivalent( + "bridge.id=my-bridge", + "kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "kafka.security.protocol=PLAINTEXT", + "kafka.admin.client.id=my-admin-client", + "kafka.admin.bootstrap.controllers=my-bootstrap-controllers" + )); + } + + @ParallelTest + public void testHttp() { + // test default HTTP configuration. 
+ // NOTE: the "http" section is mandatory when using the KafkaBridge custom resource, so we define and set it + KafkaBridgeHttpConfig http = new KafkaBridgeHttpConfigBuilder() + .build(); + String configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS) + .withHttp(http, null, null) + .build(); + assertThat(configuration, isEquivalent( + "bridge.id=my-bridge", + "kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "kafka.security.protocol=PLAINTEXT", + "http.host=0.0.0.0", + "http.port=8080", + "http.cors.enabled=false", + "http.consumer.enabled=true", + "http.timeoutSeconds=-1", + "http.producer.enabled=true" + )); + + // test different consumer timeout + KafkaBridgeConsumerSpec kafkaBridgeConsumer = new KafkaBridgeConsumerSpecBuilder() + .withTimeoutSeconds(10000) + .build(); + configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS) + .withHttp(http, null, kafkaBridgeConsumer) + .build(); + assertThat(configuration, isEquivalent( + "bridge.id=my-bridge", + "kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "kafka.security.protocol=PLAINTEXT", + "http.host=0.0.0.0", + "http.port=8080", + "http.cors.enabled=false", + "http.consumer.enabled=true", + "http.timeoutSeconds=10000", + "http.producer.enabled=true" + )); + + // test disabling HTTP part of the consumer and producer + kafkaBridgeConsumer = new KafkaBridgeConsumerSpecBuilder() + .withEnabled(false) + .build(); + KafkaBridgeProducerSpec kafkaBridgeProducer = new KafkaBridgeProducerSpecBuilder() + .withEnabled(false) + .build(); + configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS) + .withHttp(http, kafkaBridgeProducer, kafkaBridgeConsumer) + .build(); + assertThat(configuration, isEquivalent( + "bridge.id=my-bridge", + "kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "kafka.security.protocol=PLAINTEXT", + "http.host=0.0.0.0", + "http.port=8080", + "http.cors.enabled=false", + "http.consumer.enabled=false", + "http.timeoutSeconds=-1", + "http.producer.enabled=false" + )); + + // test different HTTP port + http = new KafkaBridgeHttpConfigBuilder() + .withPort(8081) + .build(); + configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS) + .withHttp(http, null, null) + .build(); + assertThat(configuration, isEquivalent( + "bridge.id=my-bridge", + "kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "kafka.security.protocol=PLAINTEXT", + "http.host=0.0.0.0", + "http.port=8081", + "http.cors.enabled=false", + "http.consumer.enabled=true", + "http.timeoutSeconds=-1", + "http.producer.enabled=true" + )); + + // test CORS configuration + http = new KafkaBridgeHttpConfigBuilder() + .withNewCors() + .withAllowedOrigins("https://strimzi.io", "https://cncf.io") + .withAllowedMethods("GET", "POST", "PUT", "DELETE", "PATCH") + .endCors() + .build(); + configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS) + .withHttp(http, null, null) + .build(); + assertThat(configuration, isEquivalent( + "bridge.id=my-bridge", + "kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092", + "kafka.security.protocol=PLAINTEXT", + "http.host=0.0.0.0", + "http.port=8080", + "http.cors.enabled=true", + 
"http.cors.allowedOrigins=https://strimzi.io,https://cncf.io", + "http.cors.allowedMethods=GET,POST,PUT,DELETE,PATCH", + "http.consumer.enabled=true", + "http.timeoutSeconds=-1", + "http.producer.enabled=true" + )); + } +} diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java index d65b5ab30d1..fbb0a4d32e4 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java @@ -36,22 +36,13 @@ import io.strimzi.operator.common.model.cruisecontrol.CruiseControlConfigurationParameters; import io.strimzi.test.annotations.ParallelSuite; import io.strimzi.test.annotations.ParallelTest; -import org.hamcrest.Description; -import org.hamcrest.Matcher; -import org.hamcrest.TypeSafeMatcher; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.ArrayList; -import java.util.Collection; + import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TreeSet; -import static io.strimzi.operator.cluster.model.KafkaBrokerConfigurationBuilderTest.IsEquivalent.isEquivalent; +import static io.strimzi.operator.cluster.TestUtils.IsEquivalent.isEquivalent; import static java.util.Arrays.asList; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; @@ -2673,75 +2664,4 @@ public void testWithKRaftMetadataLogDir() { "metadata.log.dir=/my/kraft/metadata/kafka-log2" )); } - - static class IsEquivalent extends TypeSafeMatcher { - private final List expectedLines; - - public IsEquivalent(String expectedConfig) { - super(); - this.expectedLines = ModelUtils.getLinesWithoutCommentsAndEmptyLines(expectedConfig); - } - - public IsEquivalent(List expectedLines) { - super(); - this.expectedLines = expectedLines; - } - - @Override - protected boolean matchesSafely(String config) { - List actualLines = ModelUtils.getLinesWithoutCommentsAndEmptyLines(config); - - return expectedLines.containsAll(actualLines) && actualLines.containsAll(expectedLines); - } - - private String getLinesAsString(Collection configLines) { - StringWriter stringWriter = new StringWriter(); - PrintWriter writer = new PrintWriter(stringWriter); - - for (String line : configLines) { - writer.println(line); - } - - return stringWriter.toString(); - } - - @Override - public void describeTo(Description description) { - description.appendText(getLinesAsString(new TreeSet<>(expectedLines))); - } - - @Override - protected void describeMismatchSafely(String item, Description mismatchDescription) { - printDiff(item, mismatchDescription); - } - - private void printDiff(String item, Description mismatchDescription) { - List actualLines = ModelUtils.getLinesWithoutCommentsAndEmptyLines(item); - List actualLinesDiff = new ArrayList<>(actualLines); - actualLinesDiff.removeAll(expectedLines); - List expectedLinesDiff = new ArrayList<>(expectedLines); - expectedLinesDiff.removeAll(actualLines); - - mismatchDescription - .appendText(" was: \n") - .appendText(getLinesAsString(new TreeSet<>(ModelUtils.getLinesWithoutCommentsAndEmptyLines(item)))) - .appendText("\n\n") - .appendText(" wrong lines in expected:\n") - .appendText(getLinesAsString(expectedLinesDiff)) - .appendText("\n\n") - .appendText(" 
Wrong lines in actual:\n") - .appendText(getLinesAsString(actualLinesDiff)) - .appendText("\n\nOriginal value: \n") - .appendText(item) - .appendText("\n\n"); - } - - public static Matcher isEquivalent(String expectedConfig) { - return new IsEquivalent(expectedConfig); - } - - public static Matcher isEquivalent(String... expectedLines) { - return new IsEquivalent(asList(expectedLines)); - } - } } diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaConnectDockerfileTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaConnectDockerfileTest.java index ffe7a51df7f..b75a002510c 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaConnectDockerfileTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaConnectDockerfileTest.java @@ -21,7 +21,7 @@ import io.strimzi.test.annotations.ParallelSuite; import io.strimzi.test.annotations.ParallelTest; -import static io.strimzi.operator.cluster.model.KafkaBrokerConfigurationBuilderTest.IsEquivalent.isEquivalent; +import static io.strimzi.operator.cluster.TestUtils.IsEquivalent.isEquivalent; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static org.hamcrest.CoreMatchers.is; diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperatorTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperatorTest.java index 2e0b74d6790..70a79ca7e60 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperatorTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperatorTest.java @@ -5,7 +5,6 @@ package io.strimzi.operator.cluster.operator.assembly; import io.fabric8.kubernetes.api.model.ConfigMap; -import io.fabric8.kubernetes.api.model.ConfigMapBuilder; import io.fabric8.kubernetes.api.model.LabelSelector; import io.fabric8.kubernetes.api.model.Service; import io.fabric8.kubernetes.api.model.apps.Deployment; @@ -27,7 +26,6 @@ import io.strimzi.operator.cluster.model.KafkaVersion; import io.strimzi.operator.cluster.model.MockSharedEnvironmentProvider; import io.strimzi.operator.cluster.model.SharedEnvironmentProvider; -import io.strimzi.operator.cluster.model.metrics.MetricsModel; import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier; import io.strimzi.operator.cluster.operator.resource.kubernetes.ClusterRoleBindingOperator; import io.strimzi.operator.cluster.operator.resource.kubernetes.ConfigMapOperator; @@ -38,12 +36,12 @@ import io.strimzi.operator.cluster.operator.resource.kubernetes.ServiceOperator; import io.strimzi.operator.common.Annotations; import io.strimzi.operator.common.Reconciliation; +import io.strimzi.operator.common.Util; import io.strimzi.operator.common.model.Labels; import io.strimzi.operator.common.model.PasswordGenerator; import io.strimzi.operator.common.operator.MockCertManager; import io.strimzi.operator.common.operator.resource.ReconcileResult; import io.strimzi.platform.KubernetesVersion; -import io.strimzi.test.TestUtils; import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.Vertx; @@ -57,7 +55,6 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mockito; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -89,7 +86,6 @@ public class 
     private static final SharedEnvironmentProvider SHARED_ENV_PROVIDER = new MockSharedEnvironmentProvider();
 
     protected static Vertx vertx;
-    private static final String METRICS_CONFIG = "{\"foo\":\"bar\"}";
     private static final String BOOTSTRAP_SERVERS = "foo-kafka:9092";
 
     private static final KafkaBridgeConsumerSpec KAFKA_BRIDGE_CONSUMER_SPEC = new KafkaBridgeConsumerSpec();
@@ -141,7 +137,8 @@ public void testCreateOrUpdateCreatesCluster(VertxTestContext context) {
         ArgumentCaptor<PodDisruptionBudget> pdbCaptor = ArgumentCaptor.forClass(PodDisruptionBudget.class);
         when(mockPdbOps.reconcile(any(), anyString(), any(), pdbCaptor.capture())).thenReturn(Future.succeededFuture());
 
-        when(mockCmOps.reconcile(any(), anyString(), any(), any())).thenReturn(Future.succeededFuture(ReconcileResult.created(new ConfigMap())));
+        ArgumentCaptor<ConfigMap> cmCaptor = ArgumentCaptor.forClass(ConfigMap.class);
+        when(mockCmOps.reconcile(any(), anyString(), any(), cmCaptor.capture())).thenReturn(Future.succeededFuture());
 
         ArgumentCaptor<KafkaBridge> bridgeCaptor = ArgumentCaptor.forClass(KafkaBridge.class);
         when(mockBridgeOps.updateStatusAsync(any(), bridgeCaptor.capture())).thenReturn(Future.succeededFuture());
@@ -167,8 +164,15 @@ public void testCreateOrUpdateCreatesCluster(VertxTestContext context) {
             List<Deployment> capturedDc = dcCaptor.getAllValues();
             assertThat(capturedDc, hasSize(1));
             Deployment dc = capturedDc.get(0);
+            // capture the bridge ConfigMap to compute the configuration hash that is checked in the corresponding Deployment annotation
+            List<ConfigMap> captureCm = cmCaptor.getAllValues();
+            assertThat(captureCm, hasSize(1));
+            ConfigMap cm = captureCm.get(0);
             assertThat(dc.getMetadata().getName(), is(bridge.getComponentName()));
-            assertThat(dc, is(bridge.generateDeployment(Collections.singletonMap(Annotations.ANNO_STRIMZI_AUTH_HASH, "0"), true, null, null)));
+            assertThat(dc, is(bridge.generateDeployment(Map.of(
+                    Annotations.ANNO_STRIMZI_AUTH_HASH, "0",
+                    KafkaBridgeCluster.ANNO_STRIMZI_IO_CONFIGURATION_HASH, "0ff4f460"
+            ), true, null, null)));
 
             // Verify PodDisruptionBudget
             List<PodDisruptionBudget> capturedPdb = pdbCaptor.getAllValues();
@@ -309,24 +313,9 @@ public void testCreateOrUpdateUpdatesCluster(VertxTestContext context) {
         ArgumentCaptor<PodDisruptionBudget> pdbCaptor = ArgumentCaptor.forClass(PodDisruptionBudget.class);
         when(mockPdbOps.reconcile(any(), anyString(), any(), pdbCaptor.capture())).thenReturn(Future.succeededFuture());
 
-        when(mockCmOps.reconcile(any(), anyString(), any(), any())).thenReturn(Future.succeededFuture(ReconcileResult.created(new ConfigMap())));
-
-        // Mock CM get
-        when(mockBridgeOps.get(kbNamespace, kbName)).thenReturn(kb);
-        ConfigMap metricsCm = new ConfigMapBuilder().withNewMetadata()
-                .withName(KafkaBridgeResources.metricsAndLogConfigMapName(kbName))
-                .withNamespace(kbNamespace)
-                .endMetadata()
-                .withData(Collections.singletonMap(MetricsModel.CONFIG_MAP_KEY, METRICS_CONFIG))
-                .build();
-        when(mockCmOps.get(kbNamespace, KafkaBridgeResources.metricsAndLogConfigMapName(kbName))).thenReturn(metricsCm);
-
-        // Mock CM patch
-        Set<String> metricsCms = TestUtils.modifiableSet();
-        doAnswer(invocation -> {
-            metricsCms.add(invocation.getArgument(1));
-            return Future.succeededFuture();
-        }).when(mockCmOps).reconcile(any(), eq(kbNamespace), anyString(), any());
+        ArgumentCaptor<ConfigMap> cmCaptor = ArgumentCaptor.forClass(ConfigMap.class);
+        when(mockCmOps.reconcile(any(), anyString(), any(), cmCaptor.capture())).thenReturn(Future.succeededFuture());
 
         KafkaBridgeAssemblyOperator ops = new KafkaBridgeAssemblyOperator(vertx,
                 new PlatformFeaturesAvailability(true, kubernetesVersion),
@@ -351,8 +340,15 @@
             List<Deployment> capturedDc = dcCaptor.getAllValues();
             assertThat(capturedDc, hasSize(1));
             Deployment dc = capturedDc.get(0);
+            // capture the bridge ConfigMap to compute the configuration hash that is checked in the corresponding Deployment annotation
+            List<ConfigMap> captureCm = cmCaptor.getAllValues();
+            assertThat(captureCm, hasSize(1));
+            ConfigMap cm = captureCm.get(0);
             assertThat(dc.getMetadata().getName(), is(compareTo.getComponentName()));
-            assertThat(dc, is(compareTo.generateDeployment(Collections.singletonMap(Annotations.ANNO_STRIMZI_AUTH_HASH, "0"), true, null, null)));
+            assertThat(dc, is(compareTo.generateDeployment(Map.of(
+                    Annotations.ANNO_STRIMZI_AUTH_HASH, "0",
+                    KafkaBridgeCluster.ANNO_STRIMZI_IO_CONFIGURATION_HASH, "0ff4f460"
+            ), true, null, null)));
 
             // Verify PodDisruptionBudget
             List<PodDisruptionBudget> capturedPdb = pdbCaptor.getAllValues();
diff --git a/systemtest/src/test/java/io/strimzi/systemtest/bridge/HttpBridgeST.java b/systemtest/src/test/java/io/strimzi/systemtest/bridge/HttpBridgeST.java
index b3f032538d0..5dc246efd1b 100644
--- a/systemtest/src/test/java/io/strimzi/systemtest/bridge/HttpBridgeST.java
+++ b/systemtest/src/test/java/io/strimzi/systemtest/bridge/HttpBridgeST.java
@@ -4,6 +4,7 @@
  */
 package io.strimzi.systemtest.bridge;
 
+import io.fabric8.kubernetes.api.model.ConfigMap;
 import io.fabric8.kubernetes.api.model.EnvVarBuilder;
 import io.fabric8.kubernetes.api.model.Service;
 import io.skodjob.annotations.Desc;
@@ -60,7 +61,6 @@
 import static io.strimzi.test.k8s.KubeClusterResource.cmdKubeClient;
 import static io.strimzi.test.k8s.KubeClusterResource.kubeClient;
 import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 
@@ -187,11 +187,9 @@ void testReceiveSimpleMessage() {
     void testCustomAndUpdatedValues() {
 
         String bridgeName = "custom-bridge";
-        String usedVariable = "KAFKA_BRIDGE_PRODUCER_CONFIG";
         LinkedHashMap<String, String> envVarGeneral = new LinkedHashMap<>();
         envVarGeneral.put("TEST_ENV_1", "test.env.one");
         envVarGeneral.put("TEST_ENV_2", "test.env.two");
-        envVarGeneral.put(usedVariable, "test.value");
 
         LinkedHashMap<String, String> envVarUpdated = new LinkedHashMap<>();
         envVarUpdated.put("TEST_ENV_2", "updated.test.env.two");
@@ -243,19 +241,11 @@ void testCustomAndUpdatedValues() {
 
         Map<String, String> bridgeSnapshot = DeploymentUtils.depSnapshot(Environment.TEST_SUITE_NAMESPACE, KafkaBridgeResources.componentName(bridgeName));
 
-        // Remove variable which is already in use
-        envVarGeneral.remove(usedVariable);
 
         LOGGER.info("Verifying values before update");
         VerificationUtils.verifyReadinessAndLivenessProbes(Environment.TEST_SUITE_NAMESPACE, KafkaBridgeResources.componentName(bridgeName), KafkaBridgeResources.componentName(bridgeName), initialDelaySeconds, timeoutSeconds, periodSeconds, successThreshold, failureThreshold);
         VerificationUtils.verifyContainerEnvVariables(Environment.TEST_SUITE_NAMESPACE, KafkaBridgeResources.componentName(bridgeName), KafkaBridgeResources.componentName(bridgeName), envVarGeneral);
 
-        LOGGER.info("Check if actual env variable {} has different value than {}", usedVariable, "test.value");
-        assertThat(
-                StUtils.checkEnvVarInPod(Environment.TEST_SUITE_NAMESPACE, kubeClient().listPods(Environment.TEST_SUITE_NAMESPACE, suiteTestStorage.getClusterName(), Labels.STRIMZI_KIND_LABEL, KafkaBridge.RESOURCE_KIND).get(0).getMetadata().getName(), usedVariable),
-                is(not("test.value"))
-        );
-
         LOGGER.info("Updating values in Bridge container");
         KafkaBridgeResource.replaceBridgeResourceInSpecificNamespace(Environment.TEST_SUITE_NAMESPACE, bridgeName, kb -> {
             kb.getSpec().getTemplate().getBridgeContainer().setEnv(StUtils.createContainerEnvVarsFromMap(envVarUpdated));
@@ -277,8 +267,15 @@ void testCustomAndUpdatedValues() {
 
         VerificationUtils.verifyReadinessAndLivenessProbes(Environment.TEST_SUITE_NAMESPACE, KafkaBridgeResources.componentName(bridgeName), KafkaBridgeResources.componentName(bridgeName), updatedInitialDelaySeconds, updatedTimeoutSeconds, updatedPeriodSeconds, successThreshold, updatedFailureThreshold);
         VerificationUtils.verifyContainerEnvVariables(Environment.TEST_SUITE_NAMESPACE, KafkaBridgeResources.componentName(bridgeName), KafkaBridgeResources.componentName(bridgeName), envVarUpdated);
-        VerificationUtils.verifyComponentConfiguration(Environment.TEST_SUITE_NAMESPACE, KafkaBridgeResources.componentName(bridgeName), KafkaBridgeResources.componentName(bridgeName), "KAFKA_BRIDGE_PRODUCER_CONFIG", producerConfig);
-        VerificationUtils.verifyComponentConfiguration(Environment.TEST_SUITE_NAMESPACE, KafkaBridgeResources.componentName(bridgeName), KafkaBridgeResources.componentName(bridgeName), "KAFKA_BRIDGE_CONSUMER_CONFIG", consumerConfig);
+
+        // the bridge configuration now lives in the ConfigMap created by the operator, so verify the producer and consumer options there
+        ConfigMap configMap = kubeClient().namespace(Environment.TEST_SUITE_NAMESPACE).getConfigMap(KafkaBridgeResources.metricsAndLogConfigMapName(bridgeName));
+        String bridgeConfiguration = configMap.getData().get("application.properties");
+        Map<String, Object> config = StUtils.loadProperties(bridgeConfiguration);
+        Map<String, Object> producerConfigMap = config.entrySet().stream().filter(e -> e.getKey().startsWith("kafka.producer.")).collect(Collectors.toMap(e -> e.getKey().replace("kafka.producer.", ""), Map.Entry::getValue));
+        Map<String, Object> consumerConfigMap = config.entrySet().stream().filter(e -> e.getKey().startsWith("kafka.consumer.")).collect(Collectors.toMap(e -> e.getKey().replace("kafka.consumer.", ""), Map.Entry::getValue));
+        assertThat(producerConfigMap.entrySet().containsAll(producerConfig.entrySet()), is(true));
+        assertThat(consumerConfigMap.entrySet().containsAll(consumerConfig.entrySet()), is(true));
     }
 
     @ParallelTest

From d97c6fc0f42ee57fe219ea7fb2faa798cdf498e8 Mon Sep 17 00:00:00 2001
From: Paolo Patierno
Date: Tue, 21 Jan 2025 17:58:58 +0100
Subject: [PATCH 2/2] Added test to check bridge configuration hash changes

Signed-off-by: Paolo Patierno
---
 .../KafkaBridgeAssemblyOperatorTest.java      | 84 ++++++++++++++++---
 1 file changed, 72 insertions(+), 12 deletions(-)

diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperatorTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperatorTest.java
index 70a79ca7e60..b243fda9fc8 100644
--- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperatorTest.java
+++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperatorTest.java
@@ -14,9 +14,11 @@
 import io.strimzi.api.kafka.model.bridge.KafkaBridge;
 import io.strimzi.api.kafka.model.bridge.KafkaBridgeBuilder;
 import io.strimzi.api.kafka.model.bridge.KafkaBridgeConsumerSpec;
+import io.strimzi.api.kafka.model.bridge.KafkaBridgeConsumerSpecBuilder;
 import io.strimzi.api.kafka.model.bridge.KafkaBridgeHttpConfig;
 import io.strimzi.api.kafka.model.bridge.KafkaBridgeList;
 import io.strimzi.api.kafka.model.bridge.KafkaBridgeProducerSpec;
+import io.strimzi.api.kafka.model.bridge.KafkaBridgeProducerSpecBuilder;
 import io.strimzi.api.kafka.model.bridge.KafkaBridgeResources;
 import io.strimzi.api.kafka.model.bridge.KafkaBridgeStatus;
 import io.strimzi.operator.cluster.KafkaVersionTestUtils;
@@ -64,6 +66,7 @@
 import static java.util.Arrays.asList;
 import static java.util.Collections.emptyList;
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.hasSize;
@@ -137,8 +140,7 @@ public void testCreateOrUpdateCreatesCluster(VertxTestContext context) {
         ArgumentCaptor<PodDisruptionBudget> pdbCaptor = ArgumentCaptor.forClass(PodDisruptionBudget.class);
         when(mockPdbOps.reconcile(any(), anyString(), any(), pdbCaptor.capture())).thenReturn(Future.succeededFuture());
 
-        ArgumentCaptor<ConfigMap> cmCaptor = ArgumentCaptor.forClass(ConfigMap.class);
-        when(mockCmOps.reconcile(any(), anyString(), any(), cmCaptor.capture())).thenReturn(Future.succeededFuture());
+        when(mockCmOps.reconcile(any(), anyString(), any(), any())).thenReturn(Future.succeededFuture());
 
         ArgumentCaptor<KafkaBridge> bridgeCaptor = ArgumentCaptor.forClass(KafkaBridge.class);
         when(mockBridgeOps.updateStatusAsync(any(), bridgeCaptor.capture())).thenReturn(Future.succeededFuture());
@@ -164,10 +166,6 @@ public void testCreateOrUpdateCreatesCluster(VertxTestContext context) {
             List<Deployment> capturedDc = dcCaptor.getAllValues();
             assertThat(capturedDc, hasSize(1));
             Deployment dc = capturedDc.get(0);
-            // capture the bridge ConfigMap to compute the configuration hash that is checked in the corresponding Deployment annotation
-            List<ConfigMap> captureCm = cmCaptor.getAllValues();
-            assertThat(captureCm, hasSize(1));
-            ConfigMap cm = captureCm.get(0);
             assertThat(dc.getMetadata().getName(), is(bridge.getComponentName()));
             assertThat(dc, is(bridge.generateDeployment(Map.of(
                     Annotations.ANNO_STRIMZI_AUTH_HASH, "0",
@@ -314,8 +312,7 @@ public void testCreateOrUpdateUpdatesCluster(VertxTestContext context) {
         ArgumentCaptor<PodDisruptionBudget> pdbCaptor = ArgumentCaptor.forClass(PodDisruptionBudget.class);
         when(mockPdbOps.reconcile(any(), anyString(), any(), pdbCaptor.capture())).thenReturn(Future.succeededFuture());
 
-        ArgumentCaptor<ConfigMap> cmCaptor = ArgumentCaptor.forClass(ConfigMap.class);
-        when(mockCmOps.reconcile(any(), anyString(), any(), cmCaptor.capture())).thenReturn(Future.succeededFuture());
+        when(mockCmOps.reconcile(any(), anyString(), any(), any())).thenReturn(Future.succeededFuture());
 
         KafkaBridgeAssemblyOperator ops = new KafkaBridgeAssemblyOperator(vertx,
                 new PlatformFeaturesAvailability(true, kubernetesVersion),
@@ -340,10 +337,6 @@ public void testCreateOrUpdateUpdatesCluster(VertxTestContext context) {
             List<Deployment> capturedDc = dcCaptor.getAllValues();
             assertThat(capturedDc, hasSize(1));
             Deployment dc = capturedDc.get(0);
-            // capture the bridge ConfigMap to compute the configuration hash that is checked in the corresponding Deployment annotation
-            List<ConfigMap> captureCm = cmCaptor.getAllValues();
-            assertThat(captureCm, hasSize(1));
-            ConfigMap cm = captureCm.get(0);
             assertThat(dc.getMetadata().getName(), is(compareTo.getComponentName()));
             assertThat(dc, is(compareTo.generateDeployment(Map.of(
                     Annotations.ANNO_STRIMZI_AUTH_HASH, "0",
@@ -736,4 +729,71 @@ public void testCreateOrUpdateBridgeZeroReplica(VertxTestContext context) {
             })));
     }
 
+    @Test
+    public void testUpdatedBridgeConfigurationHash(VertxTestContext context) {
+        ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false);
+        DeploymentOperator mockDcOps = supplier.deploymentOperations;
+        ServiceOperator mockServiceOps = supplier.serviceOperations;
+        ConfigMapOperator mockCmOps = supplier.configMapOperations;
+        PodDisruptionBudgetOperator mockPdbOps = supplier.podDisruptionBudgetOperator;
+
+        String kbName = "foo";
+        String kbNamespace = "test";
+
+        KafkaBridge originalBridge = ResourceUtils.createEmptyKafkaBridge(kbNamespace, kbName);
+        KafkaBridgeCluster originalBridgeCluster = KafkaBridgeCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, originalBridge, SHARED_ENV_PROVIDER);
+        KafkaBridge updatedBridge = new KafkaBridgeBuilder(originalBridge)
+                .editSpec()
+                    .withProducer(
+                            new KafkaBridgeProducerSpecBuilder()
+                                    .withConfig(Map.of("acks", "1"))
+                                    .build()
+                    )
+                    .withConsumer(
+                            new KafkaBridgeConsumerSpecBuilder()
+                                    .withConfig(Map.of("auto.offset.reset", "earliest"))
+                                    .build()
+                    )
+                .endSpec()
+                .build();
+
+        when(mockDcOps.scaleDown(any(), eq(kbNamespace), any(), anyInt(), anyLong())).thenReturn(Future.succeededFuture());
+        when(mockDcOps.scaleUp(any(), eq(kbNamespace), any(), anyInt(), anyLong())).thenReturn(Future.succeededFuture());
+        when(mockDcOps.reconcile(any(), eq(kbNamespace), any(), any())).thenReturn(Future.succeededFuture());
+        when(mockDcOps.waitForObserved(any(), anyString(), anyString(), anyLong(), anyLong())).thenReturn(Future.succeededFuture());
+        when(mockDcOps.readiness(any(), anyString(), anyString(), anyLong(), anyLong())).thenReturn(Future.succeededFuture());
+        when(mockServiceOps.reconcile(any(), eq(kbNamespace), any(), any())).thenReturn(Future.succeededFuture());
+        when(mockPdbOps.reconcile(any(), anyString(), any(), any())).thenReturn(Future.succeededFuture());
+
+        ArgumentCaptor<ConfigMap> cmCaptor = ArgumentCaptor.forClass(ConfigMap.class);
+        when(mockCmOps.reconcile(any(), anyString(), any(), cmCaptor.capture())).thenReturn(Future.succeededFuture());
+
+        KafkaBridgeAssemblyOperator ops = new KafkaBridgeAssemblyOperator(vertx,
+                new PlatformFeaturesAvailability(true, kubernetesVersion),
+                new MockCertManager(), new PasswordGenerator(10, "a", "a"),
+                supplier,
+                ResourceUtils.dummyClusterOperatorConfig(VERSIONS));
+
+        Reconciliation reconciliation = new Reconciliation("test-trigger", KafkaBridge.RESOURCE_KIND, kbNamespace, kbName);
+
+        Checkpoint async = context.checkpoint();
+        MetricsAndLoggingUtils.metricsAndLogging(reconciliation, mockCmOps, originalBridgeCluster.logging(), null)
+                .compose(metricsAndLogging -> {
+                    ConfigMap originalConfigMap = originalBridgeCluster.generateBridgeConfigMap(metricsAndLogging);
+                    String originalHash = Util.hashStub(originalConfigMap.getData().get(KafkaBridgeCluster.BRIDGE_CONFIGURATION_FILENAME));
+                    return ops.createOrUpdate(reconciliation, updatedBridge)
+                            .onComplete(context.succeeding(v -> context.verify(() -> {
+
+                                // capture the updated ConfigMap and check that its hash differs from the original one
+                                List<ConfigMap> captureCm = cmCaptor.getAllValues();
+                                assertThat(captureCm, hasSize(1));
+                                ConfigMap cm = captureCm.get(0);
+                                String newHash = Util.hashStub(cm.getData().get(KafkaBridgeCluster.BRIDGE_CONFIGURATION_FILENAME));
+                                assertThat(newHash, is(not(originalHash)));
+
+                                async.flag();
+                            })));
+                });
+    }
+
 }
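
Note for reviewers: taken together, the two patches mean the operator now renders the whole bridge configuration into the "application.properties" entry of the operator-managed ConfigMap and rolls the bridge pod when that content changes. A minimal sketch of how the new builder is driven, using only the constructor and withers exercised by the tests in this series (the cluster name and bootstrap address are the illustrative values from the tests):

    String bridgeConfiguration = new KafkaBridgeConfigurationBuilder(
            Reconciliation.DUMMY_RECONCILIATION, "my-bridge", "my-cluster-kafka-bootstrap:9092")
            // wire up the strimzienv/strimzifile/strimzidir config providers used to resolve mounted secrets
            .withConfigProviders()
            // the "http" section is mandatory in the KafkaBridge custom resource; defaults give host 0.0.0.0 and port 8080
            .withHttp(new KafkaBridgeHttpConfigBuilder().build(), null, null)
            .build();

This is the string that KafkaBridgeCluster.generateBridgeConfigMap() stores under KafkaBridgeCluster.BRIDGE_CONFIGURATION_FILENAME and that the reconciliation hashes with Util.hashStub() into the ANNO_STRIMZI_IO_CONFIGURATION_HASH annotation on the Deployment, so any configuration change triggers a rolling update of the bridge pod.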