diff --git a/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorConfig.java b/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorConfig.java
index 14c2dd49b6d..c7c7a7f4bd0 100644
--- a/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorConfig.java
+++ b/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorConfig.java
@@ -27,82 +27,12 @@
 
 /**
  * Topic Operator configuration.
- *
- * @param namespace                             The namespace that the operator will watch for KafkaTopics.
- * @param labelSelector                         The label selector that KafkaTopics must match.
- * @param bootstrapServers                      The Kafka bootstrap servers.
- * @param clientId                              The client Id to use for the Admin client.
- * @param fullReconciliationIntervalMs          The periodic reconciliation interval in milliseconds.
- * @param tlsEnabled                            Whether the Admin client should be configured to use TLS.
- * @param truststoreLocation                    The location (path) of the Admin client's truststore.
- * @param truststorePassword                    The password for the truststore at {@code truststoreLocation}.
- * @param keystoreLocation                      The location (path) of the Admin client's keystore.
- * @param keystorePassword                      The password for the keystore at {@code keystoreLocation}.
- * @param sslEndpointIdentificationAlgorithm    The SSL endpoint identification algorithm.
- * @param saslEnabled                           Whether the Admin client should be configured to use SASL.
- * @param saslMechanism                         The SASL mechanism for the Admin client.
- * @param saslCustomConfigJson                  The SASL custom values for the Admin client when using alternate auth mechanisms.
- * @param saslUsername                          The SASL username for the Admin client.
- * @param saslPassword                          The SASL password for the Admin client.
- * @param securityProtocol                      The security protocol for the Admin client.
- * @param useFinalizer                          Whether to use finalizers.
- * @param maxQueueSize                          The capacity of the queue.
- * @param maxBatchSize                          The maximum size of a reconciliation batch.
- * @param maxBatchLingerMs                      The maximum time to wait for a reconciliation batch to contain {@code maxBatchSize} items.
- * @param enableAdditionalMetrics               Whether to enable additional metrics.
- * @param featureGates                          Configured feature gates.
- * @param cruiseControlEnabled                  Whether Cruise Control integration is enabled.
- * @param cruiseControlRackEnabled              Whether the target Kafka cluster has rack awareness.
- * @param cruiseControlHostname                 Cruise Control hostname.
- * @param cruiseControlPort                     Cruise Control port.
- * @param cruiseControlSslEnabled               Whether Cruise Control SSL encryption is enabled.
- * @param cruiseControlAuthEnabled              Whether Cruise Control Basic authentication is enabled.
- * @param cruiseControlCrtFilePath              Certificate chain to be trusted.
- * @param cruiseControlApiUserPath              Api admin username file path.
- * @param cruiseControlApiPassPath              Api admin password file path.
- * @param alterableTopicConfig                  Comma separated list of the alterable Kafka topic properties.
- * @param skipClusterConfigReview               For some managed Kafka services the Cluster config is not callable, so this skips those calls.
  */
-public record TopicOperatorConfig(
-        String namespace,
-        Labels labelSelector,
-        String bootstrapServers,
-        String clientId,
-        long fullReconciliationIntervalMs,
-        boolean tlsEnabled,
-        String truststoreLocation,
-        String truststorePassword,
-        String keystoreLocation,
-        String keystorePassword,
-        String sslEndpointIdentificationAlgorithm,
-        boolean saslEnabled,
-        String saslMechanism,
-        String saslCustomConfigJson,
-        String saslUsername,
-        String saslPassword,
-        String securityProtocol,
-        boolean useFinalizer,
-        int maxQueueSize,
-        int maxBatchSize,
-        long maxBatchLingerMs,
-        boolean enableAdditionalMetrics,
-        FeatureGates featureGates,
-        boolean cruiseControlEnabled,
-        boolean cruiseControlRackEnabled,
-        String cruiseControlHostname,
-        int cruiseControlPort,
-        boolean cruiseControlSslEnabled,
-        boolean cruiseControlAuthEnabled,
-        String cruiseControlCrtFilePath,
-        String cruiseControlApiUserPath,
-        String cruiseControlApiPassPath,
-        String alterableTopicConfig,
-        boolean skipClusterConfigReview
-) {
+public class TopicOperatorConfig {
     private final static ReconciliationLogger LOGGER = ReconciliationLogger.create(TopicOperatorConfig.class);
     private static final Map<String, ConfigParameter<?>> CONFIG_VALUES = new HashMap<>();
     private static final TypeReference<HashMap<String, String>> STRING_HASH_MAP_TYPE_REFERENCE = new TypeReference<>() { };
-    
+
     /** Namespace in which the operator will run and create resources. */
     public static final ConfigParameter<String> NAMESPACE = new ConfigParameter<>("STRIMZI_NAMESPACE", ConfigParameterParser.NON_EMPTY_STRING, CONFIG_VALUES);
     /** Labels used to filter the custom resources seen by the cluster operator. */
@@ -158,7 +88,7 @@ public record TopicOperatorConfig(
     /** Cruise Control: whether rack awareness is enabled. */
     public static final ConfigParameter<Boolean> CRUISE_CONTROL_RACK_ENABLED = new ConfigParameter<>("STRIMZI_CRUISE_CONTROL_RACK_ENABLED", ConfigParameterParser.BOOLEAN, "false", CONFIG_VALUES);
     /** Cruise Control: server hostname. */
-    public static final ConfigParameter<String> CRUISE_CONTROL_HOSTNAME = new ConfigParameter<>("STRIMZI_CRUISE_CONTROL_HOSTNAME", ConfigParameterParser.STRING, "127.0.0.1", CONFIG_VALUES);
+    public static final ConfigParameter<String> CRUISE_CONTROL_HOSTNAME = new ConfigParameter<>("STRIMZI_CRUISE_CONTROL_HOSTNAME", ConfigParameterParser.STRING, "localhost", CONFIG_VALUES);
     /** Cruise Control: server port. */
     public static final ConfigParameter<Integer> CRUISE_CONTROL_PORT = new ConfigParameter<>("STRIMZI_CRUISE_CONTROL_PORT", ConfigParameterParser.strictlyPositive(ConfigParameterParser.INTEGER), "9090", CONFIG_VALUES);
     /** Cruise Control: whether rack awareness is enabled. */
@@ -172,116 +102,251 @@ public record TopicOperatorConfig(
     /** Cruise Control: password file location. */
     public static final ConfigParameter<String> CRUISE_CONTROL_API_PASS_PATH = new ConfigParameter<>("STRIMZI_CRUISE_CONTROL_API_PASS_PATH", ConfigParameterParser.STRING, "/etc/eto-cc-api/" + CruiseControlApiProperties.TOPIC_OPERATOR_PASSWORD_KEY, CONFIG_VALUES);
 
-    @SuppressWarnings("unchecked")
-    private static <T> T get(Map<String, Object> map, ConfigParameter<T> value) {
-        return (T) map.get(value.key());
-    }
+    private final Map<String, Object> map;
 
-    static Set<String> keyNames() {
-        return Collections.unmodifiableSet(CONFIG_VALUES.keySet());
+    /**
+     * Constructor.
+     *
+     * @param map Map containing configurations and their respective values.
+     */
+    private TopicOperatorConfig(Map<String, Object> map) {
+        this.map = map;
     }
 
     /**
-     * Creates the TopicOperator configuration from a map.
-     * 
+     * Creates the Topic Operator configuration from a map.
+     *
      * @param map Configuration map.
-     * @return TopicOperator config.
+     * @return Topic Operator configuration.
      */
     public static TopicOperatorConfig buildFromMap(Map<String, String> map) {
         Map<String, String> envMap = new HashMap<>(map);
         envMap.keySet().retainAll(TopicOperatorConfig.keyNames());
-
         Map<String, Object> generatedMap = ConfigParameter.define(envMap, CONFIG_VALUES);
-
         TopicOperatorConfig topicOperatorConfig = new TopicOperatorConfig(generatedMap);
         LOGGER.infoOp("TopicOperator configuration is {}", topicOperatorConfig);
         return topicOperatorConfig;
     }
 
-    /**
-     * Creates the TopicOperator configuration.
-     * 
-     * @param map Configuration map.
-     */
-    public TopicOperatorConfig(Map<String, Object> map) {
-        this(
-                get(map, NAMESPACE),
-                get(map, RESOURCE_LABELS),
-                get(map, BOOTSTRAP_SERVERS),
-                get(map, CLIENT_ID),
-                get(map, FULL_RECONCILIATION_INTERVAL_MS),
-                get(map, TLS_ENABLED),
-                get(map, TRUSTSTORE_LOCATION),
-                get(map, TRUSTSTORE_PASSWORD),
-                get(map, KEYSTORE_LOCATION),
-                get(map, KEYSTORE_PASSWORD),
-                get(map, SSL_ENDPOINT_IDENTIFICATION_ALGORITHM),
-                get(map, SASL_ENABLED),
-                get(map, SASL_MECHANISM),
-                get(map, SASL_CUSTOM_CONFIG_JSON),
-                get(map, SASL_USERNAME),
-                get(map, SASL_PASSWORD),
-                get(map, SECURITY_PROTOCOL),
-                get(map, USE_FINALIZERS),
-                get(map, MAX_QUEUE_SIZE),
-                get(map, MAX_BATCH_SIZE),
-                get(map, MAX_BATCH_LINGER_MS),
-                get(map, ENABLE_ADDITIONAL_METRICS),
-                get(map, FEATURE_GATES),
-                get(map, CRUISE_CONTROL_ENABLED),
-                get(map, CRUISE_CONTROL_RACK_ENABLED),
-                get(map, CRUISE_CONTROL_HOSTNAME),
-                get(map, CRUISE_CONTROL_PORT),
-                get(map, CRUISE_CONTROL_SSL_ENABLED),
-                get(map, CRUISE_CONTROL_AUTH_ENABLED),
-                get(map, CRUISE_CONTROL_CRT_FILE_PATH),
-                get(map, CRUISE_CONTROL_API_USER_PATH),
-                get(map, CRUISE_CONTROL_API_PASS_PATH),
-                get(map, ALTERABLE_TOPIC_CONFIG),
-                get(map, SKIP_CLUSTER_CONFIG_REVIEW)
-        );
+    private static Set<String> keyNames() {
+        return Collections.unmodifiableSet(CONFIG_VALUES.keySet());
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T> T get(ConfigParameter<T> value) {
+        return (T) map.get(value.key());
+    }
+
+    /** @return Value of {@link #NAMESPACE} configuration. */
+    public String namespace() {
+        return get(NAMESPACE);
+    }
+
+    /** @return Value of {@link #RESOURCE_LABELS} configuration. */
+    public Labels resourceLabels() {
+        return get(RESOURCE_LABELS);
+    }
+
+    /** @return Value of {@link #BOOTSTRAP_SERVERS} configuration. */
+    public String bootstrapServers() {
+        return get(BOOTSTRAP_SERVERS);
+    }
+
+    /** @return Value of {@link #CLIENT_ID} configuration. */
+    public String clientId() {
+        return get(CLIENT_ID);
+    }
+
+    /** @return Value of {@link #FULL_RECONCILIATION_INTERVAL_MS} configuration. */
+    public long fullReconciliationIntervalMs() {
+        return get(FULL_RECONCILIATION_INTERVAL_MS);
+    }
+
+    /** @return Value of {@link #TLS_ENABLED} configuration. */
+    public boolean tlsEnabled() {
+        return get(TLS_ENABLED);
+    }
+
+    /** @return Value of {@link #TRUSTSTORE_LOCATION} configuration. */
+    public String truststoreLocation() {
+        return get(TRUSTSTORE_LOCATION);
+    }
+
+    /** @return Value of {@link #TRUSTSTORE_PASSWORD} configuration. */
+    public String truststorePassword() {
+        return get(TRUSTSTORE_PASSWORD);
+    }
+
+    /** @return Value of {@link #KEYSTORE_LOCATION} configuration. */
+    public String keystoreLocation() {
+        return get(KEYSTORE_LOCATION);
+    }
+
+    /** @return Value of {@link #KEYSTORE_PASSWORD} configuration. */
+    public String keystorePassword() {
+        return get(KEYSTORE_PASSWORD);
+    }
+
+    /** @return Value of {@link #SSL_ENDPOINT_IDENTIFICATION_ALGORITHM} configuration. */
+    public String sslEndpointIdentificationAlgorithm() {
+        return get(SSL_ENDPOINT_IDENTIFICATION_ALGORITHM);
+    }
+
+    /** @return Value of {@link #SASL_ENABLED} configuration. */
+    public boolean saslEnabled() {
+        return get(SASL_ENABLED);
+    }
+
+    /** @return Value of {@link #SASL_MECHANISM} configuration. */
+    public String saslMechanism() {
+        return get(SASL_MECHANISM);
+    }
+
+    /** @return Value of {@link #SASL_CUSTOM_CONFIG_JSON} configuration. */
+    public String saslCustomConfigJson() {
+        return get(SASL_CUSTOM_CONFIG_JSON);
+    }
+
+    /** @return Value of {@link #SASL_USERNAME} configuration. */
+    public String saslUsername() {
+        return get(SASL_USERNAME);
+    }
+
+    /** @return Value of {@link #SASL_PASSWORD} configuration. */
+    public String saslPassword() {
+        return get(SASL_PASSWORD);
+    }
+
+    /** @return Value of {@link #SECURITY_PROTOCOL} configuration. */
+    public String securityProtocol() {
+        return get(SECURITY_PROTOCOL);
+    }
+
+    /** @return Value of {@link #USE_FINALIZERS} configuration. */
+    public boolean useFinalizer() {
+        return get(USE_FINALIZERS);
+    }
+
+    /** @return Value of {@link #MAX_QUEUE_SIZE} configuration. */
+    public int maxQueueSize() {
+        return get(MAX_QUEUE_SIZE);
+    }
+
+    /** @return Value of {@link #MAX_BATCH_SIZE} configuration. */
+    public int maxBatchSize() {
+        return get(MAX_BATCH_SIZE);
+    }
+
+    /** @return Value of {@link #MAX_BATCH_LINGER_MS} configuration. */
+    public long maxBatchLingerMs() {
+        return get(MAX_BATCH_LINGER_MS);
+    }
+
+    /** @return Value of {@link #ENABLE_ADDITIONAL_METRICS} configuration. */
+    public boolean enableAdditionalMetrics() {
+        return get(ENABLE_ADDITIONAL_METRICS);
+    }
+
+    /** @return Value of {@link #ALTERABLE_TOPIC_CONFIG} configuration. */
+    public String alterableTopicConfig() {
+        return get(ALTERABLE_TOPIC_CONFIG);
+    }
+
+    /** @return Value of {@link #SKIP_CLUSTER_CONFIG_REVIEW} configuration. */
+    public boolean skipClusterConfigReview() {
+        return get(SKIP_CLUSTER_CONFIG_REVIEW);
+    }
+
+    /** @return Value of {@link #FEATURE_GATES} configuration. */
+    public FeatureGates featureGates() {
+        return get(FEATURE_GATES);
+    }
+
+    /** @return Value of {@link #CRUISE_CONTROL_ENABLED} configuration. */
+    public boolean cruiseControlEnabled() {
+        return get(CRUISE_CONTROL_ENABLED);
+    }
+
+    /** @return Value of {@link #CRUISE_CONTROL_RACK_ENABLED} configuration. */
+    public boolean cruiseControlRackEnabled() {
+        return get(CRUISE_CONTROL_RACK_ENABLED);
+    }
+
+    /** @return Value of {@link #CRUISE_CONTROL_HOSTNAME} configuration. */
+    public String cruiseControlHostname() {
+        return get(CRUISE_CONTROL_HOSTNAME);
+    }
+
+    /** @return Value of {@link #CRUISE_CONTROL_PORT} configuration. */
+    public int cruiseControlPort() {
+        return get(CRUISE_CONTROL_PORT);
+    }
+
+    /** @return Value of {@link #CRUISE_CONTROL_SSL_ENABLED} configuration. */
+    public boolean cruiseControlSslEnabled() {
+        return get(CRUISE_CONTROL_SSL_ENABLED);
+    }
+
+    /** @return Value of {@link #CRUISE_CONTROL_AUTH_ENABLED} configuration. */
+    public boolean cruiseControlAuthEnabled() {
+        return get(CRUISE_CONTROL_AUTH_ENABLED);
+    }
+
+    /** @return Value of {@link #CRUISE_CONTROL_CRT_FILE_PATH} configuration. */
+    public String cruiseControlCrtFilePath() {
+        return get(CRUISE_CONTROL_CRT_FILE_PATH);
+    }
+
+    /** @return Value of {@link #CRUISE_CONTROL_API_USER_PATH} configuration. */
+    public String cruiseControlApiUserPath() {
+        return get(CRUISE_CONTROL_API_USER_PATH);
+    }
+
+    /** @return Value of {@link #CRUISE_CONTROL_API_PASS_PATH} configuration. */
+    public String cruiseControlApiPassPath() {
+        return get(CRUISE_CONTROL_API_PASS_PATH);
     }
 
     Map<String, Object> adminClientConfig() {
         var kafkaClientProps = new HashMap<String, Object>();
-        kafkaClientProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers());
-        kafkaClientProps.put(AdminClientConfig.CLIENT_ID_CONFIG, this.clientId());
+        kafkaClientProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
+        kafkaClientProps.put(AdminClientConfig.CLIENT_ID_CONFIG, clientId());
 
-        if (this.tlsEnabled() && !this.securityProtocol().isEmpty()) {
-            if (!this.securityProtocol().equals("SSL") && !this.securityProtocol().equals("SASL_SSL")) {
+        if (tlsEnabled() && !securityProtocol().isEmpty()) {
+            if (!securityProtocol().equals("SSL") && !securityProtocol().equals("SASL_SSL")) {
                 throw new InvalidConfigurationException("TLS is enabled but the security protocol does not match SSL or SASL_SSL");
             }
         }
 
-        if (!this.securityProtocol().isEmpty()) {
-            kafkaClientProps.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, this.securityProtocol());
-        } else if (this.tlsEnabled()) {
+        if (!securityProtocol().isEmpty()) {
+            kafkaClientProps.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, securityProtocol());
+        } else if (tlsEnabled()) {
             kafkaClientProps.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SSL");
         } else {
             kafkaClientProps.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
         }
 
-        if (this.securityProtocol().equals("SASL_SSL") || this.securityProtocol().equals("SSL") || this.tlsEnabled()) {
-            kafkaClientProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, this.sslEndpointIdentificationAlgorithm());
+        if (securityProtocol().equals("SASL_SSL") || securityProtocol().equals("SSL") || tlsEnabled()) {
+            kafkaClientProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, sslEndpointIdentificationAlgorithm());
 
-            if (!this.truststoreLocation().isEmpty()) {
-                kafkaClientProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, this.truststoreLocation());
+            if (!truststoreLocation().isEmpty()) {
+                kafkaClientProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation());
             }
 
-            if (!this.truststorePassword().isEmpty()) {
-                if (this.truststoreLocation().isEmpty()) {
+            if (!truststorePassword().isEmpty()) {
+                if (truststoreLocation().isEmpty()) {
                     throw new InvalidConfigurationException("TLS_TRUSTSTORE_PASSWORD was supplied but TLS_TRUSTSTORE_LOCATION was not supplied");
                 }
-                kafkaClientProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.truststorePassword());
+                kafkaClientProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword());
             }
 
-            if (!this.keystoreLocation().isEmpty() && !this.keystorePassword().isEmpty()) {
-                kafkaClientProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, this.keystoreLocation());
-                kafkaClientProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, this.keystorePassword());
+            if (!keystoreLocation().isEmpty() && !keystorePassword().isEmpty()) {
+                kafkaClientProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation());
+                kafkaClientProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword());
             }
         }
 
-        if (this.saslEnabled()) {
+        if (saslEnabled()) {
             putSaslConfigs(kafkaClientProps);
         }
 
@@ -289,10 +354,7 @@ Map<String, Object> adminClientConfig() {
     }
 
     private void putSaslConfigs(Map<String, Object> kafkaClientProps) {
-        TopicOperatorConfig config = this;
-        String customSaslConfigJson = config.saslCustomConfigJson();
-
-        if (customSaslConfigJson.isBlank()) {
+        if (saslCustomConfigJson().isBlank()) {
             setStandardSaslConfigs(kafkaClientProps);
         } else {
             setCustomSaslConfigs(kafkaClientProps);
@@ -300,10 +362,7 @@ private void putSaslConfigs(Map<String, Object> kafkaClientProps) {
     }
 
     private void setCustomSaslConfigs(Map<String, Object> kafkaClientProps) {
-        TopicOperatorConfig config = this;
-        String customPropsString = config.saslCustomConfigJson();
-
-        if (customPropsString.isEmpty()) {
+        if (saslCustomConfigJson().isEmpty()) {
             throw new InvalidConfigurationException("Custom SASL config properties are not set");
         }
 
@@ -311,7 +370,7 @@ private void setCustomSaslConfigs(Map<String, Object> kafkaClientProps) {
         objectMapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
 
         try {
-            Map<String, String> customProperties = objectMapper.readValue(customPropsString, STRING_HASH_MAP_TYPE_REFERENCE);
+            Map<String, String> customProperties = objectMapper.readValue(saslCustomConfigJson(), STRING_HASH_MAP_TYPE_REFERENCE);
 
             if (customProperties.isEmpty()) {
                 throw new InvalidConfigurationException("SASL custom config properties empty");
@@ -328,35 +387,31 @@ private void setCustomSaslConfigs(Map<String, Object> kafkaClientProps) {
                 kafkaClientProps.put(key, value);
             }
         } catch (JsonProcessingException e) {
-            throw new InvalidConfigurationException("SASL custom config properties deserialize failed. customProperties: '" + customPropsString + "'");
+            throw new InvalidConfigurationException("SASL custom config properties deserialize failed. customProperties: '" + saslCustomConfigJson() + "'");
         }
     }
 
     private void setStandardSaslConfigs(Map<String, Object> kafkaClientProps) {
-        TopicOperatorConfig config = this;
         String saslMechanism;
         String jaasConfig;
-        String username = config.saslUsername();
-        String password = config.saslPassword();
-        String configSaslMechanism = config.saslMechanism();
 
-        if (username.isEmpty() || password.isEmpty()) {
+        if (saslUsername().isEmpty() || saslPassword().isEmpty()) {
             throw new InvalidConfigurationException("SASL credentials are not set");
         }
 
-        if ("plain".equals(configSaslMechanism)) {
+        if ("plain".equals(saslMechanism())) {
             saslMechanism = "PLAIN";
-            jaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";";
-        } else if ("scram-sha-256".equals(configSaslMechanism) || "scram-sha-512".equals(configSaslMechanism)) {
-            jaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + username + "\" password=\"" + password + "\";";
+            jaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + saslUsername() + "\" password=\"" + saslPassword() + "\";";
+        } else if ("scram-sha-256".equals(saslMechanism()) || "scram-sha-512".equals(saslMechanism())) {
+            jaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + saslUsername() + "\" password=\"" + saslPassword() + "\";";
 
-            if ("scram-sha-256".equals(configSaslMechanism)) {
+            if ("scram-sha-256".equals(saslMechanism())) {
                 saslMechanism = "SCRAM-SHA-256";
             } else {
                 saslMechanism = "SCRAM-SHA-512";
             }
         } else {
-            throw new IllegalArgumentException("Invalid SASL_MECHANISM type: " + configSaslMechanism);
+            throw new IllegalArgumentException("Invalid SASL_MECHANISM type: " + saslMechanism());
         }
 
         kafkaClientProps.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
@@ -367,40 +422,40 @@ private void setStandardSaslConfigs(Map<String, Object> kafkaClientProps) {
     public String toString() {
         String mask = "********";
         return "TopicOperatorConfig{" +
-                "\n\tnamespace='" + namespace + '\'' +
-                "\n\tlabelSelector=" + labelSelector +
-                "\n\tbootstrapServers='" + bootstrapServers + '\'' +
-                "\n\tclientId='" + clientId + '\'' +
-                "\n\tfullReconciliationIntervalMs=" + fullReconciliationIntervalMs +
-                "\n\ttlsEnabled=" + tlsEnabled +
-                "\n\ttruststoreLocation='" + truststoreLocation + '\'' +
-                "\n\ttruststorePassword='" + mask + '\'' +
-                "\n\tkeystoreLocation='" + keystoreLocation + '\'' +
-                "\n\tkeystorePassword='" + mask + '\'' +
-                "\n\tsslEndpointIdentificationAlgorithm='" + sslEndpointIdentificationAlgorithm + '\'' +
-                "\n\tsaslEnabled=" + saslEnabled +
-                "\n\tsaslMechanism='" + saslMechanism + '\'' +
-                "\n\tsaslCustomConfigJson='" + (saslCustomConfigJson == null ? null : mask) + '\'' +
-                "\n\talterableTopicConfig='" + alterableTopicConfig + '\'' +
-                "\n\tskipClusterConfigReview='" + skipClusterConfigReview + '\'' +
-                "\n\tsaslUsername='" + saslUsername + '\'' +
-                "\n\tsaslPassword='" + mask + '\'' +
-                "\n\tsecurityProtocol='" + securityProtocol + '\'' +
-                "\n\tuseFinalizer=" + useFinalizer +
-                "\n\tmaxQueueSize=" + maxQueueSize +
-                "\n\tmaxBatchSize=" + maxBatchSize +
-                "\n\tmaxBatchLingerMs=" + maxBatchLingerMs +
-                "\n\tenableAdditionalMetrics=" + enableAdditionalMetrics +
-                "\n\tfeatureGates='" + featureGates + "'" +
-                "\n\tcruiseControlEnabled=" + cruiseControlEnabled +
-                "\n\tcruiseControlRackEnabled=" + cruiseControlRackEnabled +
-                "\n\tcruiseControlHostname=" + cruiseControlHostname +
-                "\n\tcruiseControlPort=" + cruiseControlPort +
-                "\n\tcruiseControlSslEnabled=" + cruiseControlSslEnabled +
-                "\n\tcruiseControlAuthEnabled=" + cruiseControlAuthEnabled +
-                "\n\tcruiseControlCrtFilePath=" + cruiseControlCrtFilePath +
-                "\n\tcruiseControlApiUserPath=" + cruiseControlApiUserPath +
-                "\n\tcruiseControlApiPassPath=" + cruiseControlApiPassPath +
-                '}';
+            "\n\tnamespace='" + namespace() + '\'' +
+            "\n\tresourceLabels=" + resourceLabels() +
+            "\n\tbootstrapServers='" + bootstrapServers() + '\'' +
+            "\n\tclientId='" + clientId() + '\'' +
+            "\n\tfullReconciliationIntervalMs=" + fullReconciliationIntervalMs() +
+            "\n\ttlsEnabled=" + tlsEnabled() +
+            "\n\ttruststoreLocation='" + truststoreLocation() + '\'' +
+            "\n\ttruststorePassword='" + mask + '\'' +
+            "\n\tkeystoreLocation='" + keystoreLocation() + '\'' +
+            "\n\tkeystorePassword='" + mask + '\'' +
+            "\n\tsslEndpointIdentificationAlgorithm='" + sslEndpointIdentificationAlgorithm() + '\'' +
+            "\n\tsaslEnabled=" + saslEnabled() +
+            "\n\tsaslMechanism='" + saslMechanism() + '\'' +
+            "\n\tsaslCustomConfigJson='" + (saslCustomConfigJson() == null ? null : mask) + '\'' +
+            "\n\talterableTopicConfig='" + alterableTopicConfig() + '\'' +
+            "\n\tskipClusterConfigReview='" + skipClusterConfigReview() + '\'' +
+            "\n\tsaslUsername='" + saslUsername() + '\'' +
+            "\n\tsaslPassword='" + mask + '\'' +
+            "\n\tsecurityProtocol='" + securityProtocol() + '\'' +
+            "\n\tuseFinalizer=" + useFinalizer() +
+            "\n\tmaxQueueSize=" + maxQueueSize() +
+            "\n\tmaxBatchSize=" + maxBatchSize() +
+            "\n\tmaxBatchLingerMs=" + maxBatchLingerMs() +
+            "\n\tenableAdditionalMetrics=" + enableAdditionalMetrics() +
+            "\n\tfeatureGates='" + featureGates() + "'" +
+            "\n\tcruiseControlEnabled=" + cruiseControlEnabled() +
+            "\n\tcruiseControlRackEnabled=" + cruiseControlRackEnabled() +
+            "\n\tcruiseControlHostname=" + cruiseControlHostname() +
+            "\n\tcruiseControlPort=" + cruiseControlPort() +
+            "\n\tcruiseControlSslEnabled=" + cruiseControlSslEnabled() +
+            "\n\tcruiseControlAuthEnabled=" + cruiseControlAuthEnabled() +
+            "\n\tcruiseControlCrtFilePath=" + cruiseControlCrtFilePath() +
+            "\n\tcruiseControlApiUserPath=" + cruiseControlApiUserPath() +
+            "\n\tcruiseControlApiPassPath=" + cruiseControlApiPassPath() +
+            '}';
     }
 }
diff --git a/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorMain.java b/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorMain.java
index ebe0d4bc6c3..640f00edcff 100644
--- a/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorMain.java
+++ b/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorMain.java
@@ -58,9 +58,9 @@ public class TopicOperatorMain implements Liveness, Readiness {
 
     TopicOperatorMain(TopicOperatorConfig config, Admin kafkaAdminClient) {
         Objects.requireNonNull(config.namespace());
-        Objects.requireNonNull(config.labelSelector());
+        Objects.requireNonNull(config.resourceLabels());
         this.config = config;
-        var selector = config.labelSelector().toMap();
+        var selector = config.resourceLabels().toMap();
         this.kubernetesClient = new OperatorKubernetesClientBuilder("strimzi-topic-operator", TopicOperatorMain.class.getPackage().getImplementationVersion()).build();
         this.kafkaAdminClient = kafkaAdminClient;
         this.cruiseControlClient = TopicOperatorUtil.createCruiseControlClient(config);
diff --git a/topic-operator/src/test/java/io/strimzi/operator/topic/BatchingTopicControllerIT.java b/topic-operator/src/test/java/io/strimzi/operator/topic/BatchingTopicControllerIT.java
index c9e99106aec..1878f69c4e4 100644
--- a/topic-operator/src/test/java/io/strimzi/operator/topic/BatchingTopicControllerIT.java
+++ b/topic-operator/src/test/java/io/strimzi/operator/topic/BatchingTopicControllerIT.java
@@ -72,8 +72,7 @@
 import static org.mockito.Mockito.verifyNoInteractions;
 
 /**
- * This test is not intended to provide lots of coverage of the {@link BatchingTopicController}, 
- * rather it aims to cover some parts that a difficult to test via {@link TopicControllerIT}.
+ * This test covers some parts that a difficult to test via {@link TopicControllerIT}.
  */
 @SuppressWarnings("checkstyle:ClassDataAbstractionCoupling")
 class BatchingTopicControllerIT implements TestSeparator {
diff --git a/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java b/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java
index c9f8463e9ef..0aff79230b1 100644
--- a/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java
+++ b/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java
@@ -13,7 +13,6 @@
 import io.strimzi.api.kafka.model.topic.KafkaTopic;
 import io.strimzi.api.kafka.model.topic.KafkaTopicBuilder;
 import io.strimzi.api.kafka.model.topic.KafkaTopicStatus;
-import io.strimzi.operator.common.featuregates.FeatureGates;
 import io.strimzi.operator.common.model.Labels;
 import io.strimzi.operator.topic.model.KubeRef;
 import io.strimzi.operator.topic.model.TopicOperatorException;
@@ -98,8 +97,8 @@
 import static org.mockito.Mockito.mock;
 
 /**
- * This integration test suite provides coverage of the {@link BatchingTopicController}.
- * If you need to test individual units of code, use the the {@link BatchingTopicController}.
+ * This integration test covers the {@link BatchingTopicController}.
+ * This test requires a connection to a Kubernetes cluster.
  */
 @SuppressWarnings("checkstyle:ClassFanOutComplexity")
 class TopicControllerIT implements TestSeparator {
@@ -1273,17 +1272,17 @@ private static TopicOperatorConfig topicOperatorConfig(String ns, StrimziKafkaCl
     }
 
     private static TopicOperatorConfig topicOperatorConfig(String ns, StrimziKafkaCluster kafkaCluster, boolean useFinalizer, long fullReconciliationIntervalMs) {
-        return new TopicOperatorConfig(ns,
-            Labels.fromMap(SELECTOR),
-            kafkaCluster.getBootstrapServers(),
-            TopicControllerIT.class.getSimpleName(),
-            fullReconciliationIntervalMs,
-            false, "", "", "", "", "",
-            false, "", "", "", "", "",
-            useFinalizer,
-            100, 100, 10, false, new FeatureGates(""),
-            false, false, "", 9090, false, false, "", "", "",
-            "all", false);
+        return TopicOperatorConfig.buildFromMap(Map.of(
+            TopicOperatorConfig.NAMESPACE.key(), ns,
+            TopicOperatorConfig.RESOURCE_LABELS.key(), Labels.fromMap(SELECTOR).toSelectorString(),
+            TopicOperatorConfig.BOOTSTRAP_SERVERS.key(), kafkaCluster.getBootstrapServers(),
+            TopicOperatorConfig.CLIENT_ID.key(), TopicControllerIT.class.getSimpleName(),
+            TopicOperatorConfig.FULL_RECONCILIATION_INTERVAL_MS.key(), String.valueOf(fullReconciliationIntervalMs),
+            TopicOperatorConfig.USE_FINALIZERS.key(), String.valueOf(useFinalizer),
+            TopicOperatorConfig.MAX_QUEUE_SIZE.key(), "100",
+            TopicOperatorConfig.MAX_BATCH_SIZE.key(), "100",
+            TopicOperatorConfig.MAX_BATCH_LINGER_MS.key(), "10"
+        ));
     }
 
     @Test
@@ -2038,14 +2037,17 @@ public void shouldTerminateIfQueueFull() throws ExecutionException, InterruptedE
         // given
         String ns = createNamespace(NAMESPACE);
 
-        var config = new TopicOperatorConfig(ns, Labels.fromMap(SELECTOR),
-            kafkaCluster.getBootstrapServers(), TopicControllerIT.class.getSimpleName(), 10_000,
-            false, "", "", "", "", "",
-            false, "", "", "", "", "",
-            true,
-            1, 100, 5_0000, false, new FeatureGates(""),
-            false, false, "", 9090, false, false, "", "", "",
-            "all", false);
+        var config = TopicOperatorConfig.buildFromMap(Map.of(
+            TopicOperatorConfig.NAMESPACE.key(), ns,
+            TopicOperatorConfig.RESOURCE_LABELS.key(), Labels.fromMap(SELECTOR).toSelectorString(),
+            TopicOperatorConfig.BOOTSTRAP_SERVERS.key(), kafkaCluster.getBootstrapServers(),
+            TopicOperatorConfig.CLIENT_ID.key(), TopicControllerIT.class.getSimpleName(),
+            TopicOperatorConfig.FULL_RECONCILIATION_INTERVAL_MS.key(), "10000",
+            TopicOperatorConfig.USE_FINALIZERS.key(), "true",
+            TopicOperatorConfig.MAX_QUEUE_SIZE.key(), "1",
+            TopicOperatorConfig.MAX_BATCH_SIZE.key(), "100",
+            TopicOperatorConfig.MAX_BATCH_LINGER_MS.key(), "5000"
+        ));
 
         maybeStartOperator(config);