Skip to content

Commit

Permalink
Improve create factory method
Browse files Browse the repository at this point in the history
Signed-off-by: Federico Valeri <[email protected]>
  • Loading branch information
fvaleri committed Dec 12, 2024
1 parent c1b176f commit 5098ce0
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import java.util.Set;

/**
* Cruise Control REST API client.
* Cruise Control client.
* <br/><br/>
* The server runs one task execution at a time, additional
* requests are queued up to {@code max.active.user.tasks}.
Expand All @@ -26,13 +26,13 @@ public interface CruiseControlClient {
long HTTP_REQUEST_TIMEOUT_SEC = 60;

/**
* Create default Cruise Control client instance.
* Create a new Cruise Control client with the given configuration.
*
* @param config Configuration.
* @return Cruise Control client.
*/
static CruiseControlClient create(Config config) {
return new CruiseControlClientImpl(config);
return CruiseControlClientImpl.createInternal(config);
}

/**
Expand Down Expand Up @@ -68,103 +68,26 @@ static CruiseControlClient create(Config config) {

/**
* Client configuration.
*
* @param serverHostname Server hostname.
* @param serverPort Server port.
* @param rackEnabled Whether rack awareness is enabled.
* @param sslEnabled Whether SSL is enabled.
* @param sslCertificate SSL certificate.
* @param authEnabled Whether authentication is enabled.
* @param authUsername Authentication username.
* @param authPassword Authentication password.
*/
class Config {
private String serverHostname;
private int serverPort;
private boolean rackEnabled;
private boolean sslEnabled;
private byte[] sslCertificate;
private boolean authEnabled;
private String authUsername;
private String authPassword;

/**
* Create new configuration.
*
* @param serverHostname Server hostname.
* @param serverPort Server port.
* @param rackEnabled Whether rack awareness is enabled.
* @param sslEnabled Whether SSL is enabled.
* @param sslCertificate SSL certificate.
* @param authEnabled Whether authentication is enabled.
* @param authUsername Authentication username.
* @param authPassword Authentication password.
*/
public Config(String serverHostname,
int serverPort,
boolean rackEnabled,
boolean sslEnabled,
byte[] sslCertificate,
boolean authEnabled,
String authUsername,
String authPassword) {
if (serverHostname == null || serverHostname.isBlank()) {
throw new IllegalArgumentException("Hostname is not set");
}
if (serverPort <= 0) {
throw new IllegalArgumentException("Port number is invalid");
}
if (sslEnabled && (sslCertificate == null || sslCertificate.length == 0)) {
throw new IllegalArgumentException("SSL certificate is not set");
}
if (authEnabled && (authUsername == null || authUsername.isBlank())) {
throw new IllegalArgumentException("Authentication username is not set");
}
if (authEnabled && (authPassword == null || authPassword.isBlank())) {
throw new IllegalArgumentException("Authentication password is not set");
}

this.serverHostname = serverHostname;
this.serverPort = serverPort;
this.rackEnabled = rackEnabled;
this.sslEnabled = sslEnabled;
this.sslCertificate = sslCertificate;
this.authEnabled = authEnabled;
this.authUsername = authUsername;
this.authPassword = authPassword;
}

/** @return Server hostname. */
public String serverHostname() {
return serverHostname;
}

/** @return Server port. */
public int serverPort() {
return serverPort;
}

/** @return Rack enabled. */
public boolean rackEnabled() {
return rackEnabled;
}

/** @return SSL enabled. */
public boolean sslEnabled() {
return sslEnabled;
}

/** @return SSL certificate. */
public byte[] sslCertificate() {
return sslCertificate;
}

/** @return Authentication enabled. */
public boolean authEnabled() {
return authEnabled;
}

/** @return Authentication username. */
public String authUsername() {
return authUsername;
}

/** @return Authentication password. */
public String authPassword() {
return authPassword;
}
}
record Config(
String serverHostname,
int serverPort,
boolean rackEnabled,
boolean sslEnabled,
byte[] sslCertificate,
boolean authEnabled,
String authUsername,
String authPassword
) { }

/**
* Topic names grouped by replication factor value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,37 +45,41 @@
import static org.apache.logging.log4j.core.util.Throwables.getRootCause;

/**
* Cruise Control REST API client based on Java HTTP client.
* Default implementation of the Cruise Control client based on Java HTTP client.
*/
public class CruiseControlClientImpl implements CruiseControlClient {
private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(CruiseControlClientImpl.class);

private final String serverHostname;
private final int serverPort;
private final boolean rackEnabled;
private final boolean sslEnabled;
private final byte[] sslCertificate;
private final boolean authEnabled;
private final String authUsername;
private final String authPassword;

private final Config config;
private final ExecutorService httpClientExecutor;
private HttpClient httpClient;
private final ObjectMapper objectMapper;

CruiseControlClientImpl(Config config) {
this.serverHostname = config.serverHostname();
this.serverPort = config.serverPort();
this.rackEnabled = config.rackEnabled();
this.sslEnabled = config.sslEnabled();
this.sslCertificate = config.sslCertificate();
this.authEnabled = config.authEnabled();
this.authUsername = config.authUsername();
this.authPassword = config.authPassword();
private CruiseControlClientImpl(Config config) {
this.config = config;
this.httpClientExecutor = Executors.newCachedThreadPool();
this.httpClient = buildHttpClient();
this.objectMapper = new ObjectMapper();
}

static CruiseControlClient createInternal(Config config) {
if (config.serverHostname() == null || config.serverHostname().isBlank()) {
throw new IllegalArgumentException("Hostname is not set");
}
if (config.serverPort() <= 0) {
throw new IllegalArgumentException("Port number is invalid");
}
if (config.sslEnabled() && (config.sslCertificate() == null || config.sslCertificate().length == 0)) {
throw new IllegalArgumentException("SSL certificate is not set");
}
if (config.authEnabled() && (config.authUsername() == null || config.authUsername().isBlank())) {
throw new IllegalArgumentException("Authentication username is not set");
}
if (config.authEnabled() && (config.authPassword() == null || config.authPassword().isBlank())) {
throw new IllegalArgumentException("Authentication password is not set");
}
return new CruiseControlClientImpl(config);
}

@Override
public void close() {
Expand Down Expand Up @@ -114,8 +118,8 @@ public String topicConfiguration(List<KafkaTopic> kafkaTopics) {
}

// build request
URI requestUri = new UrlBuilder(serverHostname, serverPort, CruiseControlEndpoints.TOPIC_CONFIGURATION, sslEnabled)
.withParameter(CruiseControlParameters.SKIP_RACK_AWARENESS_CHECK, String.valueOf(!rackEnabled))
URI requestUri = new UrlBuilder(config.serverHostname(), config.serverPort(), CruiseControlEndpoints.TOPIC_CONFIGURATION, config.sslEnabled())
.withParameter(CruiseControlParameters.SKIP_RACK_AWARENESS_CHECK, String.valueOf(!config.rackEnabled()))
.withParameter(CruiseControlParameters.DRY_RUN, "false")
.withParameter(CruiseControlParameters.JSON, "true")
.build();
Expand All @@ -125,8 +129,8 @@ public String topicConfiguration(List<KafkaTopic> kafkaTopics) {
.timeout(Duration.of(HTTP_REQUEST_TIMEOUT_SEC, ChronoUnit.SECONDS))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(jsonPayload));
if (authEnabled) {
builder.header("Authorization", buildBasicAuthValue(authUsername, authPassword));
if (config.authEnabled()) {
builder.header("Authorization", buildBasicAuthValue(config.authUsername(), config.authPassword()));
}
HttpRequest request = builder.build();
LOGGER.traceOp("Request: {}", request);
Expand Down Expand Up @@ -154,7 +158,7 @@ public String topicConfiguration(List<KafkaTopic> kafkaTopics) {
@Override
public UserTasksResponse userTasks(Set<String> userTaskIds) {
// build request
URI requestUrl = new UrlBuilder(serverHostname, serverPort, CruiseControlEndpoints.USER_TASKS, sslEnabled)
URI requestUrl = new UrlBuilder(config.serverHostname(), config.serverPort(), CruiseControlEndpoints.USER_TASKS, config.sslEnabled())
.withParameter(CruiseControlParameters.USER_TASK_IDS, new ArrayList<>(userTaskIds))
.withParameter(CruiseControlParameters.FETCH_COMPLETE, "false")
.withParameter(CruiseControlParameters.JSON, "true")
Expand All @@ -164,8 +168,8 @@ public UserTasksResponse userTasks(Set<String> userTaskIds) {
.uri(requestUrl)
.timeout(Duration.of(HTTP_REQUEST_TIMEOUT_SEC, ChronoUnit.SECONDS))
.GET();
if (authEnabled) {
builder.header("Authorization", buildBasicAuthValue(authUsername, authPassword));
if (config.authEnabled()) {
builder.header("Authorization", buildBasicAuthValue(config.authUsername(), config.authPassword()));
}
HttpRequest request = builder.build();
LOGGER.traceOp("Request: {}", request);
Expand Down Expand Up @@ -226,11 +230,11 @@ public Optional<String> errorMessage(HttpResponse<String> response) {
private HttpClient buildHttpClient() {
try {
HttpClient.Builder builder = HttpClient.newBuilder().executor(httpClientExecutor);
if (sslEnabled) {
if (config.sslEnabled()) {
// load the certificate chain to be trusted
CertificateFactory cf = CertificateFactory.getInstance("X.509");
Certificate ca;
try (var caInput = new ByteArrayInputStream(sslCertificate)) {
try (var caInput = new ByteArrayInputStream(config.sslCertificate())) {
ca = cf.generateCertificate(caInput);
}

Expand Down

0 comments on commit 5098ce0

Please sign in to comment.