Remove last ZooKeeper methods from KafkaResources (#11040)
Signed-off-by: Jakub Scholz <[email protected]>
scholzj authored Jan 15, 2025
1 parent a40a265 commit 0a669b8
Showing 5 changed files with 14 additions and 137 deletions.
@@ -180,58 +180,6 @@ public static String kafkaNetworkPolicyName(String clusterName) {
return clusterName + "-network-policy-kafka";
}

////////
// ZooKeeper methods => still used in system tests
////////

/**
* Returns the name of the ZooKeeper {@code StrimziPodSet} for a {@code Kafka} cluster of the given name.
* @param clusterName The {@code metadata.name} of the {@code Kafka} resource.
* @return The name of the corresponding ZooKeeper {@code StrimziPodSet}.
*/
public static String zookeeperComponentName(String clusterName) {
return clusterName + "-zookeeper";
}

/**
* Returns the name of the ZooKeeper {@code Pod} for a {@code Kafka} cluster of the given name.
* @param clusterName The {@code metadata.name} of the {@code Kafka} resource.
* @param podNum The number of the ZooKeeper pod
* @return The name of the corresponding ZooKeeper {@code Pod}.
*/
public static String zookeeperPodName(String clusterName, int podNum) {
return zookeeperComponentName(clusterName) + "-" + podNum;
}

/**
* Returns the name of the ZooKeeper metrics and log {@code ConfigMap} for a {@code Kafka} cluster of the given name.
* @param clusterName The {@code metadata.name} of the {@code Kafka} resource.
* @return The name of the corresponding ZooKeeper metrics and log {@code ConfigMap}.
*/
public static String zookeeperMetricsAndLogConfigMapName(String clusterName) {
return clusterName + "-zookeeper-config";
}

/**
* Returns the name of the ZooKeeper headless service name for a {@code Kafka} cluster of the given name.
* @param clusterName The {@code metadata.name} of the {@code Kafka} resource.
* @return The name of the corresponding ZooKeeper headless service name.
*/
public static String zookeeperHeadlessServiceName(String clusterName) {
return clusterName + "-zookeeper-nodes";
}

/**
* Returns the name of the ZooKeeper Network Policy.
*
* @param clusterName The {@code metadata.name} of the {@code Kafka} resource.
*
* @return The name of the corresponding ZooKeeper Network Policy.
*/
public static String zookeeperNetworkPolicyName(String clusterName) {
return clusterName + "-network-policy-zookeeper";
}

////////
// Entity Operator methods
////////
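For reference, the removed helpers only encoded fixed naming conventions. A minimal sketch of the names they produced, taken directly from the return statements above ("my-cluster" is a placeholder cluster name, not part of this commit):

// Names the removed ZooKeeper helpers resolved for a Kafka resource named "my-cluster"
KafkaResources.zookeeperComponentName("my-cluster");              // "my-cluster-zookeeper"
KafkaResources.zookeeperPodName("my-cluster", 0);                 // "my-cluster-zookeeper-0"
KafkaResources.zookeeperMetricsAndLogConfigMapName("my-cluster"); // "my-cluster-zookeeper-config"
KafkaResources.zookeeperHeadlessServiceName("my-cluster");        // "my-cluster-zookeeper-nodes"
KafkaResources.zookeeperNetworkPolicyName("my-cluster");          // "my-cluster-network-policy-zookeeper"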
@@ -2114,7 +2114,6 @@ private void initTrustRolloutTestMocks(ResourceOperatorSupplier supplier,
when(mockPodOps.listAsync(any(), any(Labels.class))).thenReturn(Future.succeededFuture(pods));

StrimziPodSetOperator spsOps = supplier.strimziPodSetOperator;
when(spsOps.getAsync(eq(NAMESPACE), eq(KafkaResources.zookeeperComponentName(NAME)))).thenReturn(Future.succeededFuture());
StrimziPodSet controllerPodSet = new StrimziPodSetBuilder()
.withNewMetadata()
.withName(NAME + "-controller")
@@ -60,7 +60,6 @@ public static MixedOperation<StrimziPodSet, StrimziPodSetList, Resource<StrimziP
/**
* Based on used mode - ZK, KRaft separate role, KRaft mixed role - returns the name of the controller's StrimziPodSet
* In case of:
* - ZK mode, it returns {@link KafkaResources#zookeeperComponentName(String)}
* - KRaft mode - separate role, it returns {@link KafkaResource#getStrimziPodSetName(String, String)} with Pool name from
* {@link KafkaNodePoolResource#getControllerPoolName(String)}
* - KRaft mode - mixed role, it returns {@link KafkaResource#getStrimziPodSetName(String, String)} with Pool name from
@@ -69,13 +68,10 @@ public static MixedOperation<StrimziPodSet, StrimziPodSetList, Resource<StrimziP
* @return component name of controller
*/
public static String getControllerComponentName(String clusterName) {
if (Environment.isKRaftModeEnabled()) {
if (!Environment.isSeparateRolesMode()) {
return KafkaResource.getStrimziPodSetName(clusterName, KafkaNodePoolResource.getMixedPoolName(clusterName));
}
return KafkaResource.getStrimziPodSetName(clusterName, KafkaNodePoolResource.getControllerPoolName(clusterName));
if (!Environment.isSeparateRolesMode()) {
return KafkaResource.getStrimziPodSetName(clusterName, KafkaNodePoolResource.getMixedPoolName(clusterName));
}
return KafkaResources.zookeeperComponentName(clusterName);
return KafkaResource.getStrimziPodSetName(clusterName, KafkaNodePoolResource.getControllerPoolName(clusterName));
}

/**
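After this change the helper no longer falls back to the ZooKeeper component name. A minimal sketch of the resolution it now performs, based only on the code above (the concrete pool names come from KafkaNodePoolResource and are not shown in this diff; "my-cluster" is a placeholder):

// Sketch: resolution paths of getControllerComponentName("my-cluster")
// separate-roles mode disabled (mixed nodes):
//   KafkaResource.getStrimziPodSetName("my-cluster", KafkaNodePoolResource.getMixedPoolName("my-cluster"))
// separate-roles mode enabled (dedicated controllers):
//   KafkaResource.getStrimziPodSetName("my-cluster", KafkaNodePoolResource.getControllerPoolName("my-cluster"))
String controllerPodSetName = getControllerComponentName("my-cluster");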
@@ -14,7 +14,6 @@
import io.strimzi.api.kafka.model.connect.KafkaConnect;
import io.strimzi.api.kafka.model.connect.KafkaConnectResources;
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.kafka.KafkaResources;
import io.strimzi.api.kafka.model.mirrormaker2.KafkaMirrorMaker2;
import io.strimzi.operator.common.model.Labels;
import io.strimzi.systemtest.Environment;
@@ -351,14 +350,6 @@ public static void verifyClusterOperatorKafkaDockerImages(String clusterOperator

final String kafkaVersion = Optional.ofNullable(Crds.kafkaOperation(kubeClient(kafkaNamespaceName).getClient()).inNamespace(kafkaNamespaceName).withName(clusterName).get().getSpec().getKafka().getVersion()).orElse(Environment.ST_KAFKA_VERSION);

if (!Environment.isKRaftModeEnabled()) {
//Verifying docker image for zookeeper pods
for (int i = 0; i < controllerPods; i++) {
String imgFromPod = PodUtils.getContainerImageNameFromPod(kafkaNamespaceName, KafkaResources.zookeeperPodName(clusterName, i), "zookeeper");
assertThat("ZooKeeper Pod: " + i + " uses wrong image", imgFromPod, containsString(StUtils.parseImageMap(imgFromDeplConf.get(TestConstants.KAFKA_IMAGE_MAP)).get(kafkaVersion)));
}
}

//Verifying docker image for kafka pods
brokerPods.forEach(brokerPod -> {
String imgFromPod = PodUtils.getContainerImageNameFromPod(kafkaNamespaceName, brokerPod, "kafka");
@@ -26,7 +26,6 @@
import io.strimzi.kafka.config.model.ConfigModels;
import io.strimzi.kafka.config.model.Scope;
import io.strimzi.operator.common.Util;
import io.strimzi.systemtest.Environment;
import io.strimzi.systemtest.TestConstants;
import io.strimzi.systemtest.cli.KafkaCmdClient;
import io.strimzi.systemtest.resources.ResourceManager;
@@ -39,7 +38,6 @@
import io.strimzi.systemtest.utils.kubeUtils.objects.PodUtils;
import io.strimzi.test.TestUtils;
import io.strimzi.test.executor.ExecResult;
import io.strimzi.test.k8s.exceptions.KubeClusterException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.hamcrest.CoreMatchers;
@@ -55,17 +53,13 @@
import java.util.Map;
import java.util.Random;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static io.strimzi.api.kafka.model.kafka.KafkaClusterSpec.FORBIDDEN_PREFIXES;
import static io.strimzi.api.kafka.model.kafka.KafkaClusterSpec.FORBIDDEN_PREFIX_EXCEPTIONS;
import static io.strimzi.api.kafka.model.kafka.KafkaResources.kafkaComponentName;
import static io.strimzi.api.kafka.model.kafka.KafkaResources.zookeeperComponentName;
import static io.strimzi.systemtest.enums.CustomResourceStatus.NotReady;
import static io.strimzi.systemtest.enums.CustomResourceStatus.Ready;
import static io.strimzi.systemtest.utils.StUtils.indent;
import static io.strimzi.test.TestUtils.waitFor;
import static io.strimzi.test.k8s.KubeClusterResource.cmdKubeClient;
import static io.strimzi.test.k8s.KubeClusterResource.kubeClient;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -132,34 +126,6 @@ public static void waitUntilKafkaStatusConditionContainsMessage(String namespace
waitUntilKafkaStatusConditionContainsMessage(namespaceName, clusterName, pattern, TestConstants.GLOBAL_STATUS_TIMEOUT);
}

public static void waitForZkMntr(String namespaceName, String clusterName, Pattern pattern, int... podIndexes) {
long timeoutMs = 120_000L;
long pollMs = 1_000L;

for (int podIndex : podIndexes) {
String zookeeperPod = KafkaResources.zookeeperPodName(clusterName, podIndex);
String zookeeperPort = String.valueOf(12181);
waitFor("mntr", pollMs, timeoutMs, () -> {
try {
String output = cmdKubeClient(namespaceName).execInPod(zookeeperPod,
"/bin/bash", "-c", "echo mntr | nc localhost " + zookeeperPort).out();

if (pattern.matcher(output).find()) {
return true;
}
} catch (KubeClusterException e) {
LOGGER.trace("Exception while waiting for ZK to become leader/follower, ignoring", e);
}
return false;
},
() -> LOGGER.info("ZooKeeper `mntr` output at the point of timeout does not match {}:{}{}",
pattern.pattern(),
System.lineSeparator(),
indent(cmdKubeClient(namespaceName).execInPod(zookeeperPod, "/bin/bash", "-c", "echo mntr | nc localhost " + zookeeperPort).out()))
);
}
}

public static String getKafkaStatusCertificates(String namespaceName, String listenerType, String clusterName) {
String certs = "";
List<ListenerStatus> kafkaListeners = KafkaResource.kafkaClient().inNamespace(namespaceName).withName(clusterName).get().getStatus().getListeners();
@@ -186,9 +152,7 @@ public static void waitForKafkaSecretAndStatusCertsMatches(Supplier<String> kafk
@SuppressWarnings("unchecked")
public static void waitForClusterStability(String namespaceName, String clusterName) {
LabelSelector brokerSelector = KafkaResource.getLabelSelector(clusterName, kafkaComponentName(clusterName));
LabelSelector controllerSelector = KafkaResource.getLabelSelector(clusterName, zookeeperComponentName(clusterName));

Map<String, String>[] controllerPods = new Map[1];
Map<String, String>[] brokerPods = new Map[1];
Map<String, String>[] eoPods = new Map[1];

@@ -197,7 +161,6 @@ public static void waitForClusterStability(String namespaceName, String clusterN
int[] count = {0};

brokerPods[0] = PodUtils.podSnapshot(namespaceName, brokerSelector);
controllerPods[0] = PodUtils.podSnapshot(namespaceName, controllerSelector);
eoPods[0] = DeploymentUtils.depSnapshot(namespaceName, KafkaResources.entityOperatorDeploymentName(clusterName));

TestUtils.waitFor("Cluster to be stable and ready", TestConstants.GLOBAL_POLL_INTERVAL, TestConstants.TIMEOUT_FOR_CLUSTER_STABLE, () -> {
@@ -213,35 +176,16 @@ public static void waitForClusterStability(String namespaceName, String clusterN
LOGGER.warn("EO not stable");
}

if (!Environment.isKRaftModeEnabled()) {
Map<String, String> zkSnapshot = PodUtils.podSnapshot(namespaceName, controllerSelector);

boolean zkSameAsLast = zkSnapshot.equals(controllerPods[0]);

if (!zkSameAsLast) {
LOGGER.warn("ZK Cluster not stable");
}
if (zkSameAsLast && eoSameAsLast && kafkaSameAsLast) {
int c = count[0]++;
LOGGER.debug("All stable after: {} polls", c);
if (c > 60) {
LOGGER.info("Kafka cluster is stable after: {} polls", c);
return true;
}
return false;
}
controllerPods[0] = zkSnapshot;
} else {
if (kafkaSameAsLast && eoSameAsLast) {
int c = count[0]++;
LOGGER.debug("All stable after: {} polls", c);
if (c > 60) {
LOGGER.info("Kafka cluster is stable after: {} polls", c);
return true;
}
return false;
if (kafkaSameAsLast && eoSameAsLast) {
int c = count[0]++;
LOGGER.debug("All stable after: {} polls", c);
if (c > 60) {
LOGGER.info("Kafka cluster is stable after: {} polls", c);
return true;
}
return false;
}

brokerPods[0] = kafkaSnapshot;
eoPods[0] = eoSnapshot;

@@ -428,10 +372,9 @@ public static void waitForKafkaDeletion(String namespaceName, String kafkaCluste
LOGGER.info("Waiting for deletion of Kafka: {}/{}", namespaceName, kafkaClusterName);
TestUtils.waitFor("deletion of Kafka: " + namespaceName + "/" + kafkaClusterName, TestConstants.POLL_INTERVAL_FOR_RESOURCE_READINESS, DELETION_TIMEOUT,
() -> {
if (KafkaResource.kafkaClient().inNamespace(namespaceName).withName(kafkaClusterName).get() == null &&
StrimziPodSetResource.strimziPodSetClient().inNamespace(namespaceName).withName(KafkaResources.kafkaComponentName(kafkaClusterName)).get() == null &&
StrimziPodSetResource.strimziPodSetClient().inNamespace(namespaceName).withName(KafkaResources.zookeeperComponentName(kafkaClusterName)).get() == null &&
kubeClient(namespaceName).getDeployment(namespaceName, KafkaResources.entityOperatorDeploymentName(kafkaClusterName)) == null) {
if (KafkaResource.kafkaClient().inNamespace(namespaceName).withName(kafkaClusterName).get() == null
&& StrimziPodSetResource.strimziPodSetClient().inNamespace(namespaceName).withName(KafkaResources.kafkaComponentName(kafkaClusterName)).get() == null
&& kubeClient(namespaceName).getDeployment(namespaceName, KafkaResources.entityOperatorDeploymentName(kafkaClusterName)) == null) {
return true;
} else {
cmdKubeClient(namespaceName).deleteByName(Kafka.RESOURCE_KIND, kafkaClusterName);