Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unnecessary methods from TopicOperatorUtil class #10918

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@
import io.strimzi.operator.common.featuregates.FeatureGates;
import io.strimzi.operator.common.model.Labels;
import io.strimzi.operator.common.model.cruisecontrol.CruiseControlApiProperties;
import io.strimzi.operator.topic.cruisecontrol.CruiseControlClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -127,7 +132,7 @@ public static TopicOperatorConfig buildFromMap(Map<String, String> map) {
LOGGER.infoOp("TopicOperator configuration is {}", topicOperatorConfig);
return topicOperatorConfig;
}

private static Set<String> keyNames() {
return Collections.unmodifiableSet(CONFIG_VALUES.keySet());
}
Expand Down Expand Up @@ -418,6 +423,34 @@ private void setStandardSaslConfigs(Map<String, Object> kafkaClientProps) {
kafkaClientProps.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
}

/**
* @return Cruise Control client configuration.
*/
public CruiseControlClient.Config cruiseControlClientConfig() {
var sslCertificate = cruiseControlSslEnabled() ? getFileContent(cruiseControlCrtFilePath()) : null;
var apiUsername = cruiseControlAuthEnabled() ? new String(getFileContent(cruiseControlApiUserPath()), StandardCharsets.UTF_8) : null;
var apiPassword = cruiseControlAuthEnabled() ? new String(getFileContent(cruiseControlApiPassPath()), StandardCharsets.UTF_8) : null;

return new CruiseControlClient.Config(
cruiseControlHostname(),
cruiseControlPort(),
cruiseControlRackEnabled(),
cruiseControlSslEnabled(),
sslCertificate,
cruiseControlAuthEnabled(),
apiUsername,
apiPassword
);
}

private static byte[] getFileContent(String filePath) {
try {
return Files.readAllBytes(Path.of(filePath));
} catch (IOException ioe) {
throw new IllegalArgumentException(String.format("File not found: %s", filePath), ioe);
}
}

@Override
public String toString() {
String mask = "********";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,12 @@ public class TopicOperatorMain implements Liveness, Readiness {
Objects.requireNonNull(config.resourceLabels());
this.config = config;
var selector = config.resourceLabels().toMap();
this.kubernetesClient = new OperatorKubernetesClientBuilder("strimzi-topic-operator", TopicOperatorMain.class.getPackage().getImplementationVersion()).build();
this.kubernetesClient = new OperatorKubernetesClientBuilder(
"strimzi-topic-operator",
TopicOperatorMain.class.getPackage().getImplementationVersion()
).build();
this.kafkaAdminClient = kafkaAdminClient;
this.cruiseControlClient = TopicOperatorUtil.createCruiseControlClient(config);
this.cruiseControlClient = CruiseControlClient.create(config.cruiseControlClientConfig());

var metricsProvider = createMetricsProvider();
var metricsHolder = new TopicOperatorMetricsHolder(KafkaTopic.RESOURCE_KIND, Labels.fromMap(selector), metricsProvider);
Expand Down Expand Up @@ -154,7 +157,7 @@ private synchronized void shutdown() {
*/
public static void main(String[] args) throws Exception {
var config = TopicOperatorConfig.buildFromMap(System.getenv());
var operator = operator(config, TopicOperatorUtil.createKafkaAdminClient(config));
var operator = operator(config, Admin.create(config.adminClientConfig()));
operator.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,13 @@
import io.strimzi.api.kafka.model.topic.KafkaTopic;
import io.strimzi.operator.common.Annotations;
import io.strimzi.operator.common.model.InvalidResourceException;
import io.strimzi.operator.topic.cruisecontrol.CruiseControlClient;
import io.strimzi.operator.topic.metrics.TopicOperatorMetricsHolder;
import io.strimzi.operator.topic.model.Either;
import io.strimzi.operator.topic.model.Pair;
import io.strimzi.operator.topic.model.PartitionedByError;
import io.strimzi.operator.topic.model.ReconcilableTopic;
import io.strimzi.operator.topic.model.TopicOperatorException;
import org.apache.kafka.clients.admin.Admin;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -37,45 +30,6 @@ public class TopicOperatorUtil {

private TopicOperatorUtil() { }

/**
* Create a new Kafka admin client instance.
*
* @param config Topic Operator configuration.
* @return Kafka admin client.
*/
public static Admin createKafkaAdminClient(TopicOperatorConfig config) {
return Admin.create(config.adminClientConfig());
}

/**
* Create a new Cruise Control client instance.
*
* @param config Topic Operator configuration.
* @return Cruise Control client.
*/
public static CruiseControlClient createCruiseControlClient(TopicOperatorConfig config) {
return CruiseControlClient.create(
config.cruiseControlHostname(),
config.cruiseControlPort(),
config.cruiseControlRackEnabled(),
config.cruiseControlSslEnabled(),
config.cruiseControlSslEnabled() ? getFileContent(config.cruiseControlCrtFilePath()) : null,
config.cruiseControlAuthEnabled(),
config.cruiseControlAuthEnabled()
? new String(getFileContent(config.cruiseControlApiUserPath()), StandardCharsets.UTF_8) : null,
config.cruiseControlAuthEnabled()
? new String(getFileContent(config.cruiseControlApiPassPath()), StandardCharsets.UTF_8) : null
);
}

private static byte[] getFileContent(String filePath) {
try {
return Files.readAllBytes(Path.of(filePath));
} catch (IOException ioe) {
throw new UncheckedIOException(String.format("File not found: %s", filePath), ioe);
}
}

/**
* Get the topic name from a {@link KafkaTopic} resource.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import java.util.Set;

/**
* Cruise Control REST API client.
* Cruise Control client.
* <br/><br/>
* The server runs one task execution at a time, additional
* requests are queued up to {@code max.active.user.tasks}.
Expand All @@ -26,36 +26,13 @@ public interface CruiseControlClient {
long HTTP_REQUEST_TIMEOUT_SEC = 60;

/**
* Create default Cruise Control client instance.
* Create a new Cruise Control client with the given configuration.
*
* @param serverHostname Server hostname.
* @param serverPort Server port.
* @param rackEnabled Whether rack awareness is enabled.
* @param sslEnabled Whether SSL is enabled.
* @param sslCertificate SSL certificate.
* @param authEnabled Whether authentication is enabled.
* @param authUsername Authentication username.
* @param authPassword Authentication password.
* @param config Configuration.
* @return Cruise Control client.
*/
static CruiseControlClient create(String serverHostname,
int serverPort,
boolean rackEnabled,
boolean sslEnabled,
byte[] sslCertificate,
boolean authEnabled,
String authUsername,
String authPassword) {
return new CruiseControlClientImpl(
serverHostname,
serverPort,
rackEnabled,
sslEnabled,
sslCertificate,
authEnabled,
authUsername,
authPassword
);
static CruiseControlClient create(Config config) {
return new CruiseControlClientImpl(config);
}

/**
Expand Down Expand Up @@ -88,6 +65,106 @@ static CruiseControlClient create(String serverHostname,
* @return The error message.
*/
Optional<String> errorMessage(HttpResponse<String> response);

/**
* Client configuration.
*/
class Config {
private final String serverHostname;
private final int serverPort;
private final boolean rackEnabled;
private final boolean sslEnabled;
private final byte[] sslCertificate;
private final boolean authEnabled;
private final String authUsername;
private final String authPassword;

/**
* Create new configuration.
*
* @param serverHostname Server hostname.
* @param serverPort Server port.
* @param rackEnabled Whether rack awareness is enabled.
* @param sslEnabled Whether SSL is enabled.
* @param sslCertificate SSL certificate.
* @param authEnabled Whether authentication is enabled.
* @param authUsername Authentication username.
* @param authPassword Authentication password.
*/
public Config(String serverHostname,
int serverPort,
boolean rackEnabled,
boolean sslEnabled,
byte[] sslCertificate,
boolean authEnabled,
String authUsername,
String authPassword) {
if (serverHostname == null || serverHostname.isBlank()) {
throw new IllegalArgumentException("Hostname is not set");
}
if (serverPort <= 0) {
throw new IllegalArgumentException("Port number is invalid");
}
if (sslEnabled && (sslCertificate == null || sslCertificate.length == 0)) {
throw new IllegalArgumentException("SSL certificate is not set");
}
if (authEnabled && (authUsername == null || authUsername.isBlank())) {
throw new IllegalArgumentException("Authentication username is not set");
}
if (authEnabled && (authPassword == null || authPassword.isBlank())) {
throw new IllegalArgumentException("Authentication password is not set");
}

this.serverHostname = serverHostname;
this.serverPort = serverPort;
this.rackEnabled = rackEnabled;
this.sslEnabled = sslEnabled;
this.sslCertificate = sslCertificate;
this.authEnabled = authEnabled;
this.authUsername = authUsername;
this.authPassword = authPassword;
}

/** @return Server hostname. */
public String serverHostname() {
return serverHostname;
}

/** @return Server port. */
public int serverPort() {
return serverPort;
}

/** @return Rack enabled. */
public boolean rackEnabled() {
return rackEnabled;
}

/** @return SSL enabled. */
public boolean sslEnabled() {
return sslEnabled;
}

/** @return SSL certificate. */
public byte[] sslCertificate() {
return sslCertificate;
}

/** @return Authentication enabled. */
public boolean authEnabled() {
return authEnabled;
}

/** @return Authentication username. */
public String authUsername() {
return authUsername;
}

/** @return Authentication password. */
public String authPassword() {
return authPassword;
}
}

/**
* Topic names grouped by replication factor value.
Expand Down
Loading
Loading