Moved bridge configuration setup within the operator
Signed-off-by: Paolo Patierno <[email protected]>

Reverted the original operator Deployment
Set different bridge image for STs

Signed-off-by: Paolo Patierno <[email protected]>

Fixed missing env vars to set up the truststore

Signed-off-by: Paolo Patierno <[email protected]>

Removed ST that used env vars no longer in place

Signed-off-by: Paolo Patierno <[email protected]>

Fixed checkstyle errors

Signed-off-by: Paolo Patierno <[email protected]>

CHANGELOG update

Signed-off-by: Paolo Patierno <[email protected]>

Updated the system tests bridge MD

Signed-off-by: Paolo Patierno <[email protected]>

Addressed scholzj's comments

Signed-off-by: Paolo Patierno <[email protected]>

Factored out a dedicated withConfigProviders method for the bridge
configuration builder
Refactored bridge configuration builder to use isEquivalent

Signed-off-by: Paolo Patierno <[email protected]>

Addressed tinaselenge's comments

Signed-off-by: Paolo Patierno <[email protected]>

Fixed checkstyle issues

Signed-off-by: Paolo Patierno <[email protected]>

Fixed rolling bridge pod on configuration change

Signed-off-by: Paolo Patierno <[email protected]>

Addressed scholzj's and tinaselenge's feedback

Signed-off-by: Paolo Patierno <[email protected]>

Fixed KafkaBridgeAssemblyOperator tests that were missing the new hash annotation

Signed-off-by: Paolo Patierno <[email protected]>

Restored some previously deleted tests

Signed-off-by: Paolo Patierno <[email protected]>

Adapted system tests to changes in configuration

Signed-off-by: Paolo Patierno <[email protected]>

Addressed scholzj's and fvaleri's feedback

Signed-off-by: Paolo Patierno <[email protected]>
ppatierno committed Jan 21, 2025
1 parent c1b20f7 commit de427e5
Showing 12 changed files with 1,018 additions and 228 deletions.
2 changes: 1 addition & 1 deletion .azure/templates/steps/system_test_general.yaml
@@ -145,7 +145,7 @@ jobs:
env:
DOCKER_TAG: $(docker_tag)
BRIDGE_IMAGE: "latest-released"
BRIDGE_IMAGE: "quay.io/ppatierno/kafka-bridge:bridge-config"
STRIMZI_RBAC_SCOPE: '${{ parameters.strimzi_rbac_scope }}'
DOCKER_REGISTRY: registry.minikube
CLUSTER_OPERATOR_INSTALL_TYPE: '${{ parameters.cluster_operator_install_type }}'
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@
* Support for storage class overrides has been removed
* Added support to configure `dnsPolicy` and `dnsConfig` using the `template` sections.
* Store Kafka node certificates in separate Secrets, one Secret per pod.
* Moved HTTP bridge configuration to the ConfigMap set up by the operator.

### Major changes, deprecations and removals

@@ -21,6 +22,8 @@
* The storage overrides for configuring per-broker storage class are not supported anymore.
If you are using the storage overrides, you should instead use multiple KafkaNodePool resources with a different storage class each.
For more details about migrating from storage overrides, please follow the [documentation](https://strimzi.io/docs/operators/0.45.0/full/deploying.html#con-config-storage-zookeeper-str).
* Because of the corresponding configuration changes, 0.32.0 is the minimum HTTP bridge release that works with Strimzi operator 0.46.0.
* If you have an Apache Kafka cluster running together with an HTTP bridge instance, please upgrade the bridge after upgrading the operator.

## 0.45.0

KafkaBridgeCluster.java
@@ -48,6 +48,7 @@
import io.strimzi.operator.cluster.model.logging.SupportsLogging;
import io.strimzi.operator.cluster.model.securityprofiles.ContainerSecurityProviderContextImpl;
import io.strimzi.operator.cluster.model.securityprofiles.PodSecurityProviderContextImpl;
import io.strimzi.operator.common.Annotations;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.Util;
import io.strimzi.operator.common.model.InvalidResourceException;
@@ -86,29 +87,23 @@ public class KafkaBridgeCluster extends AbstractModel implements SupportsLogging
// Kafka Bridge configuration keys (EnvVariables)
protected static final String ENV_VAR_PREFIX = "KAFKA_BRIDGE_";
protected static final String ENV_VAR_KAFKA_BRIDGE_METRICS_ENABLED = "KAFKA_BRIDGE_METRICS_ENABLED";
protected static final String ENV_VAR_KAFKA_BRIDGE_BOOTSTRAP_SERVERS = "KAFKA_BRIDGE_BOOTSTRAP_SERVERS";
protected static final String ENV_VAR_KAFKA_BRIDGE_TLS = "KAFKA_BRIDGE_TLS";
protected static final String ENV_VAR_KAFKA_BRIDGE_TRUSTED_CERTS = "KAFKA_BRIDGE_TRUSTED_CERTS";
protected static final String OAUTH_TLS_CERTS_BASE_VOLUME_MOUNT = "/opt/strimzi/oauth-certs/";
protected static final String ENV_VAR_STRIMZI_TRACING = "STRIMZI_TRACING";

protected static final String ENV_VAR_KAFKA_BRIDGE_ADMIN_CLIENT_CONFIG = "KAFKA_BRIDGE_ADMIN_CLIENT_CONFIG";
protected static final String ENV_VAR_KAFKA_BRIDGE_PRODUCER_CONFIG = "KAFKA_BRIDGE_PRODUCER_CONFIG";
protected static final String ENV_VAR_KAFKA_BRIDGE_CONSUMER_CONFIG = "KAFKA_BRIDGE_CONSUMER_CONFIG";
protected static final String ENV_VAR_KAFKA_BRIDGE_ID = "KAFKA_BRIDGE_ID";

protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_HOST = "KAFKA_BRIDGE_HTTP_HOST";
protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_PORT = "KAFKA_BRIDGE_HTTP_PORT";
protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT = "KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT";
protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_ENABLED = "KAFKA_BRIDGE_HTTP_CONSUMER_ENABLED";
protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_PRODUCER_ENABLED = "KAFKA_BRIDGE_HTTP_PRODUCER_ENABLED";
protected static final String ENV_VAR_KAFKA_BRIDGE_CORS_ENABLED = "KAFKA_BRIDGE_CORS_ENABLED";
protected static final String ENV_VAR_KAFKA_BRIDGE_CORS_ALLOWED_ORIGINS = "KAFKA_BRIDGE_CORS_ALLOWED_ORIGINS";
protected static final String ENV_VAR_KAFKA_BRIDGE_CORS_ALLOWED_METHODS = "KAFKA_BRIDGE_CORS_ALLOWED_METHODS";

protected static final String CO_ENV_VAR_CUSTOM_BRIDGE_POD_LABELS = "STRIMZI_CUSTOM_KAFKA_BRIDGE_LABELS";
protected static final String INIT_VOLUME_MOUNT = "/opt/strimzi/init";

/**
* Key under which the bridge configuration is stored in the ConfigMap
*/
public static final String BRIDGE_CONFIGURATION_FILENAME = "application.properties";

/**
* Annotation for rolling the bridge whenever the configuration within the application.properties file changes.
* When a change in the configuration hash annotation is detected, we force a pod restart.
*/
public static final String ANNO_STRIMZI_IO_CONFIGURATION_HASH = Annotations.STRIMZI_DOMAIN + "configuration-hash";

private int replicas;
private ClientTls tls;
private KafkaClientAuthentication authentication;
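The new ANNO_STRIMZI_IO_CONFIGURATION_HASH constant underpins the "rolling bridge pod on configuration change" fix from the commit message: the operator hashes the rendered application.properties and exposes the result as a pod annotation, so any configuration change yields a new annotation value and therefore a pod restart. The snippet below is a minimal, self-contained sketch of that idea only; the class and helper names are illustrative and this is not the actual Strimzi implementation.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

// Sketch only: derive a short, stable hash from the rendered bridge configuration so that
// a changed configuration produces a different pod annotation value.
public class BridgeConfigHashSketch {

    // Hypothetical helper; the real operator computes its configuration hash elsewhere.
    static String configurationHash(String applicationProperties) throws NoSuchAlgorithmException {
        byte[] digest = MessageDigest.getInstance("SHA-256")
                .digest(applicationProperties.getBytes(StandardCharsets.UTF_8));
        // A short hex stub is enough to detect changes between reconciliations.
        return HexFormat.of().formatHex(digest, 0, 4);
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        String renderedConfig = "kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092\nhttp.port=8080\n";
        // The operator would store this value under the strimzi.io/configuration-hash
        // annotation on the bridge pod; a new value forces the pod to roll.
        System.out.println("strimzi.io/configuration-hash: " + configurationHash(renderedConfig));
    }
}
```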
@@ -411,59 +406,12 @@ protected List<EnvVar> getEnvVars() {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_GC_LOG_ENABLED, String.valueOf(gcLoggingEnabled)));
JvmOptionUtils.javaOptions(varList, jvmOptions);

varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_BOOTSTRAP_SERVERS, bootstrapServers));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_ADMIN_CLIENT_CONFIG, kafkaBridgeAdminClient == null ? "" : new KafkaBridgeAdminClientConfiguration(reconciliation, kafkaBridgeAdminClient.getConfig().entrySet()).getConfiguration()));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_ID, cluster));

if (kafkaBridgeConsumer != null) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CONSUMER_CONFIG, new KafkaBridgeConsumerConfiguration(reconciliation, kafkaBridgeConsumer.getConfig().entrySet()).getConfiguration()));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_ENABLED, String.valueOf(kafkaBridgeConsumer.isEnabled())));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT, String.valueOf(kafkaBridgeConsumer.getTimeoutSeconds())));
} else {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CONSUMER_CONFIG, ""));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_ENABLED, "true"));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT, String.valueOf(KafkaBridgeConsumerSpec.HTTP_DEFAULT_TIMEOUT)));
}

if (kafkaBridgeProducer != null) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_PRODUCER_CONFIG, new KafkaBridgeProducerConfiguration(reconciliation, kafkaBridgeProducer.getConfig().entrySet()).getConfiguration()));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_PRODUCER_ENABLED, String.valueOf(kafkaBridgeProducer.isEnabled())));
} else {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_PRODUCER_CONFIG, ""));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_PRODUCER_ENABLED, "true"));
}

varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_HOST, KafkaBridgeHttpConfig.HTTP_DEFAULT_HOST));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_PORT, String.valueOf(http != null ? http.getPort() : KafkaBridgeHttpConfig.HTTP_DEFAULT_PORT)));

if (http != null && http.getCors() != null) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CORS_ENABLED, "true"));

if (http.getCors().getAllowedOrigins() != null) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CORS_ALLOWED_ORIGINS, String.join(",", http.getCors().getAllowedOrigins())));
}

if (http.getCors().getAllowedMethods() != null) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CORS_ALLOWED_METHODS, String.join(",", http.getCors().getAllowedMethods())));
}
} else {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CORS_ENABLED, "false"));
}

if (tls != null) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_TLS, "true"));

if (tls.getTrustedCertificates() != null && !tls.getTrustedCertificates().isEmpty()) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_TRUSTED_CERTS, CertUtils.trustedCertsEnvVar(tls.getTrustedCertificates())));
}
if (tls != null && tls.getTrustedCertificates() != null && !tls.getTrustedCertificates().isEmpty()) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_TRUSTED_CERTS, CertUtils.trustedCertsEnvVar(tls.getTrustedCertificates())));
}

AuthenticationUtils.configureClientAuthenticationEnvVars(authentication, varList, name -> ENV_VAR_PREFIX + name);

if (tracing != null) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_TRACING, tracing.getType()));
}

// Add shared environment variables used for all containers
varList.addAll(sharedEnvironmentProvider.variables());

@@ -600,21 +548,39 @@ protected List<EnvVar> getInitContainerEnvVars() {
}

/**
* Generates a metrics and logging ConfigMap according to the configuration. If this operand doesn't support logging
* or metrics, they will nto be set.
* Generates a ConfigMap containing the bridge configuration related to HTTP and Kafka clients.
* It also generates the metrics and logging configuration. If this operand doesn't support logging
* or metrics, they will not be set.
*
* @param metricsAndLogging The external CMs with logging and metrics configuration
*
* @return The generated ConfigMap
*/
public ConfigMap generateMetricsAndLogConfigMap(MetricsAndLogging metricsAndLogging) {
public ConfigMap generateBridgeConfigMap(MetricsAndLogging metricsAndLogging) {
// generate the ConfigMap data entries for the metrics and logging configuration
Map<String, String> data = ConfigMapUtils.generateMetricsAndLogConfigMapData(reconciliation, this, metricsAndLogging);
// add the ConfigMap data entry for the bridge HTTP and Kafka clients related configuration
data.put(
BRIDGE_CONFIGURATION_FILENAME,
new KafkaBridgeConfigurationBuilder(reconciliation, cluster, bootstrapServers)
.withConfigProviders()
.withTracing(tracing)
.withTls(tls)
.withAuthentication(authentication)
.withKafkaAdminClient(kafkaBridgeAdminClient)
.withKafkaProducer(kafkaBridgeProducer)
.withKafkaConsumer(kafkaBridgeConsumer)
.withHttp(http, kafkaBridgeProducer, kafkaBridgeConsumer)
.build()
);

return ConfigMapUtils
.createConfigMap(
KafkaBridgeResources.metricsAndLogConfigMapName(cluster),
namespace,
labels,
ownerReference,
ConfigMapUtils.generateMetricsAndLogConfigMapData(reconciliation, this, metricsAndLogging)
data
);
}
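To make the change concrete, here is an assumption-laden illustration of the kind of application.properties entry that generateBridgeConfigMap could place in the ConfigMap for a minimal KafkaBridge resource. The exact keys and defaults are owned by KafkaBridgeConfigurationBuilder and the bridge itself, and may differ from this example.

```java
// Illustrative only: the property names below are assumptions, not copied from the Strimzi sources.
public class BridgeApplicationPropertiesExample {
    public static void main(String[] args) {
        // Rough idea of the "application.properties" ConfigMap entry for a bridge named "my-bridge"
        // connecting to a cluster whose bootstrap service is "my-cluster-kafka-bootstrap:9092".
        String exampleApplicationProperties = """
                bridge.id=my-bridge
                kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092
                http.host=0.0.0.0
                http.port=8080
                http.consumer.enabled=true
                http.producer.enabled=true
                """;
        System.out.println(exampleApplicationProperties);
    }
}
```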

