Skip to content

Commit

Permalink
Remove Vertx from KafkaConnect API (#10911)
Browse files Browse the repository at this point in the history
Signed-off-by: Gantigmaa Selenge <[email protected]>
  • Loading branch information
tinaselenge authored Dec 10, 2024
1 parent 5cd2799 commit b796120
Show file tree
Hide file tree
Showing 18 changed files with 868 additions and 966 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.strimzi.operator.cluster.model.RestartReason;
import io.strimzi.operator.cluster.model.RestartReasons;
import io.strimzi.operator.cluster.model.SharedEnvironmentProvider;
import io.strimzi.operator.cluster.operator.VertxUtil;
import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier;
import io.strimzi.operator.cluster.operator.resource.kubernetes.ClusterRoleBindingOperator;
import io.strimzi.operator.cluster.operator.resource.kubernetes.ConfigMapOperator;
Expand Down Expand Up @@ -355,7 +356,7 @@ protected Future<Void> connectPodDisruptionBudget(Reconciliation reconciliation,
*/
protected Future<Void> reconcileConnectLoggers(Reconciliation reconciliation, String host, String desiredLogging, OrderedProperties defaultLogging) {
KafkaConnectApi apiClient = connectClientProvider.apply(vertx);
return apiClient.updateConnectLoggers(reconciliation, host, port, desiredLogging, defaultLogging)
return VertxUtil.completableFutureToVertxFuture(apiClient.updateConnectLoggers(reconciliation, host, port, desiredLogging, defaultLogging))
.compose(updated -> {
if (Boolean.TRUE.equals(updated)) {
LOGGER.infoCr(reconciliation, "Logging configuration was updated");
Expand Down Expand Up @@ -383,17 +384,16 @@ protected Future<ConnectorStatusAndConditions> maybeCreateOrUpdateConnector(Reco
String connectorName, KafkaConnectorSpec connectorSpec, CustomResource resource) {
KafkaConnectorConfiguration desiredConfig = new KafkaConnectorConfiguration(reconciliation, connectorSpec.getConfig().entrySet());

return apiClient.getConnectorConfig(reconciliation, new BackOff(200L, 2, 6), host, port, connectorName).compose(
return VertxUtil.completableFutureToVertxFuture(apiClient.getConnectorConfig(reconciliation, new BackOff(200L, 2, 6), host, port, connectorName)).compose(
currentConfig -> {
if (!needsReconfiguring(reconciliation, connectorName, connectorSpec, desiredConfig.asOrderedProperties().asMap(), currentConfig)) {
LOGGER.debugCr(reconciliation, "Connector {} exists and has desired config, {}=={}", connectorName, desiredConfig.asOrderedProperties().asMap(), currentConfig);
return apiClient.status(reconciliation, host, port, connectorName)
return VertxUtil.completableFutureToVertxFuture(apiClient.status(reconciliation, host, port, connectorName))
.compose(status -> updateState(reconciliation, host, apiClient, connectorName, connectorSpec, status, new ArrayList<>()))
.compose(conditions -> manageConnectorOffsets(reconciliation, host, apiClient, connectorName, resource, connectorSpec, conditions))
.compose(conditions -> maybeRestartConnector(reconciliation, host, apiClient, connectorName, resource, conditions))
.compose(conditions -> maybeRestartConnectorTask(reconciliation, host, apiClient, connectorName, resource, conditions))
.compose(conditions ->
apiClient.statusWithBackOff(reconciliation, new BackOff(200L, 2, 10), host, port, connectorName)
.compose(conditions -> VertxUtil.completableFutureToVertxFuture(apiClient.statusWithBackOff(reconciliation, new BackOff(200L, 2, 10), host, port, connectorName))
.compose(createConnectorStatusAndConditions(conditions)))
.compose(status -> autoRestartFailedConnectorAndTasks(reconciliation, host, apiClient, connectorName, connectorSpec, status, resource))
.compose(status -> updateConnectorTopics(reconciliation, host, apiClient, connectorName, status));
Expand Down Expand Up @@ -440,11 +440,11 @@ private boolean needsReconfiguring(Reconciliation reconciliation, String connect

private Future<Map<String, Object>> createOrUpdateConnector(Reconciliation reconciliation, String host, KafkaConnectApi apiClient,
String connectorName, KafkaConnectorSpec connectorSpec, KafkaConnectorConfiguration desiredConfig) {
return apiClient.createOrUpdatePutRequest(reconciliation, host, port, connectorName, asJson(connectorSpec, desiredConfig))
.compose(ignored -> apiClient.statusWithBackOff(reconciliation, new BackOff(200L, 2, 10), host, port,
connectorName))
return VertxUtil.completableFutureToVertxFuture(apiClient.createOrUpdatePutRequest(reconciliation, host, port, connectorName, asJson(connectorSpec, desiredConfig)))
.compose(ignored -> VertxUtil.completableFutureToVertxFuture(apiClient.statusWithBackOff(reconciliation, new BackOff(200L, 2, 10), host, port,
connectorName)))
.compose(status -> updateState(reconciliation, host, apiClient, connectorName, connectorSpec, status, new ArrayList<>()))
.compose(ignored -> apiClient.status(reconciliation, host, port, connectorName));
.compose(ignored -> VertxUtil.completableFutureToVertxFuture(apiClient.status(reconciliation, host, port, connectorName)));
}

private Future<List<Condition>> updateState(Reconciliation reconciliation, String host, KafkaConnectApi apiClient, String connectorName, KafkaConnectorSpec connectorSpec, Map<String, Object> status, List<Condition> conditions) {
Expand All @@ -469,28 +469,28 @@ private Future<List<Condition>> updateState(Reconciliation reconciliation, Strin
case "RUNNING" -> {
if (targetState == ConnectorState.PAUSED) {
LOGGER.infoCr(reconciliation, "Pausing connector {}", connectorName);
future = apiClient.pause(reconciliation, host, port, connectorName);
future = VertxUtil.completableFutureToVertxFuture(apiClient.pause(reconciliation, host, port, connectorName));
} else if (targetState == ConnectorState.STOPPED) {
LOGGER.infoCr(reconciliation, "Stopping connector {}", connectorName);
future = apiClient.stop(reconciliation, host, port, connectorName);
future = VertxUtil.completableFutureToVertxFuture(apiClient.stop(reconciliation, host, port, connectorName));
}
}
case "PAUSED" -> {
if (targetState == ConnectorState.RUNNING) {
LOGGER.infoCr(reconciliation, "Resuming connector {}", connectorName);
future = apiClient.resume(reconciliation, host, port, connectorName);
future = VertxUtil.completableFutureToVertxFuture(apiClient.resume(reconciliation, host, port, connectorName));
} else if (targetState == ConnectorState.STOPPED) {
LOGGER.infoCr(reconciliation, "Stopping connector {}", connectorName);
future = apiClient.stop(reconciliation, host, port, connectorName);
future = VertxUtil.completableFutureToVertxFuture(apiClient.stop(reconciliation, host, port, connectorName));
}
}
case "STOPPED" -> {
if (targetState == ConnectorState.RUNNING) {
LOGGER.infoCr(reconciliation, "Resuming connector {}", connectorName);
future = apiClient.resume(reconciliation, host, port, connectorName);
future = VertxUtil.completableFutureToVertxFuture(apiClient.resume(reconciliation, host, port, connectorName));
} else if (targetState == ConnectorState.PAUSED) {
LOGGER.infoCr(reconciliation, "Pausing connector {}", connectorName);
future = apiClient.pause(reconciliation, host, port, connectorName);
future = VertxUtil.completableFutureToVertxFuture(apiClient.pause(reconciliation, host, port, connectorName));
}
}
default -> {
Expand Down Expand Up @@ -575,7 +575,7 @@ private Future<List<Condition>> updateState(Reconciliation reconciliation, Strin
*/
private Future<ConnectorStatusAndConditions> autoRestartConnector(Reconciliation reconciliation, String host, KafkaConnectApi apiClient, String connectorName, ConnectorStatusAndConditions status, AutoRestartStatus previousAutoRestartStatus) {
LOGGER.infoCr(reconciliation, "Auto restarting connector {}", connectorName);
return apiClient.restart(host, port, connectorName, true, true)
return VertxUtil.completableFutureToVertxFuture(apiClient.restart(host, port, connectorName, true, true))
.compose(
statusResult -> {
LOGGER.infoCr(reconciliation, "Restarted connector {} ", connectorName);
Expand Down Expand Up @@ -655,7 +655,7 @@ private static int nextAutoRestartBackOffIntervalInMinutes(int restartCount)
private Future<List<Condition>> maybeRestartConnector(Reconciliation reconciliation, String host, KafkaConnectApi apiClient, String connectorName, CustomResource resource, List<Condition> conditions) {
if (hasRestartAnnotation(resource, connectorName)) {
LOGGER.debugCr(reconciliation, "Restarting connector {}", connectorName);
return apiClient.restart(host, port, connectorName, false, false)
return VertxUtil.completableFutureToVertxFuture(apiClient.restart(host, port, connectorName, false, false))
.compose(ignored -> removeRestartAnnotation(reconciliation, resource)
.compose(v -> Future.succeededFuture(conditions)),
throwable -> {
Expand All @@ -675,7 +675,7 @@ private Future<List<Condition>> maybeRestartConnectorTask(Reconciliation reconci
int taskID = getRestartTaskAnnotationTaskID(resource, connectorName);
if (taskID >= 0) {
LOGGER.debugCr(reconciliation, "Restarting connector task {}:{}", connectorName, taskID);
return apiClient.restartTask(host, port, connectorName, taskID)
return VertxUtil.completableFutureToVertxFuture(apiClient.restartTask(host, port, connectorName, taskID))
.compose(ignored -> removeRestartTaskAnnotation(reconciliation, resource)
.compose(v -> Future.succeededFuture(conditions)),
throwable -> {
Expand Down Expand Up @@ -761,7 +761,7 @@ private Future<List<Condition>> listConnectorOffsets(Reconciliation reconciliati
}

String configMapName = listOffsetsConfig.get().getToConfigMap().getName();
return apiClient.getConnectorOffsets(reconciliation, host, port, connectorName)
return VertxUtil.completableFutureToVertxFuture(apiClient.getConnectorOffsets(reconciliation, host, port, connectorName))
.compose(offsets -> generateListOffsetsConfigMap(configMapName, connectorName, resource, offsets))
.compose(configMap -> configMapOperations.reconcile(reconciliation, resource.getMetadata().getNamespace(), configMapName, configMap))
.compose(v -> removeConnectorOffsetsAnnotations(reconciliation, resource))
Expand Down Expand Up @@ -853,7 +853,7 @@ private Future<List<Condition>> alterConnectorOffsets(Reconciliation reconciliat
String configMapName = alterOffsetsConfig.get().getFromConfigMap().getName();
return verifyConnectorStopped(reconciliation, host, apiClient, connectorName)
.compose(v -> getOffsetsForAlterRequest(configMapNamespace, configMapName, getConnectorOffsetsConfigMapEntryKey(connectorName)))
.compose(offsets -> apiClient.alterConnectorOffsets(reconciliation, host, port, connectorName, offsets))
.compose(offsets -> VertxUtil.completableFutureToVertxFuture(apiClient.alterConnectorOffsets(reconciliation, host, port, connectorName, offsets)))
.compose(v -> {
LOGGER.infoCr(reconciliation, "Offsets of connector {} were altered", connectorName);
return Future.succeededFuture();
Expand Down Expand Up @@ -917,7 +917,7 @@ private Future<List<Condition>> resetConnectorOffsets(Reconciliation reconciliat
LOGGER.infoCr(reconciliation, "Resetting offsets of connector {}", connectorName);

return verifyConnectorStopped(reconciliation, host, apiClient, connectorName)
.compose(v -> apiClient.resetConnectorOffsets(reconciliation, host, port, connectorName))
.compose(v -> VertxUtil.completableFutureToVertxFuture(apiClient.resetConnectorOffsets(reconciliation, host, port, connectorName)))
.compose(v -> {
LOGGER.infoCr(reconciliation, "Offsets of connector {} were reset", connectorName);
return Future.succeededFuture();
Expand All @@ -934,7 +934,7 @@ private Future<List<Condition>> resetConnectorOffsets(Reconciliation reconciliat
}

private Future<Void> verifyConnectorStopped(Reconciliation reconciliation, String host, KafkaConnectApi apiClient, String connectorName) {
return apiClient.status(reconciliation, host, port, connectorName)
return VertxUtil.completableFutureToVertxFuture(apiClient.status(reconciliation, host, port, connectorName))
.compose(status -> {
@SuppressWarnings({ "rawtypes" })
Object path = ((Map) status.getOrDefault("connector", emptyMap())).get("state");
Expand All @@ -950,7 +950,7 @@ private Future<Void> verifyConnectorStopped(Reconciliation reconciliation, Strin
}

private Future<ConnectorStatusAndConditions> updateConnectorTopics(Reconciliation reconciliation, String host, KafkaConnectApi apiClient, String connectorName, ConnectorStatusAndConditions status) {
return apiClient.getConnectorTopics(reconciliation, host, port, connectorName)
return VertxUtil.completableFutureToVertxFuture(apiClient.getConnectorTopics(reconciliation, host, port, connectorName))
.compose(updateConnectorStatusAndConditions(status));
}

Expand Down
Loading

0 comments on commit b796120

Please sign in to comment.