Moved bridge configuration setup within the operator #11032

Merged · 6 commits · Jan 24, 2025

Changes from 1 commit
Enabling user to specify config providers
Temporary charts related changes to run STs

Signed-off-by: Paolo Patierno <ppatierno@live.com>
ppatierno committed Jan 24, 2025
commit 448d8d376a99fa084586d0ae5363760f4a6c7b45
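
To illustrate what this commit enables: a bridge client section can now declare its own Kafka config providers, and the operator merges them with the Strimzi-managed ones rather than ignoring them. A minimal sketch, mirroring the new producer test case further down (the io.strimzi.api import paths and the sketch class itself are assumptions, not operator code):

import java.util.Map;

import io.strimzi.api.kafka.model.bridge.KafkaBridgeProducerSpec;        // assumed import path
import io.strimzi.api.kafka.model.bridge.KafkaBridgeProducerSpecBuilder; // assumed import path

public class UserConfigProviderSketch {
    public static void main(String[] args) {
        // Producer section declaring an extra "env" config provider, as in the new unit test
        KafkaBridgeProducerSpec producer = new KafkaBridgeProducerSpecBuilder()
                .withConfig(Map.of(
                        "config.providers", "env",
                        "config.providers.env.class", "org.apache.kafka.common.config.provider.EnvVarConfigProvider"))
                .build();
        // The generated bridge configuration is then expected to contain
        //   kafka.producer.config.providers=env,strimzienv,strimzifile,strimzidir
        //   kafka.producer.config.providers.env.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider
        // alongside the strimzienv/strimzifile/strimzidir provider definitions.
        System.out.println(producer.getConfig());
    }
}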
@@ -563,7 +563,6 @@ public ConfigMap generateBridgeConfigMap(MetricsAndLogging metricsAndLogging) {
data.put(
BRIDGE_CONFIGURATION_FILENAME,
new KafkaBridgeConfigurationBuilder(reconciliation, cluster, bootstrapServers)
.withConfigProviders()
.withTracing(tracing)
.withTls(tls)
.withAuthentication(authentication)
@@ -36,11 +36,6 @@ public class KafkaBridgeConfigurationBuilder {
private static final String PASSWORD_VOLUME_MOUNT = "/opt/strimzi/bridge-password/";
// the SASL password file template includes: <volume_mount>/<secret_name>/<password_file>
private static final String PLACEHOLDER_SASL_PASSWORD_FILE_TEMPLATE_CONFIG_PROVIDER_DIR = "${strimzidir:%s%s:%s}";
private static final String PLACEHOLDER_OAUTH_CONFIG_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:KAFKA_BRIDGE_OAUTH_CONFIG}";
private static final String PLACEHOLDER_OAUTH_ACCESS_TOKEN_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:KAFKA_BRIDGE_OAUTH_ACCESS_TOKEN}";
private static final String PLACEHOLDER_OAUTH_REFRESH_TOKEN_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:KAFKA_BRIDGE_OAUTH_REFRESH_TOKEN}";
private static final String PLACEHOLDER_OAUTH_CLIENT_SECRET_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:KAFKA_BRIDGE_OAUTH_CLIENT_SECRET}";
private static final String PLACEHOLDER_OAUTH_PASSWORD_GRANT_PASSWORD_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:KAFKA_BRIDGE_OAUTH_PASSWORD_GRANT_PASSWORD}";

private final Reconciliation reconciliation;
private final StringWriter stringWriter = new StringWriter();
@@ -94,25 +89,6 @@ private void configureSecurityProtocol() {
writer.println("kafka.security.protocol=" + securityProtocol);
}

/**
* Configures the Kafka config providers used for loading some parameters from env vars and files
* (i.e. user and password for authentication)
*
* @return the builder instance
*/
public KafkaBridgeConfigurationBuilder withConfigProviders() {
printSectionHeader("Config providers");
writer.println("kafka.config.providers=strimzienv,strimzifile,strimzidir");
writer.println("kafka.config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider");
writer.println("kafka.config.providers.strimzienv.param.allowlist.pattern=.*");
writer.println("kafka.config.providers.strimzifile.class=org.apache.kafka.common.config.provider.FileConfigProvider");
writer.println("kafka.config.providers.strimzifile.param.allowed.paths=/opt/strimzi");
writer.println("kafka.config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider");
writer.println("kafka.config.providers.strimzidir.param.allowed.paths=/opt/strimzi");
writer.println();
return this;
}

/**
* Adds the tracing type
*
@@ -185,22 +161,22 @@ public KafkaBridgeConfigurationBuilder withAuthentication(KafkaClientAuthenticat
jaasConfig.append("org.apache.kafka.common.security.scram.ScramLoginModule required username=" + PLACEHOLDER_SASL_USERNAME_CONFIG_PROVIDER_ENV_VAR + " password=" + passwordFilePath + ";");
} else if (authentication instanceof KafkaClientAuthenticationOAuth oauth) {
saslMechanism = "OAUTHBEARER";
jaasConfig.append("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " + PLACEHOLDER_OAUTH_CONFIG_CONFIG_PROVIDER_ENV_VAR);
jaasConfig.append("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ${strimzienv:KAFKA_BRIDGE_OAUTH_CONFIG}");

if (oauth.getClientSecret() != null) {
jaasConfig.append(" oauth.client.secret=" + PLACEHOLDER_OAUTH_CLIENT_SECRET_CONFIG_PROVIDER_ENV_VAR);
jaasConfig.append(" oauth.client.secret=${strimzienv:KAFKA_BRIDGE_OAUTH_CLIENT_SECRET}");
}

if (oauth.getRefreshToken() != null) {
jaasConfig.append(" oauth.refresh.token=" + PLACEHOLDER_OAUTH_REFRESH_TOKEN_CONFIG_PROVIDER_ENV_VAR);
jaasConfig.append(" oauth.refresh.token=${strimzienv:KAFKA_BRIDGE_OAUTH_REFRESH_TOKEN}");
}

if (oauth.getAccessToken() != null) {
jaasConfig.append(" oauth.access.token=" + PLACEHOLDER_OAUTH_ACCESS_TOKEN_CONFIG_PROVIDER_ENV_VAR);
jaasConfig.append(" oauth.access.token=${strimzienv:KAFKA_BRIDGE_OAUTH_ACCESS_TOKEN}");
}

if (oauth.getPasswordSecret() != null) {
jaasConfig.append(" oauth.password.grant.password=" + PLACEHOLDER_OAUTH_PASSWORD_GRANT_PASSWORD_CONFIG_PROVIDER_ENV_VAR);
jaasConfig.append(" oauth.password.grant.password=${strimzienv:KAFKA_BRIDGE_OAUTH_PASSWORD_GRANT_PASSWORD}");
}

if (oauth.getTlsTrustedCertificates() != null && !oauth.getTlsTrustedCertificates().isEmpty()) {
@@ -218,19 +194,50 @@ public KafkaBridgeConfigurationBuilder withAuthentication(KafkaClientAuthenticat
return this;
}

/**
* Configures the Kafka configuration providers
*
* @param userConfig the user configuration, for a specific bridge Kafka client (admin, producer or consumer)
* to extract the possible user-provided config provider configuration from it
* @param prefix prefix for the bridge Kafka client to be configured. It could be "kafka.admin", "kafka.producer" or "kafka.consumer".
*/
private void configProvider(AbstractConfiguration userConfig, String prefix) {
printSectionHeader("Config providers");
String strimziConfigProviders = "strimzienv,strimzifile,strimzidir";
// configure user provided config providers together with the Strimzi ones ...
if (userConfig != null
&& !userConfig.getConfiguration().isEmpty()
&& userConfig.getConfigOption("config.providers") != null) {
writer.println(prefix + ".config.providers=" + userConfig.getConfigOption("config.providers") + "," + strimziConfigProviders);
userConfig.removeConfigOption("config.providers");
// ... or configure only the Strimzi config providers
} else {
writer.println(prefix + ".config.providers=" + strimziConfigProviders);
}
writer.println(prefix + ".config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider");
writer.println(prefix + ".config.providers.strimzienv.param.allowlist.pattern=.*");
writer.println(prefix + ".config.providers.strimzifile.class=org.apache.kafka.common.config.provider.FileConfigProvider");
writer.println(prefix + ".config.providers.strimzifile.param.allowed.paths=/opt/strimzi");
writer.println(prefix + ".config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider");
writer.println(prefix + ".config.providers.strimzidir.param.allowed.paths=/opt/strimzi");
}

/**
* Adds the bridge Kafka admin client specific configuration
*
* @param kafkaBridgeAdminClient the Kafka admin client configuration
* @return the builder instance
*/
public KafkaBridgeConfigurationBuilder withKafkaAdminClient(KafkaBridgeAdminClientSpec kafkaBridgeAdminClient) {
if (kafkaBridgeAdminClient != null) {
KafkaBridgeAdminClientConfiguration config = new KafkaBridgeAdminClientConfiguration(reconciliation, kafkaBridgeAdminClient.getConfig().entrySet());
printSectionHeader("Apache Kafka AdminClient");
printSectionHeader("Apache Kafka AdminClient");
KafkaBridgeAdminClientConfiguration config = kafkaBridgeAdminClient != null ?
new KafkaBridgeAdminClientConfiguration(reconciliation, kafkaBridgeAdminClient.getConfig().entrySet()) :
null;
configProvider(config, "kafka.admin");
if (config != null) {
config.asOrderedProperties().asMap().forEach((key, value) -> writer.println("kafka.admin." + key + "=" + value));
writer.println();
}
writer.println();
return this;
}

@@ -241,12 +248,15 @@ public KafkaBridgeConfigurationBuilder withKafkaAdminClient(KafkaBridgeAdminClie
* @return the builder instance
*/
public KafkaBridgeConfigurationBuilder withKafkaProducer(KafkaBridgeProducerSpec kafkaBridgeProducer) {
if (kafkaBridgeProducer != null) {
KafkaBridgeProducerConfiguration config = new KafkaBridgeProducerConfiguration(reconciliation, kafkaBridgeProducer.getConfig().entrySet());
printSectionHeader("Apache Kafka Producer");
printSectionHeader("Apache Kafka Producer");
KafkaBridgeProducerConfiguration config = kafkaBridgeProducer != null ?
new KafkaBridgeProducerConfiguration(reconciliation, kafkaBridgeProducer.getConfig().entrySet()) :
null;
configProvider(config, "kafka.producer");
if (config != null) {
config.asOrderedProperties().asMap().forEach((key, value) -> writer.println("kafka.producer." + key + "=" + value));
writer.println();
}
writer.println();
return this;
}

@@ -257,13 +267,16 @@ public KafkaBridgeConfigurationBuilder withKafkaProducer(KafkaBridgeProducerSpec
* @return the builder instance
*/
public KafkaBridgeConfigurationBuilder withKafkaConsumer(KafkaBridgeConsumerSpec kafkaBridgeConsumer) {
if (kafkaBridgeConsumer != null) {
KafkaBridgeConsumerConfiguration config = new KafkaBridgeConsumerConfiguration(reconciliation, kafkaBridgeConsumer.getConfig().entrySet());
printSectionHeader("Apache Kafka Consumer");
printSectionHeader("Apache Kafka Consumer");
KafkaBridgeConsumerConfiguration config = kafkaBridgeConsumer != null ?
new KafkaBridgeConsumerConfiguration(reconciliation, kafkaBridgeConsumer.getConfig().entrySet()) :
null;
configProvider(config, "kafka.consumer");
if (config != null) {
config.asOrderedProperties().asMap().forEach((key, value) -> writer.println("kafka.consumer." + key + "=" + value));
writer.println("kafka.consumer.client.rack=${strimzidir:/opt/strimzi/init:rack.id}");
writer.println();
}
writer.println();
return this;
}
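
As an illustration of the merge rule implemented by the new configProvider(...) helper above: the sketch below is standalone plain Java with hypothetical names, not operator code. It prepends any user-declared providers to the Strimzi-managed list and removes the user's config.providers key so it is not emitted a second time when the remaining user options are written out with the client prefix.

import java.util.LinkedHashMap;
import java.util.Map;

public class ConfigProviderMergeSketch {
    private static final String STRIMZI_PROVIDERS = "strimzienv,strimzifile,strimzidir";

    // Builds the per-client provider lines for the given prefix (e.g. "kafka.producer"),
    // merging user-declared providers (if any) ahead of the Strimzi-managed ones.
    static Map<String, String> providerLines(Map<String, String> userConfig, String prefix) {
        Map<String, String> lines = new LinkedHashMap<>();
        String userProviders = userConfig.remove("config.providers");
        lines.put(prefix + ".config.providers",
                userProviders != null ? userProviders + "," + STRIMZI_PROVIDERS : STRIMZI_PROVIDERS);
        lines.put(prefix + ".config.providers.strimzienv.class", "org.apache.kafka.common.config.provider.EnvVarConfigProvider");
        lines.put(prefix + ".config.providers.strimzienv.param.allowlist.pattern", ".*");
        lines.put(prefix + ".config.providers.strimzifile.class", "org.apache.kafka.common.config.provider.FileConfigProvider");
        lines.put(prefix + ".config.providers.strimzifile.param.allowed.paths", "/opt/strimzi");
        lines.put(prefix + ".config.providers.strimzidir.class", "org.apache.kafka.common.config.provider.DirectoryConfigProvider");
        lines.put(prefix + ".config.providers.strimzidir.param.allowed.paths", "/opt/strimzi");
        return lines;
    }

    public static void main(String[] args) {
        // User config declaring an additional "env" provider, as in the new test cases
        Map<String, String> user = new LinkedHashMap<>(Map.of(
                "config.providers", "env",
                "config.providers.env.class", "org.apache.kafka.common.config.provider.EnvVarConfigProvider"));
        // Prints kafka.producer.config.providers=env,strimzienv,strimzifile,strimzidir first,
        // then the Strimzi provider definitions
        providerLines(user, "kafka.producer").forEach((k, v) -> System.out.println(k + "=" + v));
        // The remaining user options (here the env provider class) keep the same prefix
        user.forEach((k, v) -> System.out.println("kafka.producer." + k + "=" + v));
    }
}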

@@ -57,19 +57,35 @@ public void testBaseConfiguration() {
public void testConfigProviders() {
// test config providers setting
String configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS)
.withConfigProviders()
.withKafkaAdminClient(null)
.withKafkaProducer(null)
.withKafkaConsumer(null)
.build();
assertThat(configuration, isEquivalent(
"bridge.id=my-bridge",
"kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092",
"kafka.security.protocol=PLAINTEXT",
"kafka.config.providers=strimzienv,strimzifile,strimzidir",
"kafka.config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider",
"kafka.config.providers.strimzienv.param.allowlist.pattern=.*",
"kafka.config.providers.strimzifile.class=org.apache.kafka.common.config.provider.FileConfigProvider",
"kafka.config.providers.strimzifile.param.allowed.paths=/opt/strimzi",
"kafka.config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider",
"kafka.config.providers.strimzidir.param.allowed.paths=/opt/strimzi"
"kafka.admin.config.providers=strimzienv,strimzifile,strimzidir",
"kafka.admin.config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider",
"kafka.admin.config.providers.strimzienv.param.allowlist.pattern=.*",
"kafka.admin.config.providers.strimzifile.class=org.apache.kafka.common.config.provider.FileConfigProvider",
"kafka.admin.config.providers.strimzifile.param.allowed.paths=/opt/strimzi",
"kafka.admin.config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider",
"kafka.admin.config.providers.strimzidir.param.allowed.paths=/opt/strimzi",
"kafka.producer.config.providers=strimzienv,strimzifile,strimzidir",
"kafka.producer.config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider",
"kafka.producer.config.providers.strimzienv.param.allowlist.pattern=.*",
"kafka.producer.config.providers.strimzifile.class=org.apache.kafka.common.config.provider.FileConfigProvider",
"kafka.producer.config.providers.strimzifile.param.allowed.paths=/opt/strimzi",
"kafka.producer.config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider",
"kafka.producer.config.providers.strimzidir.param.allowed.paths=/opt/strimzi",
"kafka.consumer.config.providers=strimzienv,strimzifile,strimzidir",
"kafka.consumer.config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider",
"kafka.consumer.config.providers.strimzienv.param.allowlist.pattern=.*",
"kafka.consumer.config.providers.strimzifile.class=org.apache.kafka.common.config.provider.FileConfigProvider",
"kafka.consumer.config.providers.strimzifile.param.allowed.paths=/opt/strimzi",
"kafka.consumer.config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider",
"kafka.consumer.config.providers.strimzidir.param.allowed.paths=/opt/strimzi"
));
}

@@ -288,7 +304,47 @@ public void testKafkaProducer() {
"kafka.producer.acks=1",
"kafka.producer.linger.ms=100",
"kafka.producer.key.serializer=my-producer-key-serializer",
"kafka.producer.value.serializer=my-producer-value-serializer"
"kafka.producer.value.serializer=my-producer-value-serializer",
"kafka.producer.config.providers=strimzienv,strimzifile,strimzidir",
"kafka.producer.config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider",
"kafka.producer.config.providers.strimzienv.param.allowlist.pattern=.*",
"kafka.producer.config.providers.strimzifile.class=org.apache.kafka.common.config.provider.FileConfigProvider",
"kafka.producer.config.providers.strimzifile.param.allowed.paths=/opt/strimzi",
"kafka.producer.config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider",
"kafka.producer.config.providers.strimzidir.param.allowed.paths=/opt/strimzi"
));

// Kafka Producer with config providers
kafkaBridgeProducer = new KafkaBridgeProducerSpecBuilder()
.withConfig(
Map.of(
"acks", 1,
"linger.ms", 100,
"key.serializer", "my-producer-key-serializer",
"value.serializer", "my-producer-value-serializer",
"config.providers", "env",
"config.providers.env.class", "org.apache.kafka.common.config.provider.EnvVarConfigProvider"
))
.build();
configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS)
.withKafkaProducer(kafkaBridgeProducer)
.build();
assertThat(configuration, isEquivalent(
"bridge.id=my-bridge",
"kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092",
"kafka.security.protocol=PLAINTEXT",
"kafka.producer.acks=1",
"kafka.producer.linger.ms=100",
"kafka.producer.key.serializer=my-producer-key-serializer",
"kafka.producer.value.serializer=my-producer-value-serializer",
"kafka.producer.config.providers=env,strimzienv,strimzifile,strimzidir",
"kafka.producer.config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider",
"kafka.producer.config.providers.strimzienv.param.allowlist.pattern=.*",
"kafka.producer.config.providers.strimzifile.class=org.apache.kafka.common.config.provider.FileConfigProvider",
"kafka.producer.config.providers.strimzifile.param.allowed.paths=/opt/strimzi",
"kafka.producer.config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider",
"kafka.producer.config.providers.strimzidir.param.allowed.paths=/opt/strimzi",
"kafka.producer.config.providers.env.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider"
));
}

@@ -318,7 +374,46 @@ public void testKafkaConsumer() {
"kafka.consumer.auto.offset.reset=earliest",
"kafka.consumer.key.deserializer=my-consumer-key-deserializer",
"kafka.consumer.value.deserializer=my-consumer-value-deserializer",
"kafka.consumer.client.rack=${strimzidir:/opt/strimzi/init:rack.id}"
"kafka.consumer.client.rack=${strimzidir:/opt/strimzi/init:rack.id}",
"kafka.consumer.config.providers=strimzienv,strimzifile,strimzidir",
"kafka.consumer.config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider",
"kafka.consumer.config.providers.strimzienv.param.allowlist.pattern=.*",
"kafka.consumer.config.providers.strimzifile.class=org.apache.kafka.common.config.provider.FileConfigProvider",
"kafka.consumer.config.providers.strimzifile.param.allowed.paths=/opt/strimzi",
"kafka.consumer.config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider",
"kafka.consumer.config.providers.strimzidir.param.allowed.paths=/opt/strimzi"
));

// Kafka Consumer with config providers
kafkaBridgeConsumer = new KafkaBridgeConsumerSpecBuilder()
.withConfig(
Map.of(
"auto.offset.reset", "earliest",
"key.deserializer", "my-consumer-key-deserializer",
"value.deserializer", "my-consumer-value-deserializer",
"config.providers", "env",
"config.providers.env.class", "org.apache.kafka.common.config.provider.EnvVarConfigProvider"
))
.build();
configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS)
.withKafkaConsumer(kafkaBridgeConsumer)
.build();
assertThat(configuration, isEquivalent(
"bridge.id=my-bridge",
"kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092",
"kafka.security.protocol=PLAINTEXT",
"kafka.consumer.auto.offset.reset=earliest",
"kafka.consumer.key.deserializer=my-consumer-key-deserializer",
"kafka.consumer.value.deserializer=my-consumer-value-deserializer",
"kafka.consumer.client.rack=${strimzidir:/opt/strimzi/init:rack.id}",
"kafka.consumer.config.providers=env,strimzienv,strimzifile,strimzidir",
"kafka.consumer.config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider",
"kafka.consumer.config.providers.strimzienv.param.allowlist.pattern=.*",
"kafka.consumer.config.providers.strimzifile.class=org.apache.kafka.common.config.provider.FileConfigProvider",
"kafka.consumer.config.providers.strimzifile.param.allowed.paths=/opt/strimzi",
"kafka.consumer.config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider",
"kafka.consumer.config.providers.strimzidir.param.allowed.paths=/opt/strimzi",
"kafka.consumer.config.providers.env.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider"
));
}

@@ -345,7 +440,43 @@ public void testKafkaAdminClient() {
"kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092",
"kafka.security.protocol=PLAINTEXT",
"kafka.admin.client.id=my-admin-client",
"kafka.admin.bootstrap.controllers=my-bootstrap-controllers"
"kafka.admin.bootstrap.controllers=my-bootstrap-controllers",
"kafka.admin.config.providers=strimzienv,strimzifile,strimzidir",
"kafka.admin.config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider",
"kafka.admin.config.providers.strimzienv.param.allowlist.pattern=.*",
"kafka.admin.config.providers.strimzifile.class=org.apache.kafka.common.config.provider.FileConfigProvider",
"kafka.admin.config.providers.strimzifile.param.allowed.paths=/opt/strimzi",
"kafka.admin.config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider",
"kafka.admin.config.providers.strimzidir.param.allowed.paths=/opt/strimzi"
));

// Kafka Admin with config providers
kafkaBridgeAdminClient = new KafkaBridgeAdminClientSpecBuilder()
.withConfig(
Map.of(
"client.id", "my-admin-client",
"bootstrap.controllers", "my-bootstrap-controllers",
"config.providers", "env",
"config.providers.env.class", "org.apache.kafka.common.config.provider.EnvVarConfigProvider"
))
.build();
configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS)
.withKafkaAdminClient(kafkaBridgeAdminClient)
.build();
assertThat(configuration, isEquivalent(
"bridge.id=my-bridge",
"kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092",
"kafka.security.protocol=PLAINTEXT",
"kafka.admin.client.id=my-admin-client",
"kafka.admin.bootstrap.controllers=my-bootstrap-controllers",
"kafka.admin.config.providers=env,strimzienv,strimzifile,strimzidir",
"kafka.admin.config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider",
"kafka.admin.config.providers.strimzienv.param.allowlist.pattern=.*",
"kafka.admin.config.providers.strimzifile.class=org.apache.kafka.common.config.provider.FileConfigProvider",
"kafka.admin.config.providers.strimzifile.param.allowed.paths=/opt/strimzi",
"kafka.admin.config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider",
"kafka.admin.config.providers.strimzidir.param.allowed.paths=/opt/strimzi",
"kafka.admin.config.providers.env.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider"
));
}

@@ -169,7 +169,7 @@ public void testCreateOrUpdateCreatesCluster(VertxTestContext context) {
assertThat(dc.getMetadata().getName(), is(bridge.getComponentName()));
assertThat(dc, is(bridge.generateDeployment(Map.of(
Annotations.ANNO_STRIMZI_AUTH_HASH, "0",
KafkaBridgeCluster.ANNO_STRIMZI_IO_CONFIGURATION_HASH, "0ff4f460"
KafkaBridgeCluster.ANNO_STRIMZI_IO_CONFIGURATION_HASH, "fe8e7089"
), true, null, null)));

// Verify PodDisruptionBudget
@@ -340,7 +340,7 @@ public void testCreateOrUpdateUpdatesCluster(VertxTestContext context) {
assertThat(dc.getMetadata().getName(), is(compareTo.getComponentName()));
assertThat(dc, is(compareTo.generateDeployment(Map.of(
Annotations.ANNO_STRIMZI_AUTH_HASH, "0",
KafkaBridgeCluster.ANNO_STRIMZI_IO_CONFIGURATION_HASH, "0ff4f460"
KafkaBridgeCluster.ANNO_STRIMZI_IO_CONFIGURATION_HASH, "fe8e7089"
), true, null, null)));

// Verify PodDisruptionBudget
@@ -13,10 +13,10 @@ defaultImageRepository: strimzi
defaultImageTag: latest

image:
registry: ""
repository: ""
registry: "quay.io"
repository: "ppatierno"
name: operator
tag: ""
tag: "bridge-config"
# imagePullSecrets:
# - name: secretname
logVolume: co-config-volume
@@ -62,11 +62,13 @@ private void clusterOperator(long operationTimeout, long reconciliationInterval,

// image repository config
values.put("defaultImageRepository", Environment.getIfNotEmptyOrDefault(Environment.STRIMZI_ORG, Environment.STRIMZI_ORG_DEFAULT));
values.put("kafkaBridge.image.repository", Environment.STRIMZI_ORG_DEFAULT);
//values.put("kafkaBridge.image.repository", Environment.STRIMZI_ORG_DEFAULT);
values.put("kafkaBridge.image.repository", "ppatierno");

// image tags config
values.put("defaultImageTag", Environment.getIfNotEmptyOrDefault(Environment.STRIMZI_TAG, Environment.STRIMZI_TAG_DEFAULT));
values.put("kafkaBridge.image.tag", BridgeUtils.getBridgeVersion());
//values.put("kafkaBridge.image.tag", BridgeUtils.getBridgeVersion());
values.put("kafkaBridge.image.tag", "bridge-config");

// Additional config
values.put("image.imagePullPolicy", Environment.OPERATOR_IMAGE_PULL_POLICY);