From 751a6ba9be31853ed11534bd0fecd9202fb34141 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Kr=C3=A1l?= <53821852+im-konge@users.noreply.github.com> Date: Wed, 15 Jan 2025 09:10:58 +0100 Subject: [PATCH] [ST] Remove ZooKeeper from upgrade/downgrade tests (#11026) Signed-off-by: Lukas Kral <lukywill16@gmail.com> --- .azure/systemtests/systemtest-settings.xml | 7 - .../java/io/strimzi/systemtest/TestTags.java | 10 - .../listeners/ExecutionListener.java | 2 - .../CommonVersionModificationData.java | 8 - ...adeST.java => AbstractKRaftUpgradeST.java} | 829 +++++++++--------- .../KRaftKafkaUpgradeDowngradeST.java | 38 +- ...mUpgradeST.java => KRaftOlmUpgradeST.java} | 19 +- .../{kraft => }/KRaftStrimziDowngradeST.java | 4 +- .../{kraft => }/KRaftStrimziUpgradeST.java | 19 +- .../upgrade/kraft/AbstractKRaftUpgradeST.java | 218 ----- .../regular/KafkaUpgradeDowngradeST.java | 367 -------- .../upgrade/regular/StrimziDowngradeST.java | 66 -- .../upgrade/regular/StrimziUpgradeST.java | 177 ---- .../resources/upgrade/BundleDowngrade.yaml | 10 +- .../test/resources/upgrade/BundleUpgrade.yaml | 10 +- .../test/resources/upgrade/OlmUpgrade.yaml | 12 +- 16 files changed, 461 insertions(+), 1335 deletions(-) rename systemtest/src/test/java/io/strimzi/systemtest/upgrade/{AbstractUpgradeST.java => AbstractKRaftUpgradeST.java} (79%) rename systemtest/src/test/java/io/strimzi/systemtest/upgrade/{kraft => }/KRaftKafkaUpgradeDowngradeST.java (91%) rename systemtest/src/test/java/io/strimzi/systemtest/upgrade/{regular/OlmUpgradeST.java => KRaftOlmUpgradeST.java} (91%) rename systemtest/src/test/java/io/strimzi/systemtest/upgrade/{kraft => }/KRaftStrimziDowngradeST.java (94%) rename systemtest/src/test/java/io/strimzi/systemtest/upgrade/{kraft => }/KRaftStrimziUpgradeST.java (94%) delete mode 100644 systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/AbstractKRaftUpgradeST.java delete mode 100644 systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/KafkaUpgradeDowngradeST.java delete mode 100644 systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziDowngradeST.java delete mode 100644 systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziUpgradeST.java diff --git a/.azure/systemtests/systemtest-settings.xml b/.azure/systemtests/systemtest-settings.xml index b360be6d3bd..82ac1071763 100644 --- a/.azure/systemtests/systemtest-settings.xml +++ b/.azure/systemtests/systemtest-settings.xml @@ -167,12 +167,5 @@ </properties> </profile> - <profile> - <id>azp_migration</id> - <properties> - <skipTests>false</skipTests> - <groups>migration</groups> - </properties> - </profile> </profiles> </settings> diff --git a/systemtest/src/main/java/io/strimzi/systemtest/TestTags.java b/systemtest/src/main/java/io/strimzi/systemtest/TestTags.java index 828ce3f515f..d4679afd733 100644 --- a/systemtest/src/main/java/io/strimzi/systemtest/TestTags.java +++ b/systemtest/src/main/java/io/strimzi/systemtest/TestTags.java @@ -21,11 +21,6 @@ public interface TestTags { */ String REGRESSION = "regression"; - /** - * Tag for upgrade tests. - */ - String UPGRADE = "upgrade"; - /** * Tag for KRaft to KRaft tests. */ @@ -156,11 +151,6 @@ public interface TestTags { */ String ROUTE = "route"; - /** - * Tag for tests that focus on migration from ZK to KRaft - */ - String MIGRATION = "migration"; - /** * Tag for tests that focus on performance */ diff --git a/systemtest/src/main/java/io/strimzi/systemtest/listeners/ExecutionListener.java b/systemtest/src/main/java/io/strimzi/systemtest/listeners/ExecutionListener.java index 94452caecca..aa3a471c118 100644 --- a/systemtest/src/main/java/io/strimzi/systemtest/listeners/ExecutionListener.java +++ b/systemtest/src/main/java/io/strimzi/systemtest/listeners/ExecutionListener.java @@ -67,8 +67,6 @@ public static boolean requiresSharedNamespace(final ExtensionContext extensionCo TestTags.DYNAMIC_CONFIGURATION, // Dynamic configuration also because in DynamicConfSharedST we use @TestFactory TestTags.TRACING, // Tracing, because we deploy Jaeger operator inside additional namespace TestTags.KAFKA_SMOKE, // KafkaVersionsST, MigrationST because here we use @ParameterizedTest - TestTags.MIGRATION, - TestTags.UPGRADE, TestTags.KRAFT_UPGRADE ); diff --git a/systemtest/src/main/java/io/strimzi/systemtest/upgrade/CommonVersionModificationData.java b/systemtest/src/main/java/io/strimzi/systemtest/upgrade/CommonVersionModificationData.java index 199cb85e951..7d18daabbe6 100644 --- a/systemtest/src/main/java/io/strimzi/systemtest/upgrade/CommonVersionModificationData.java +++ b/systemtest/src/main/java/io/strimzi/systemtest/upgrade/CommonVersionModificationData.java @@ -101,14 +101,6 @@ public String getKafkaFilePathAfter() { return getFilePaths().get("kafkaAfter"); } - public String getKafkaKRaftFilePathBefore() { - return getFilePaths().get("kafkaKRaftBefore"); - } - - public String getKafkaKRaftFilePathAfter() { - return getFilePaths().get("kafkaKRaftAfter"); - } - @Override public String toString() { return "\n" + diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/AbstractUpgradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/AbstractKRaftUpgradeST.java similarity index 79% rename from systemtest/src/test/java/io/strimzi/systemtest/upgrade/AbstractUpgradeST.java rename to systemtest/src/test/java/io/strimzi/systemtest/upgrade/AbstractKRaftUpgradeST.java index 1648a35de3b..6c363d1a673 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/AbstractUpgradeST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/AbstractKRaftUpgradeST.java @@ -16,6 +16,7 @@ import io.strimzi.api.kafka.model.connect.build.PluginBuilder; import io.strimzi.api.kafka.model.kafka.Kafka; import io.strimzi.api.kafka.model.kafka.KafkaResources; +import io.strimzi.api.kafka.model.nodepool.ProcessRoles; import io.strimzi.api.kafka.model.topic.KafkaTopic; import io.strimzi.api.kafka.model.user.KafkaUser; import io.strimzi.operator.common.Annotations; @@ -27,6 +28,7 @@ import io.strimzi.systemtest.resources.NamespaceManager; import io.strimzi.systemtest.resources.ResourceManager; import io.strimzi.systemtest.resources.crd.KafkaConnectResource; +import io.strimzi.systemtest.resources.crd.KafkaNodePoolResource; import io.strimzi.systemtest.resources.crd.KafkaResource; import io.strimzi.systemtest.resources.crd.KafkaTopicResource; import io.strimzi.systemtest.storage.TestStorage; @@ -60,7 +62,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Random; @@ -80,320 +81,106 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.jupiter.api.Assertions.fail; -public class AbstractUpgradeST extends AbstractST { +public class AbstractKRaftUpgradeST extends AbstractST { - private static final Logger LOGGER = LogManager.getLogger(AbstractUpgradeST.class); + private static final Logger LOGGER = LogManager.getLogger(AbstractKRaftUpgradeST.class); protected File dir = null; protected File coDir = null; protected File kafkaTopicYaml = null; protected File kafkaUserYaml = null; protected File kafkaConnectYaml; + protected File kafkaYaml; - protected final String clusterName = "my-cluster"; - protected final String poolName = "kafka"; - - protected Map<String, String> controllerPods; protected Map<String, String> brokerPods; + protected Map<String, String> controllerPods; protected Map<String, String> eoPods; protected Map<String, String> connectPods; - protected final LabelSelector brokerSelector = KafkaResource.getLabelSelector(clusterName, KafkaResources.kafkaComponentName(clusterName)); - protected final LabelSelector controllerSelector = KafkaResource.getLabelSelector(clusterName, KafkaResources.zookeeperComponentName(clusterName)); - protected final LabelSelector eoSelector = KafkaResource.getLabelSelector(clusterName, KafkaResources.entityOperatorDeploymentName(clusterName)); + protected static final String CONTROLLER_NODE_NAME = "controller"; + protected static final String BROKER_NODE_NAME = "broker"; + protected static final String CLUSTER_NAME = "my-cluster"; + protected static final String TOPIC_NAME = "my-topic"; + protected static final String USER_NAME = "my-user"; + protected static final int UPGRADE_TOPIC_COUNT = 20; + protected static final int BTO_KAFKA_TOPICS_ONLY_COUNT = 3; + + protected final LabelSelector controllerSelector = KafkaNodePoolResource.getLabelSelector(CLUSTER_NAME, CONTROLLER_NODE_NAME, ProcessRoles.CONTROLLER); + protected final LabelSelector brokerSelector = KafkaNodePoolResource.getLabelSelector(CLUSTER_NAME, BROKER_NODE_NAME, ProcessRoles.BROKER); + protected final LabelSelector eoSelector = KafkaResource.getLabelSelector(CLUSTER_NAME, KafkaResources.entityOperatorDeploymentName(CLUSTER_NAME)); protected final LabelSelector coSelector = new LabelSelectorBuilder().withMatchLabels(Map.of(Labels.STRIMZI_KIND_LABEL, "cluster-operator")).build(); - protected final LabelSelector connectLabelSelector = KafkaConnectResource.getLabelSelector(clusterName, KafkaConnectResources.componentName(clusterName)); - - protected final String topicName = "my-topic"; - protected final String userName = "my-user"; - protected final int upgradeTopicCount = 20; - protected final int btoKafkaTopicsOnlyCount = 3; - - // ExpectedTopicCount contains additionally consumer-offset topic, my-topic and continuous-topic - protected File kafkaYaml; - - /** - * Based on {@param isUTOUsed} and {@param wasUTOUsedBefore} it returns the expected count of KafkaTopics. - * In case that UTO was used before and after, the expected number of KafkaTopics is {@link #upgradeTopicCount}. - * In other cases - BTO was used before or after the upgrade/downgrade - the expected number of KafkaTopics is {@link #upgradeTopicCount} - * with {@link #btoKafkaTopicsOnlyCount}. - * @param isUTOUsed boolean value determining if UTO is used after upgrade/downgrade of the CO - * @param wasUTOUsedBefore boolean value determining if UTO was used before upgrade/downgrade of the CO - * @return expected number of KafkaTopics - */ - protected int getExpectedTopicCount(boolean isUTOUsed, boolean wasUTOUsedBefore) { - if (isUTOUsed && wasUTOUsedBefore) { - // topics that are just present in Kafka itself are not created as CRs in UTO, thus -3 topics in comparison to regular upgrade - return upgradeTopicCount; - } + protected final LabelSelector connectLabelSelector = KafkaConnectResource.getLabelSelector(CLUSTER_NAME, KafkaConnectResources.componentName(CLUSTER_NAME)); - return upgradeTopicCount + btoKafkaTopicsOnlyCount; - } protected void makeComponentsSnapshots(String componentsNamespaceName) { + eoPods = DeploymentUtils.depSnapshot(componentsNamespaceName, KafkaResources.entityOperatorDeploymentName(CLUSTER_NAME)); controllerPods = PodUtils.podSnapshot(componentsNamespaceName, controllerSelector); brokerPods = PodUtils.podSnapshot(componentsNamespaceName, brokerSelector); - eoPods = DeploymentUtils.depSnapshot(componentsNamespaceName, KafkaResources.entityOperatorDeploymentName(clusterName)); connectPods = PodUtils.podSnapshot(componentsNamespaceName, connectLabelSelector); } - @SuppressWarnings("CyclomaticComplexity") - protected void changeKafkaVersion(String componentsNamespaceName, CommonVersionModificationData versionModificationData) throws IOException { - // Get Kafka configurations - String currentLogMessageFormat = cmdKubeClient(componentsNamespaceName).getResourceJsonPath(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, ".spec.kafka.config.log\\.message\\.format\\.version"); - String currentInterBrokerProtocol = cmdKubeClient(componentsNamespaceName).getResourceJsonPath(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, ".spec.kafka.config.inter\\.broker\\.protocol\\.version"); - - // Get Kafka version - String kafkaVersionFromCR = cmdKubeClient(componentsNamespaceName).getResourceJsonPath(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, ".spec.kafka.version"); - kafkaVersionFromCR = kafkaVersionFromCR.equals("") ? null : kafkaVersionFromCR; - String kafkaVersionFromProcedure = versionModificationData.getProcedures().getVersion(); - - // ####################################################################### - // ################# Update CRs to latest version ################### - // ####################################################################### - String examplesPath = ""; - if (versionModificationData.getToUrl().equals("HEAD")) { - examplesPath = PATH_TO_PACKAGING; - } else { - File dir = FileUtils.downloadAndUnzip(versionModificationData.getToUrl()); - examplesPath = dir.getAbsolutePath() + "/" + versionModificationData.getToExamples(); - } - - kafkaYaml = new File(examplesPath + versionModificationData.getKafkaFilePathAfter()); - LOGGER.info("Deploying Kafka from: {}", kafkaYaml.getPath()); - // Change kafka version of it's empty (null is for remove the version) - String defaultValueForVersions = kafkaVersionFromCR == null ? null : TestKafkaVersion.getSpecificVersion(kafkaVersionFromCR).messageVersion(); - cmdKubeClient(componentsNamespaceName).applyContent(KafkaUtils.changeOrRemoveKafkaConfiguration(kafkaYaml, kafkaVersionFromCR, defaultValueForVersions, defaultValueForVersions)); - - kafkaUserYaml = new File(examplesPath + "/examples/user/kafka-user.yaml"); - LOGGER.info("Deploying KafkaUser from: {}", kafkaUserYaml.getPath()); - cmdKubeClient(componentsNamespaceName).applyContent(KafkaUserUtils.removeKafkaUserPart(kafkaUserYaml, "authorization")); - - kafkaTopicYaml = new File(examplesPath + "/examples/topic/kafka-topic.yaml"); - LOGGER.info("Deploying KafkaTopic from: {}", kafkaTopicYaml.getPath()); - cmdKubeClient(componentsNamespaceName).applyContent(ReadWriteUtils.readFile(kafkaTopicYaml)); - // ####################################################################### - - - if (versionModificationData.getProcedures() != null && (!currentLogMessageFormat.isEmpty() || !currentInterBrokerProtocol.isEmpty())) { - if (kafkaVersionFromProcedure != null && !kafkaVersionFromProcedure.isEmpty() && !kafkaVersionFromCR.contains(kafkaVersionFromProcedure) - && ResourceManager.getTestContext().getTestClass().get().getSimpleName().toLowerCase(Locale.ROOT).contains("upgrade")) { - LOGGER.info("Set Kafka version to " + kafkaVersionFromProcedure); - cmdKubeClient(componentsNamespaceName).patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, "/spec/kafka/version", kafkaVersionFromProcedure); - LOGGER.info("Waiting for Kafka rolling update to finish"); - brokerPods = RollingUpdateUtils.waitTillComponentHasRolled(componentsNamespaceName, brokerSelector, 3, brokerPods); - } - - String logMessageVersion = versionModificationData.getProcedures().getLogMessageVersion(); - String interBrokerProtocolVersion = versionModificationData.getProcedures().getInterBrokerVersion(); - - if (logMessageVersion != null && !logMessageVersion.isEmpty() || interBrokerProtocolVersion != null && !interBrokerProtocolVersion.isEmpty()) { - if (!logMessageVersion.isEmpty()) { - LOGGER.info("Set log message format version to {} (current version is {})", logMessageVersion, currentLogMessageFormat); - cmdKubeClient(componentsNamespaceName).patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, "/spec/kafka/config/log.message.format.version", logMessageVersion); - } - - if (!interBrokerProtocolVersion.isEmpty()) { - LOGGER.info("Set inter-broker protocol version to {} (current version is {})", interBrokerProtocolVersion, currentInterBrokerProtocol); - LOGGER.info("Set inter-broker protocol version to " + interBrokerProtocolVersion); - cmdKubeClient(componentsNamespaceName).patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, "/spec/kafka/config/inter.broker.protocol.version", interBrokerProtocolVersion); - } - - if ((currentInterBrokerProtocol != null && !currentInterBrokerProtocol.equals(interBrokerProtocolVersion)) || - (currentLogMessageFormat != null && !currentLogMessageFormat.isEmpty() && !currentLogMessageFormat.equals(logMessageVersion))) { - LOGGER.info("Waiting for Kafka rolling update to finish"); - brokerPods = RollingUpdateUtils.waitTillComponentHasRolled(componentsNamespaceName, brokerSelector, 3, brokerPods); - } - makeComponentsSnapshots(componentsNamespaceName); - } - - if (kafkaVersionFromProcedure != null && !kafkaVersionFromProcedure.isEmpty() && !kafkaVersionFromCR.contains(kafkaVersionFromProcedure) - && ResourceManager.getTestContext().getTestClass().get().getSimpleName().toLowerCase(Locale.ROOT).contains("downgrade")) { - LOGGER.info("Set Kafka version to " + kafkaVersionFromProcedure); - cmdKubeClient(componentsNamespaceName).patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, "/spec/kafka/version", kafkaVersionFromProcedure); - LOGGER.info("Waiting for Kafka rolling update to finish"); - brokerPods = RollingUpdateUtils.waitTillComponentHasRolled(componentsNamespaceName, brokerSelector, brokerPods); - } - } - } - - protected void changeKafkaVersionInKafkaConnect(String componentsNamespaceName, CommonVersionModificationData versionModificationData) { - UpgradeKafkaVersion upgradeToKafkaVersion = new UpgradeKafkaVersion(versionModificationData.getProcedures().getVersion()); - - String kafkaVersionFromConnectCR = cmdKubeClient(componentsNamespaceName).getResourceJsonPath(getResourceApiVersion(KafkaConnect.RESOURCE_PLURAL), clusterName, ".spec.version"); - kafkaVersionFromConnectCR = kafkaVersionFromConnectCR.isEmpty() ? null : kafkaVersionFromConnectCR; - - if (kafkaVersionFromConnectCR != null && !kafkaVersionFromConnectCR.equals(upgradeToKafkaVersion.getVersion())) { - LOGGER.info(String.format("Setting Kafka version to %s in Kafka Connect", upgradeToKafkaVersion.getVersion())); - cmdKubeClient(componentsNamespaceName).patchResource(getResourceApiVersion(KafkaConnect.RESOURCE_PLURAL), clusterName, "/spec/version", upgradeToKafkaVersion.getVersion()); - - LOGGER.info("Waiting for KafkaConnect rolling update to finish"); - connectPods = RollingUpdateUtils.waitTillComponentHasRolled(componentsNamespaceName, connectLabelSelector, 1, connectPods); - } - } - - protected void logComponentsPodImagesWithConnect(String componentsNamespaceName) { - logPodImages(componentsNamespaceName, controllerSelector, brokerSelector, eoSelector, connectLabelSelector); - } + protected void doKafkaConnectAndKafkaConnectorUpgradeOrDowngradeProcedure( + final String clusterOperatorNamespaceName, + final TestStorage testStorage, + final BundleVersionModificationData upgradeDowngradeData, + final UpgradeKafkaVersion upgradeKafkaVersion + ) throws IOException { + setupEnvAndUpgradeClusterOperator(clusterOperatorNamespaceName, testStorage, upgradeDowngradeData, upgradeKafkaVersion); + deployKafkaConnectAndKafkaConnectorWithWaitForReadiness(testStorage, upgradeDowngradeData, upgradeKafkaVersion); - protected void logComponentsPodImages(String componentsNamespaceName) { - logPodImages(componentsNamespaceName, controllerSelector, brokerSelector, eoSelector); - } + // Check if UTO is used before changing the CO -> used for check for KafkaTopics + boolean wasUTOUsedBefore = StUtils.isUnidirectionalTopicOperatorUsed(testStorage.getNamespaceName(), eoSelector); - protected void logClusterOperatorPodImage(String clusterOperatorNamespaceName) { - logPodImages(clusterOperatorNamespaceName, coSelector); - } + final KafkaClients clients = ClientUtils.getInstantTlsClientBuilder(testStorage, KafkaResources.tlsBootstrapAddress(CLUSTER_NAME)) + .withNamespaceName(testStorage.getNamespaceName()) + .withUsername(USER_NAME) + .build(); - /** - * Logs images of Pods' containers in the specified {@param namespaceName}. Each image is logged per each label selector. - * - * @param namespaceName the name of the Kubernetes namespace where the pods are located - * @param labelSelectors optional array of {@link LabelSelector} objects used to filter pods based on labels. - * If no selectors are provided, no Pods are selected. - */ - protected void logPodImages(String namespaceName, LabelSelector... labelSelectors) { - Arrays.stream(labelSelectors) - .parallel() - .map(selector -> kubeClient().listPods(namespaceName, selector)) - .flatMap(Collection::stream) - .forEach(pod -> - pod.getSpec().getContainers().forEach(container -> - LOGGER.info("Pod: {}/{} has image {}", - pod.getMetadata().getNamespace(), pod.getMetadata().getName(), pod.getSpec().getContainers().get(0).getImage()) - )); - } + resourceManager.createResourceWithWait(clients.producerTlsStrimzi(CLUSTER_NAME)); + // Verify that Producer finish successfully + ClientUtils.waitForInstantProducerClientSuccess(testStorage); - protected void waitForKafkaClusterRollingUpdate(final String componentsNamespaceName) { - LOGGER.info("Waiting for ZK StrimziPodSet roll"); - controllerPods = RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(componentsNamespaceName, controllerSelector, 3, controllerPods); - LOGGER.info("Waiting for Kafka StrimziPodSet roll"); - brokerPods = RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(componentsNamespaceName, brokerSelector, 3, brokerPods); - LOGGER.info("Waiting for EO Deployment roll"); - // Check the TO and UO also got upgraded - eoPods = DeploymentUtils.waitTillDepHasRolled(componentsNamespaceName, KafkaResources.entityOperatorDeploymentName(clusterName), 1, eoPods); - } + makeComponentsSnapshots(testStorage.getNamespaceName()); + logComponentsPodImagesWithConnect(testStorage.getNamespaceName()); - protected void waitForReadinessOfKafkaCluster(final String componentsNamespaceName) { - LOGGER.info("Waiting for ZooKeeper StrimziPodSet"); - RollingUpdateUtils.waitForComponentAndPodsReady(componentsNamespaceName, controllerSelector, 3); - LOGGER.info("Waiting for Kafka StrimziPodSet"); - RollingUpdateUtils.waitForComponentAndPodsReady(componentsNamespaceName, brokerSelector, 3); - LOGGER.info("Waiting for EO Deployment"); - DeploymentUtils.waitForDeploymentAndPodsReady(componentsNamespaceName, KafkaResources.entityOperatorDeploymentName(clusterName), 1); - } + // Verify FileSink KafkaConnector before upgrade + String connectorPodName = kubeClient().listPods(testStorage.getNamespaceName(), Collections.singletonMap(Labels.STRIMZI_KIND_LABEL, KafkaConnect.RESOURCE_KIND)).get(0).getMetadata().getName(); + KafkaConnectUtils.waitForMessagesInKafkaConnectFileSink(testStorage.getNamespaceName(), connectorPodName, DEFAULT_SINK_FILE_PATH, testStorage.getMessageCount()); - protected void changeClusterOperator(String clusterOperatorNamespaceName, String componentsNamespaceName, BundleVersionModificationData versionModificationData) throws IOException { - final Map<String, String> coPods = DeploymentUtils.depSnapshot(clusterOperatorNamespaceName, ResourceManager.getCoDeploymentName()); + // Upgrade CO to HEAD and wait for readiness of ClusterOperator + changeClusterOperator(clusterOperatorNamespaceName, testStorage.getNamespaceName(), upgradeDowngradeData); - File coDir; - // Modify + apply installation files - LOGGER.info("Update CO from {} to {}", versionModificationData.getFromVersion(), versionModificationData.getToVersion()); - if (versionModificationData.getToVersion().equals("HEAD")) { - coDir = new File(TestUtils.USER_PATH + "/../packaging/install/cluster-operator"); - } else { - String url = versionModificationData.getToUrl(); - File dir = FileUtils.downloadAndUnzip(url); - coDir = new File(dir, versionModificationData.getToExamples() + "/install/cluster-operator/"); + if (TestKafkaVersion.supportedVersionsContainsVersion(upgradeKafkaVersion.getVersion())) { + // Verify that Kafka and Connect Pods Rolled + waitForKafkaClusterRollingUpdate(testStorage.getNamespaceName()); + connectPods = RollingUpdateUtils.waitTillComponentHasRolled(testStorage.getNamespaceName(), connectLabelSelector, 1, connectPods); + KafkaConnectorUtils.waitForConnectorReady(testStorage.getNamespaceName(), CLUSTER_NAME); } - modifyApplyClusterOperatorWithCRDsFromFile(clusterOperatorNamespaceName, componentsNamespaceName, coDir, versionModificationData.getFeatureGatesAfter()); - - LOGGER.info("Waiting for CO upgrade"); - DeploymentUtils.waitTillDepHasRolled(clusterOperatorNamespaceName, ResourceManager.getCoDeploymentName(), 1, coPods); - } - - /** - * Series of steps done when applying operator from files located in root directory. Operator deployment is modified - * to watch multiple (single) namespace. All role based access control resources are modified so the subject is found - * in operator namespace. Role bindings concerning operands are modified to be deployed in watched namespace. - * - * @param clusterOperatorNamespaceName the name of the namespace where the Strimzi operator is deployed. - * @param componentsNamespaceName the name of the single namespace being watched and managed by the Strimzi operator. - * @param root the root directory containing the YAML files to be processed. - * @param strimziFeatureGatesValue the value of the Strimzi feature gates to be injected into deployment configurations. - */ - protected void modifyApplyClusterOperatorWithCRDsFromFile(String clusterOperatorNamespaceName, String componentsNamespaceName, File root, final String strimziFeatureGatesValue) { - KubeClusterResource.getInstance().setNamespace(clusterOperatorNamespaceName); - - final List<String> watchedNsRoleBindingFilePrefixes = List.of( - "020-RoleBinding", // rb to role for creating KNative resources - "023-RoleBinding", // rb to role for watching Strimzi CustomResources - "031-RoleBinding" // rb to role for entity operator - ); + logComponentsPodImagesWithConnect(testStorage.getNamespaceName()); - Arrays.stream(Objects.requireNonNull(root.listFiles())).sorted().forEach(f -> { - if (watchedNsRoleBindingFilePrefixes.stream().anyMatch((rbFilePrefix) -> f.getName().startsWith(rbFilePrefix))) { - cmdKubeClient(componentsNamespaceName).replaceContent(StUtils.changeRoleBindingSubject(f, clusterOperatorNamespaceName)); - } else if (f.getName().matches(".*RoleBinding.*")) { - cmdKubeClient(clusterOperatorNamespaceName).replaceContent(StUtils.changeRoleBindingSubject(f, clusterOperatorNamespaceName)); - } else if (f.getName().matches(".*Deployment.*")) { - cmdKubeClient(clusterOperatorNamespaceName).replaceContent(StUtils.changeDeploymentConfiguration(componentsNamespaceName, f, strimziFeatureGatesValue)); - } else { - cmdKubeClient(clusterOperatorNamespaceName).replaceContent(ReadWriteUtils.readFile(f)); - } - }); - } + // Upgrade/Downgrade kafka + changeKafkaVersion(testStorage.getNamespaceName(), upgradeDowngradeData); + changeKafkaVersionInKafkaConnect(testStorage.getNamespaceName(), upgradeDowngradeData); - protected void deleteInstalledYamls(String clusterOperatorNamespaceName, String componentsNamespaceName, File root) { - if (kafkaUserYaml != null) { - LOGGER.info("Deleting KafkaUser configuration files"); - cmdKubeClient(componentsNamespaceName).delete(kafkaUserYaml); - } - if (kafkaTopicYaml != null) { - LOGGER.info("Deleting KafkaTopic configuration files"); - KafkaTopicUtils.setFinalizersInAllTopicsToNull(componentsNamespaceName); - cmdKubeClient().delete(kafkaTopicYaml); - } - if (kafkaYaml != null) { - LOGGER.info("Deleting Kafka configuration files"); - cmdKubeClient(componentsNamespaceName).delete(kafkaYaml); - } - if (root != null) { - final List<String> watchedNsRoleBindingFilePrefixes = List.of( - "020-RoleBinding", // rb to role for creating KNative resources - "023-RoleBinding", // rb to role for watching Strimzi CustomResources - "031-RoleBinding" // rb to role for entity operator - ); + logComponentsPodImagesWithConnect(testStorage.getNamespaceName()); + checkAllComponentsImages(testStorage.getNamespaceName(), upgradeDowngradeData); - Arrays.stream(Objects.requireNonNull(root.listFiles())).sorted().forEach(f -> { - try { - if (watchedNsRoleBindingFilePrefixes.stream().anyMatch((rbFilePrefix) -> f.getName().startsWith(rbFilePrefix))) { - cmdKubeClient(componentsNamespaceName).deleteContent(StUtils.changeRoleBindingSubject(f, clusterOperatorNamespaceName)); - } else if (f.getName().matches(".*RoleBinding.*")) { - cmdKubeClient(clusterOperatorNamespaceName).deleteContent(StUtils.changeRoleBindingSubject(f, clusterOperatorNamespaceName)); - } else { - cmdKubeClient(clusterOperatorNamespaceName).delete(f); - } - } catch (Exception ex) { - LOGGER.warn("Failed to delete resources: {}", f.getName()); - } - }); - } - } + // send again new messages + resourceManager.createResourceWithWait(clients.producerTlsStrimzi(CLUSTER_NAME)); - protected void checkAllComponentsImages(String componentsNamespaceName, BundleVersionModificationData versionModificationData) { - if (versionModificationData.getImagesAfterOperations().isEmpty()) { - fail("There are no expected images"); - } + // Verify that Producer finish successfully + ClientUtils.waitForInstantProducerClientSuccess(testStorage.getNamespaceName(), testStorage); - checkContainerImages(componentsNamespaceName, controllerSelector, versionModificationData.getZookeeperImage()); - checkContainerImages(componentsNamespaceName, brokerSelector, versionModificationData.getKafkaImage()); - checkContainerImages(componentsNamespaceName, eoSelector, versionModificationData.getTopicOperatorImage()); - checkContainerImages(componentsNamespaceName, eoSelector, 1, versionModificationData.getUserOperatorImage()); - } + // Verify FileSink KafkaConnector + connectorPodName = kubeClient().listPods(testStorage.getNamespaceName(), Collections.singletonMap(Labels.STRIMZI_KIND_LABEL, KafkaConnect.RESOURCE_KIND)).get(0).getMetadata().getName(); + KafkaConnectUtils.waitForMessagesInKafkaConnectFileSink(testStorage.getNamespaceName(), connectorPodName, DEFAULT_SINK_FILE_PATH, testStorage.getMessageCount()); - protected void checkContainerImages(String namespaceName, LabelSelector labelSelector, String image) { - checkContainerImages(namespaceName, labelSelector, 0, image); - } + // Verify that pods are stable + PodUtils.verifyThatRunningPodsAreStable(testStorage.getNamespaceName(), CLUSTER_NAME); - protected void checkContainerImages(String namespaceName, LabelSelector labelSelector, int container, String image) { - List<Pod> pods1 = kubeClient(namespaceName).listPods(labelSelector); - for (Pod pod : pods1) { - if (!image.equals(pod.getSpec().getContainers().get(container).getImage())) { - LOGGER.debug("Expected image for Pod: {}/{}: {} \nCurrent image: {}", pod.getMetadata().getNamespace(), pod.getMetadata().getName(), image, pod.getSpec().getContainers().get(container).getImage()); - assertThat("Used image for Pod: " + pod.getMetadata().getNamespace() + "/" + pod.getMetadata().getName() + " is not valid!", pod.getSpec().getContainers().get(container).getImage(), containsString(image)); - } - } + // Verify upgrade + verifyProcedure(testStorage.getNamespaceName(), upgradeDowngradeData, testStorage.getContinuousProducerName(), testStorage.getContinuousConsumerName(), wasUTOUsedBefore); } protected void setupEnvAndUpgradeClusterOperator(String clusterOperatorNamespaceName, TestStorage testStorage, BundleVersionModificationData upgradeData, UpgradeKafkaVersion upgradeKafkaVersion) throws IOException { @@ -412,8 +199,8 @@ protected void setupEnvAndUpgradeClusterOperator(String clusterOperatorNamespace kafkaTopicYaml = new File(dir, upgradeData.getFromExamples() + "/examples/topic/kafka-topic.yaml"); } - String topicNameTemplate = topicName + "-%s"; - IntStream.range(0, upgradeTopicCount) + String topicNameTemplate = TOPIC_NAME + "-%s"; + IntStream.range(0, UPGRADE_TOPIC_COUNT) .mapToObj(topicNameTemplate::formatted) .map(this::getKafkaYamlWithName) .parallel() @@ -430,10 +217,10 @@ protected void setupEnvAndUpgradeClusterOperator(String clusterOperatorNamespace kafkaTopicYaml = new File(dir, pathToTopicExamples); cmdKubeClient(testStorage.getNamespaceName()).applyContent(ReadWriteUtils.readFile(kafkaTopicYaml) - .replace("name: my-topic", "name: " + testStorage.getTopicName()) - .replace("partitions: 1", "partitions: 3") - .replace("replicas: 1", "replicas: 3") + - " min.insync.replicas: 2"); + .replace("name: my-topic", "name: " + testStorage.getTopicName()) + .replace("partitions: 1", "partitions: 3") + .replace("replicas: 1", "replicas: 3") + + " min.insync.replicas: 2"); ResourceManager.waitForResourceReadiness(testStorage.getNamespaceName(), getResourceApiVersion(KafkaTopic.RESOURCE_PLURAL), testStorage.getTopicName()); } @@ -442,7 +229,7 @@ protected void setupEnvAndUpgradeClusterOperator(String clusterOperatorNamespace String producerAdditionConfiguration = "delivery.timeout.ms=40000\nrequest.timeout.ms=5000"; KafkaClients kafkaBasicClientJob = ClientUtils.getContinuousPlainClientBuilder(testStorage) - .withBootstrapAddress(KafkaResources.plainBootstrapAddress(clusterName)) + .withBootstrapAddress(KafkaResources.plainBootstrapAddress(CLUSTER_NAME)) .withMessageCount(upgradeData.getContinuousClientsMessages()) .withAdditionalConfig(producerAdditionConfiguration) .withNamespaceName(testStorage.getNamespaceName()) @@ -458,44 +245,8 @@ protected void setupEnvAndUpgradeClusterOperator(String clusterOperatorNamespace makeComponentsSnapshots(testStorage.getNamespaceName()); } - private String getKafkaYamlWithName(String name) { - String initialName = "name: my-topic"; - String newName = "name: %s".formatted(name); - - return ReadWriteUtils.readFile(kafkaTopicYaml).replace(initialName, newName); - } - - protected void verifyProcedure(String componentsNamespaceNames, BundleVersionModificationData upgradeData, String producerName, String consumerName, boolean wasUTOUsedBefore) { - - if (upgradeData.getAdditionalTopics() != null) { - boolean isUTOUsed = StUtils.isUnidirectionalTopicOperatorUsed(componentsNamespaceNames, eoSelector); - - // Check that topics weren't deleted/duplicated during upgrade procedures - String listedTopics = cmdKubeClient(componentsNamespaceNames).getResources(getResourceApiVersion(KafkaTopic.RESOURCE_PLURAL)); - int additionalTopics = upgradeData.getAdditionalTopics(); - assertThat("KafkaTopic list doesn't have expected size", Long.valueOf(listedTopics.lines().count() - 1).intValue(), greaterThanOrEqualTo(getExpectedTopicCount(isUTOUsed, wasUTOUsedBefore) + additionalTopics)); - assertThat("KafkaTopic " + topicName + " is not in expected Topic list", - listedTopics.contains(topicName), is(true)); - for (int x = 0; x < upgradeTopicCount; x++) { - assertThat("KafkaTopic " + topicName + "-" + x + " is not in expected Topic list", listedTopics.contains(topicName + "-" + x), is(true)); - } - } - - if (upgradeData.getContinuousClientsMessages() != 0) { - // ############################## - // Validate that continuous clients finished successfully - // ############################## - ClientUtils.waitForClientsSuccess(componentsNamespaceNames, consumerName, producerName, upgradeData.getContinuousClientsMessages()); - // ############################## - } - } - - protected String getResourceApiVersion(String resourcePlural) { - return resourcePlural + "." + Constants.V1BETA2 + "." + Constants.RESOURCE_GROUP_NAME; - } - - protected void deployCoWithWaitForReadiness(final String clusterOperatorNamespaceName, final String componentsNamespaceName, final BundleVersionModificationData upgradeData) throws IOException { - LOGGER.info("Deploying CO: {} in Namespace: {}", ResourceManager.getCoDeploymentName(), clusterOperatorNamespaceName); + protected void deployCoWithWaitForReadiness(final String clusterOperatorNamespaceName, final String componentsNamespaceName, final BundleVersionModificationData upgradeData) throws IOException { + LOGGER.info("Deploying CO: {} in Namespace: {}", ResourceManager.getCoDeploymentName(), clusterOperatorNamespaceName); if (upgradeData.getFromVersion().equals("HEAD")) { coDir = new File(TestUtils.USER_PATH + "/../packaging/install/cluster-operator"); @@ -513,33 +264,37 @@ protected void deployCoWithWaitForReadiness(final String clusterOperatorNamespac LOGGER.info("{} is ready", ResourceManager.getCoDeploymentName()); } - protected void deployKafkaClusterWithWaitForReadiness(final String componentsNamespaceName, final BundleVersionModificationData upgradeData, final UpgradeKafkaVersion upgradeKafkaVersion) { - LOGGER.info("Deploying Kafka: {}/{}", componentsNamespaceName, clusterName); + LOGGER.info("Deploying Kafka: {}/{}", componentsNamespaceName, CLUSTER_NAME); - if (!cmdKubeClient(componentsNamespaceName).getResources(getResourceApiVersion(Kafka.RESOURCE_PLURAL)).contains(clusterName)) { + if (!cmdKubeClient(componentsNamespaceName).getResources(getResourceApiVersion(Kafka.RESOURCE_PLURAL)).contains(CLUSTER_NAME)) { // Deploy a Kafka cluster if (upgradeData.getFromExamples().equals("HEAD")) { - resourceManager.createResourceWithWait(KafkaNodePoolTemplates.brokerPoolPersistentStorage(componentsNamespaceName, poolName, clusterName, 3).build()); - resourceManager.createResourceWithWait(KafkaTemplates.kafkaPersistentNodePools(componentsNamespaceName, clusterName, 3, 3) - .editSpec() - .editKafka() - .withVersion(upgradeKafkaVersion.getVersion()) - .addToConfig("log.message.format.version", upgradeKafkaVersion.getLogMessageVersion()) - .addToConfig("inter.broker.protocol.version", upgradeKafkaVersion.getInterBrokerVersion()) - .endKafka() - .endSpec() - .build()); + resourceManager.createResourceWithWait( + KafkaNodePoolTemplates.controllerPoolPersistentStorage(componentsNamespaceName, CONTROLLER_NODE_NAME, CLUSTER_NAME, 3).build(), + KafkaNodePoolTemplates.brokerPoolPersistentStorage(componentsNamespaceName, BROKER_NODE_NAME, CLUSTER_NAME, 3).build(), + KafkaTemplates.kafkaPersistentKRaft(componentsNamespaceName, CLUSTER_NAME, 3) + .editMetadata() + .addToAnnotations(Annotations.ANNO_STRIMZI_IO_NODE_POOLS, "enabled") + .addToAnnotations(Annotations.ANNO_STRIMZI_IO_KRAFT, "enabled") + .endMetadata() + .editSpec() + .editKafka() + .withVersion(upgradeKafkaVersion.getVersion()) + .withMetadataVersion(upgradeKafkaVersion.getMetadataVersion()) + .endKafka() + .endSpec() + .build()); } else { kafkaYaml = new File(dir, upgradeData.getFromExamples() + upgradeData.getKafkaFilePathBefore()); LOGGER.info("Deploying Kafka from: {}", kafkaYaml.getPath()); // Change kafka version of it's empty (null is for remove the version) if (upgradeKafkaVersion == null) { - cmdKubeClient(componentsNamespaceName).applyContent(KafkaUtils.changeOrRemoveKafkaVersion(kafkaYaml, null)); + cmdKubeClient(componentsNamespaceName).applyContent(KafkaUtils.changeOrRemoveKafkaInKRaft(kafkaYaml, null)); } else { - cmdKubeClient(componentsNamespaceName).applyContent(KafkaUtils.changeOrRemoveKafkaConfiguration(kafkaYaml, upgradeKafkaVersion.getVersion(), upgradeKafkaVersion.getLogMessageVersion(), upgradeKafkaVersion.getInterBrokerVersion())); + cmdKubeClient(componentsNamespaceName).applyContent(KafkaUtils.changeOrRemoveKafkaConfigurationInKRaft(kafkaYaml, upgradeKafkaVersion.getVersion(), upgradeKafkaVersion.getMetadataVersion())); } // Wait for readiness waitForReadinessOfKafkaCluster(componentsNamespaceName); @@ -548,24 +303,24 @@ protected void deployKafkaClusterWithWaitForReadiness(final String componentsNam } protected void deployKafkaUserWithWaitForReadiness(final String componentsNamespaceName, final BundleVersionModificationData upgradeData) { - LOGGER.info("Deploying KafkaUser: {}/{}", componentsNamespaceName, userName); + LOGGER.info("Deploying KafkaUser: {}/{}", componentsNamespaceName, USER_NAME); - if (!cmdKubeClient(componentsNamespaceName).getResources(getResourceApiVersion(KafkaUser.RESOURCE_PLURAL)).contains(userName)) { + if (!cmdKubeClient(componentsNamespaceName).getResources(getResourceApiVersion(KafkaUser.RESOURCE_PLURAL)).contains(USER_NAME)) { if (upgradeData.getFromVersion().equals("HEAD")) { - resourceManager.createResourceWithWait(KafkaUserTemplates.tlsUser(componentsNamespaceName, userName, clusterName).build()); + resourceManager.createResourceWithWait(KafkaUserTemplates.tlsUser(componentsNamespaceName, USER_NAME, CLUSTER_NAME).build()); } else { kafkaUserYaml = new File(dir, upgradeData.getFromExamples() + "/examples/user/kafka-user.yaml"); LOGGER.info("Deploying KafkaUser from: {}", kafkaUserYaml.getPath()); cmdKubeClient(componentsNamespaceName).applyContent(KafkaUserUtils.removeKafkaUserPart(kafkaUserYaml, "authorization")); - ResourceManager.waitForResourceReadiness(componentsNamespaceName, getResourceApiVersion(KafkaUser.RESOURCE_PLURAL), userName); + ResourceManager.waitForResourceReadiness(componentsNamespaceName, getResourceApiVersion(KafkaUser.RESOURCE_PLURAL), USER_NAME); } } } protected void deployKafkaTopicWithWaitForReadiness(final String componentsNamespaceName, final BundleVersionModificationData upgradeData) { - LOGGER.info("Deploying KafkaTopic: {}/{}", componentsNamespaceName, topicName); + LOGGER.info("Deploying KafkaTopic: {}/{}", componentsNamespaceName, TOPIC_NAME); - if (!cmdKubeClient(componentsNamespaceName).getResources(getResourceApiVersion(KafkaTopic.RESOURCE_PLURAL)).contains(topicName)) { + if (!cmdKubeClient(componentsNamespaceName).getResources(getResourceApiVersion(KafkaTopic.RESOURCE_PLURAL)).contains(TOPIC_NAME)) { if (upgradeData.getFromVersion().equals("HEAD")) { kafkaTopicYaml = new File(dir, PATH_TO_PACKAGING_EXAMPLES + "/topic/kafka-topic.yaml"); } else { @@ -573,7 +328,7 @@ protected void deployKafkaTopicWithWaitForReadiness(final String componentsNames } LOGGER.info("Deploying KafkaTopic from: {}", kafkaTopicYaml.getPath()); cmdKubeClient(componentsNamespaceName).create(kafkaTopicYaml); - ResourceManager.waitForResourceReadiness(componentsNamespaceName, getResourceApiVersion(KafkaTopic.RESOURCE_PLURAL), topicName); + ResourceManager.waitForResourceReadiness(componentsNamespaceName, getResourceApiVersion(KafkaTopic.RESOURCE_PLURAL), TOPIC_NAME); } } @@ -583,9 +338,9 @@ protected void deployKafkaConnectAndKafkaConnectorWithWaitForReadiness( final UpgradeKafkaVersion upgradeKafkaVersion ) { // setup KafkaConnect + KafkaConnector - if (!cmdKubeClient(testStorage.getNamespaceName()).getResources(getResourceApiVersion(KafkaConnect.RESOURCE_PLURAL)).contains(clusterName)) { + if (!cmdKubeClient(testStorage.getNamespaceName()).getResources(getResourceApiVersion(KafkaConnect.RESOURCE_PLURAL)).contains(CLUSTER_NAME)) { if (acrossUpgradeData.getFromVersion().equals("HEAD")) { - resourceManager.createResourceWithWait(KafkaConnectTemplates.kafkaConnectWithFilePlugin(testStorage.getNamespaceName(), clusterName, 1) + resourceManager.createResourceWithWait(KafkaConnectTemplates.kafkaConnectWithFilePlugin(testStorage.getNamespaceName(), CLUSTER_NAME, 1) .editMetadata() .addToAnnotations(Annotations.STRIMZI_IO_USE_CONNECTOR_RESOURCES, "true") .endMetadata() @@ -597,7 +352,7 @@ protected void deployKafkaConnectAndKafkaConnectorWithWaitForReadiness( .withVersion(upgradeKafkaVersion.getVersion()) .endSpec() .build()); - resourceManager.createResourceWithWait(KafkaConnectorTemplates.kafkaConnector(testStorage.getNamespaceName(), clusterName) + resourceManager.createResourceWithWait(KafkaConnectorTemplates.kafkaConnector(testStorage.getNamespaceName(), CLUSTER_NAME) .editSpec() .withClassName("org.apache.kafka.connect.file.FileStreamSinkConnector") .addToConfig("topics", testStorage.getTopicName()) @@ -620,7 +375,7 @@ protected void deployKafkaConnectAndKafkaConnectorWithWaitForReadiness( KafkaConnect kafkaConnect = new KafkaConnectBuilder(ReadWriteUtils.readObjectFromYamlFilepath(kafkaConnectYaml, KafkaConnect.class)) .editMetadata() - .withName(clusterName) + .withName(CLUSTER_NAME) .addToAnnotations(Annotations.STRIMZI_IO_USE_CONNECTOR_RESOURCES, "true") .endMetadata() .editSpec() @@ -642,7 +397,7 @@ protected void deployKafkaConnectAndKafkaConnectorWithWaitForReadiness( ResourceManager.waitForResourceReadiness(testStorage.getNamespaceName(), getResourceApiVersion(KafkaConnect.RESOURCE_PLURAL), kafkaConnect.getMetadata().getName()); // in our examples is no sink connector and thus we are using the same as in HEAD verification - resourceManager.createResourceWithWait(KafkaConnectorTemplates.kafkaConnector(testStorage.getNamespaceName(), clusterName) + resourceManager.createResourceWithWait(KafkaConnectorTemplates.kafkaConnector(testStorage.getNamespaceName(), CLUSTER_NAME) .editMetadata() .addToLabels(Labels.STRIMZI_CLUSTER_LABEL, kafkaConnect.getMetadata().getName()) .endMetadata() @@ -656,68 +411,279 @@ protected void deployKafkaConnectAndKafkaConnectorWithWaitForReadiness( } } - protected void doKafkaConnectAndKafkaConnectorUpgradeOrDowngradeProcedure( - final String clusterOperatorNamespaceName, - final TestStorage testStorage, - final BundleVersionModificationData upgradeDowngradeData, - final UpgradeKafkaVersion upgradeKafkaVersion - ) throws IOException { - setupEnvAndUpgradeClusterOperator(clusterOperatorNamespaceName, testStorage, upgradeDowngradeData, upgradeKafkaVersion); - this.deployKafkaConnectAndKafkaConnectorWithWaitForReadiness(testStorage, upgradeDowngradeData, upgradeKafkaVersion); + protected void waitForKafkaClusterRollingUpdate(final String componentsNamespaceName) { + LOGGER.info("Waiting for Kafka Pods with controller role to be rolled"); + controllerPods = RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(componentsNamespaceName, controllerSelector, 3, controllerPods); + LOGGER.info("Waiting for Kafka Pods with broker role to be rolled"); + brokerPods = RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(componentsNamespaceName, brokerSelector, 3, brokerPods); + LOGGER.info("Waiting for EO Deployment to be rolled"); + // Check the TO and UO also got upgraded + eoPods = DeploymentUtils.waitTillDepHasRolled(componentsNamespaceName, KafkaResources.entityOperatorDeploymentName(CLUSTER_NAME), 1, eoPods); + } - // Check if UTO is used before changing the CO -> used for check for KafkaTopics - boolean wasUTOUsedBefore = StUtils.isUnidirectionalTopicOperatorUsed(testStorage.getNamespaceName(), eoSelector); + protected void waitForReadinessOfKafkaCluster(final String componentsNamespaceName) { + LOGGER.info("Waiting for Kafka Pods with controller role to be ready"); + RollingUpdateUtils.waitForComponentAndPodsReady(componentsNamespaceName, controllerSelector, 3); + LOGGER.info("Waiting for Kafka Pods with broker role to be ready"); + RollingUpdateUtils.waitForComponentAndPodsReady(componentsNamespaceName, brokerSelector, 3); + LOGGER.info("Waiting for EO Deployment"); + DeploymentUtils.waitForDeploymentAndPodsReady(componentsNamespaceName, KafkaResources.entityOperatorDeploymentName(CLUSTER_NAME), 1); + } - final KafkaClients clients = ClientUtils.getInstantTlsClientBuilder(testStorage, KafkaResources.tlsBootstrapAddress(clusterName)) - .withNamespaceName(testStorage.getNamespaceName()) - .withUsername(userName) - .build(); + protected void changeClusterOperator(String clusterOperatorNamespaceName, String componentsNamespaceName, BundleVersionModificationData versionModificationData) throws IOException { + final Map<String, String> coPods = DeploymentUtils.depSnapshot(clusterOperatorNamespaceName, ResourceManager.getCoDeploymentName()); - resourceManager.createResourceWithWait(clients.producerTlsStrimzi(clusterName)); - // Verify that Producer finish successfully - ClientUtils.waitForInstantProducerClientSuccess(testStorage); + File coDir; + // Modify + apply installation files + LOGGER.info("Update CO from {} to {}", versionModificationData.getFromVersion(), versionModificationData.getToVersion()); + if (versionModificationData.getToVersion().equals("HEAD")) { + coDir = new File(TestUtils.USER_PATH + "/../packaging/install/cluster-operator"); + } else { + String url = versionModificationData.getToUrl(); + File dir = FileUtils.downloadAndUnzip(url); + coDir = new File(dir, versionModificationData.getToExamples() + "/install/cluster-operator/"); + } - makeComponentsSnapshots(testStorage.getNamespaceName()); - logComponentsPodImagesWithConnect(testStorage.getNamespaceName()); + modifyApplyClusterOperatorWithCRDsFromFile(clusterOperatorNamespaceName, componentsNamespaceName, coDir, versionModificationData.getFeatureGatesAfter()); - // Verify FileSink KafkaConnector before upgrade - String connectorPodName = kubeClient().listPods(testStorage.getNamespaceName(), Collections.singletonMap(Labels.STRIMZI_KIND_LABEL, KafkaConnect.RESOURCE_KIND)).get(0).getMetadata().getName(); - KafkaConnectUtils.waitForMessagesInKafkaConnectFileSink(testStorage.getNamespaceName(), connectorPodName, DEFAULT_SINK_FILE_PATH, testStorage.getMessageCount()); + LOGGER.info("Waiting for CO upgrade"); + DeploymentUtils.waitTillDepHasRolled(clusterOperatorNamespaceName, ResourceManager.getCoDeploymentName(), 1, coPods); + } - // Upgrade CO to HEAD and wait for readiness of ClusterOperator - changeClusterOperator(clusterOperatorNamespaceName, testStorage.getNamespaceName(), upgradeDowngradeData); + /** + * Series of steps done when applying operator from files located in root directory. Operator deployment is modified + * to watch multiple (single) namespace. All role based access control resources are modified so the subject is found + * in operator namespace. Role bindings concerning operands are modified to be deployed in watched namespace. + * + * @param clusterOperatorNamespaceName the name of the namespace where the Strimzi operator is deployed. + * @param componentsNamespaceName the name of the single namespace being watched and managed by the Strimzi operator. + * @param root the root directory containing the YAML files to be processed. + * @param strimziFeatureGatesValue the value of the Strimzi feature gates to be injected into deployment configurations. + */ + protected void modifyApplyClusterOperatorWithCRDsFromFile(String clusterOperatorNamespaceName, String componentsNamespaceName, File root, final String strimziFeatureGatesValue) { + KubeClusterResource.getInstance().setNamespace(clusterOperatorNamespaceName); - if (TestKafkaVersion.supportedVersionsContainsVersion(upgradeKafkaVersion.getVersion())) { - // Verify that Kafka and Connect Pods Rolled - waitForKafkaClusterRollingUpdate(testStorage.getNamespaceName()); - connectPods = RollingUpdateUtils.waitTillComponentHasRolled(testStorage.getNamespaceName(), connectLabelSelector, 1, connectPods); - KafkaConnectorUtils.waitForConnectorReady(testStorage.getNamespaceName(), clusterName); + final List<String> watchedNsRoleBindingFilePrefixes = List.of( + "020-RoleBinding", // rb to role for creating KNative resources + "023-RoleBinding", // rb to role for watching Strimzi CustomResources + "031-RoleBinding" // rb to role for entity operator + ); + + Arrays.stream(Objects.requireNonNull(root.listFiles())).sorted().forEach(f -> { + if (watchedNsRoleBindingFilePrefixes.stream().anyMatch((rbFilePrefix) -> f.getName().startsWith(rbFilePrefix))) { + cmdKubeClient(componentsNamespaceName).replaceContent(StUtils.changeRoleBindingSubject(f, clusterOperatorNamespaceName)); + } else if (f.getName().matches(".*RoleBinding.*")) { + cmdKubeClient(clusterOperatorNamespaceName).replaceContent(StUtils.changeRoleBindingSubject(f, clusterOperatorNamespaceName)); + } else if (f.getName().matches(".*Deployment.*")) { + cmdKubeClient(clusterOperatorNamespaceName).replaceContent(StUtils.changeDeploymentConfiguration(componentsNamespaceName, f, strimziFeatureGatesValue)); + } else { + cmdKubeClient(clusterOperatorNamespaceName).replaceContent(ReadWriteUtils.readFile(f)); + } + }); + } + + protected void changeKafkaVersion(final String componentsNamespaceName, CommonVersionModificationData versionModificationData) throws IOException { + changeKafkaVersion(componentsNamespaceName, versionModificationData, false); + } + + /** + * Method for changing Kafka `version` and `metadataVersion` fields in Kafka CR based on the current scenario + * @param versionModificationData data structure holding information about the desired steps/versions that should be applied + * @param replaceEvenIfMissing current workaround for the situation when `metadataVersion` is not set in Kafka CR -> that's because previous version of operator + * doesn't contain this kind of field, so even if we set this field in the Kafka CR, it is removed by the operator + * this is needed for correct functionality of the `testUpgradeAcrossVersionsWithUnsupportedKafkaVersion` test + * @throws IOException exception during application of YAML files + */ + @SuppressWarnings("CyclomaticComplexity") + protected void changeKafkaVersion(final String componentsNamespaceName, CommonVersionModificationData versionModificationData, boolean replaceEvenIfMissing) throws IOException { + // Get Kafka version + String kafkaVersionFromCR = cmdKubeClient(componentsNamespaceName).getResourceJsonPath(getResourceApiVersion(Kafka.RESOURCE_PLURAL), CLUSTER_NAME, ".spec.kafka.version"); + kafkaVersionFromCR = kafkaVersionFromCR.equals("") ? null : kafkaVersionFromCR; + // Get Kafka metadata version + String currentMetadataVersion = cmdKubeClient(componentsNamespaceName).getResourceJsonPath(getResourceApiVersion(Kafka.RESOURCE_PLURAL), CLUSTER_NAME, ".spec.kafka.metadataVersion"); + + String kafkaVersionFromProcedure = versionModificationData.getProcedures().getVersion(); + + // ####################################################################### + // ################# Update CRs to latest version ################### + // ####################################################################### + String examplesPath = downloadExamplesAndGetPath(versionModificationData); + String kafkaFilePath = examplesPath + versionModificationData.getKafkaFilePathAfter(); + + applyCustomResourcesFromPath(componentsNamespaceName, examplesPath, kafkaFilePath, kafkaVersionFromCR, currentMetadataVersion); + + // ####################################################################### + + if (versionModificationData.getProcedures() != null && (!currentMetadataVersion.isEmpty() || replaceEvenIfMissing)) { + + if (kafkaVersionFromProcedure != null && !kafkaVersionFromProcedure.isEmpty() && !kafkaVersionFromCR.contains(kafkaVersionFromProcedure)) { + LOGGER.info("Set Kafka version to " + kafkaVersionFromProcedure); + cmdKubeClient(componentsNamespaceName).patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL), CLUSTER_NAME, "/spec/kafka/version", kafkaVersionFromProcedure); + + waitForKafkaControllersAndBrokersFinishRollingUpdate(componentsNamespaceName); + } + + String metadataVersion = versionModificationData.getProcedures().getMetadataVersion(); + + if (metadataVersion != null && !metadataVersion.isEmpty()) { + LOGGER.info("Set metadata version to {} (current version is {})", metadataVersion, currentMetadataVersion); + cmdKubeClient(componentsNamespaceName).patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL), CLUSTER_NAME, "/spec/kafka/metadataVersion", metadataVersion); + + makeComponentsSnapshots(componentsNamespaceName); + } } + } - logComponentsPodImagesWithConnect(testStorage.getNamespaceName()); + protected void changeKafkaVersionInKafkaConnect(String componentsNamespaceName, CommonVersionModificationData versionModificationData) { + UpgradeKafkaVersion upgradeToKafkaVersion = new UpgradeKafkaVersion(versionModificationData.getProcedures().getVersion()); - // Upgrade/Downgrade kafka - changeKafkaVersion(testStorage.getNamespaceName(), upgradeDowngradeData); - changeKafkaVersionInKafkaConnect(testStorage.getNamespaceName(), upgradeDowngradeData); + String kafkaVersionFromConnectCR = cmdKubeClient(componentsNamespaceName).getResourceJsonPath(getResourceApiVersion(KafkaConnect.RESOURCE_PLURAL), CLUSTER_NAME, ".spec.version"); + kafkaVersionFromConnectCR = kafkaVersionFromConnectCR.isEmpty() ? null : kafkaVersionFromConnectCR; - logComponentsPodImagesWithConnect(testStorage.getNamespaceName()); - checkAllComponentsImages(testStorage.getNamespaceName(), upgradeDowngradeData); + if (kafkaVersionFromConnectCR != null && !kafkaVersionFromConnectCR.equals(upgradeToKafkaVersion.getVersion())) { + LOGGER.info(String.format("Setting Kafka version to %s in Kafka Connect", upgradeToKafkaVersion.getVersion())); + cmdKubeClient(componentsNamespaceName).patchResource(getResourceApiVersion(KafkaConnect.RESOURCE_PLURAL), CLUSTER_NAME, "/spec/version", upgradeToKafkaVersion.getVersion()); - // send again new messages - resourceManager.createResourceWithWait(clients.producerTlsStrimzi(clusterName)); + LOGGER.info("Waiting for KafkaConnect rolling update to finish"); + connectPods = RollingUpdateUtils.waitTillComponentHasRolled(componentsNamespaceName, connectLabelSelector, 1, connectPods); + } + } - // Verify that Producer finish successfully - ClientUtils.waitForInstantProducerClientSuccess(testStorage.getNamespaceName(), testStorage); + protected void checkAllComponentsImages(String componentsNamespaceName, BundleVersionModificationData versionModificationData) { + if (versionModificationData.getImagesAfterOperations().isEmpty()) { + fail("There are no expected images"); + } - // Verify FileSink KafkaConnector - connectorPodName = kubeClient().listPods(testStorage.getNamespaceName(), Collections.singletonMap(Labels.STRIMZI_KIND_LABEL, KafkaConnect.RESOURCE_KIND)).get(0).getMetadata().getName(); - KafkaConnectUtils.waitForMessagesInKafkaConnectFileSink(testStorage.getNamespaceName(), connectorPodName, DEFAULT_SINK_FILE_PATH, testStorage.getMessageCount()); + checkContainerImages(componentsNamespaceName, controllerSelector, versionModificationData.getKafkaImage()); + checkContainerImages(componentsNamespaceName, brokerSelector, versionModificationData.getKafkaImage()); + checkContainerImages(componentsNamespaceName, eoSelector, versionModificationData.getTopicOperatorImage()); + checkContainerImages(componentsNamespaceName, eoSelector, 1, versionModificationData.getUserOperatorImage()); + } - // Verify that pods are stable - PodUtils.verifyThatRunningPodsAreStable(testStorage.getNamespaceName(), clusterName); + protected void checkContainerImages(String namespaceName, LabelSelector labelSelector, String image) { + checkContainerImages(namespaceName, labelSelector, 0, image); + } - // Verify upgrade - verifyProcedure(testStorage.getNamespaceName(), upgradeDowngradeData, testStorage.getContinuousProducerName(), testStorage.getContinuousConsumerName(), wasUTOUsedBefore); + protected void checkContainerImages(String namespaceName, LabelSelector labelSelector, int container, String image) { + List<Pod> pods1 = kubeClient(namespaceName).listPods(labelSelector); + for (Pod pod : pods1) { + if (!image.equals(pod.getSpec().getContainers().get(container).getImage())) { + LOGGER.debug("Expected image for Pod: {}/{}: {} \nCurrent image: {}", pod.getMetadata().getNamespace(), pod.getMetadata().getName(), image, pod.getSpec().getContainers().get(container).getImage()); + assertThat("Used image for Pod: " + pod.getMetadata().getNamespace() + "/" + pod.getMetadata().getName() + " is not valid!", pod.getSpec().getContainers().get(container).getImage(), containsString(image)); + } + } + } + + protected void logComponentsPodImagesWithConnect(String componentsNamespaceName) { + logPodImages(componentsNamespaceName, controllerSelector, brokerSelector, eoSelector, connectLabelSelector); + } + + protected void logComponentsPodImages(String componentsNamespaceName) { + logPodImages(componentsNamespaceName, controllerSelector, brokerSelector, eoSelector); + } + + protected void logClusterOperatorPodImage(String clusterOperatorNamespaceName) { + logPodImages(clusterOperatorNamespaceName, coSelector); + } + + /** + * Logs images of Pods' containers in the specified {@param namespaceName}. Each image is logged per each label selector. + * + * @param namespaceName the name of the Kubernetes namespace where the pods are located + * @param labelSelectors optional array of {@link LabelSelector} objects used to filter pods based on labels. + * If no selectors are provided, no Pods are selected. + */ + protected void logPodImages(String namespaceName, LabelSelector... labelSelectors) { + Arrays.stream(labelSelectors) + .parallel() + .map(selector -> kubeClient().listPods(namespaceName, selector)) + .flatMap(Collection::stream) + .forEach(pod -> + pod.getSpec().getContainers().forEach(container -> + LOGGER.info("Pod: {}/{} has image {}", + pod.getMetadata().getNamespace(), pod.getMetadata().getName(), pod.getSpec().getContainers().get(0).getImage()) + )); + } + + protected void waitForKafkaControllersAndBrokersFinishRollingUpdate(String componentsNamespaceName) { + LOGGER.info("Waiting for Kafka rolling update to finish"); + controllerPods = RollingUpdateUtils.waitTillComponentHasRolled(componentsNamespaceName, controllerSelector, 3, controllerPods); + brokerPods = RollingUpdateUtils.waitTillComponentHasRolled(componentsNamespaceName, brokerSelector, 3, brokerPods); + } + + protected void applyKafkaCustomResourceFromPath(String kafkaFilePath, String kafkaVersionFromCR, String kafkaMetadataVersion) { + // Change kafka version of it's empty (null is for remove the version) + String metadataVersion = kafkaVersionFromCR == null ? null : kafkaMetadataVersion; + + kafkaYaml = new File(kafkaFilePath); + LOGGER.info("Deploying Kafka from: {}", kafkaYaml.getPath()); + cmdKubeClient().applyContent(KafkaUtils.changeOrRemoveKafkaConfigurationInKRaft(kafkaYaml, kafkaVersionFromCR, metadataVersion)); + } + + protected void applyCustomResourcesFromPath(String namespaceName, String examplesPath, String kafkaFilePath, String kafkaVersionFromCR, String kafkaMetadataVersion) { + applyKafkaCustomResourceFromPath(kafkaFilePath, kafkaVersionFromCR, kafkaMetadataVersion); + + kafkaUserYaml = new File(examplesPath + "/examples/user/kafka-user.yaml"); + LOGGER.info("Deploying KafkaUser from: {}, in Namespace: {}", kafkaUserYaml.getPath(), namespaceName); + cmdKubeClient(namespaceName).applyContent(KafkaUserUtils.removeKafkaUserPart(kafkaUserYaml, "authorization")); + + kafkaTopicYaml = new File(examplesPath + "/examples/topic/kafka-topic.yaml"); + LOGGER.info("Deploying KafkaTopic from: {}, in Namespace {}", kafkaTopicYaml.getPath(), namespaceName); + cmdKubeClient(namespaceName).applyContent(ReadWriteUtils.readFile(kafkaTopicYaml)); + } + + private String getKafkaYamlWithName(String name) { + String initialName = "name: my-topic"; + String newName = "name: %s".formatted(name); + + return ReadWriteUtils.readFile(kafkaTopicYaml).replace(initialName, newName); + } + + protected void verifyProcedure(String componentsNamespaceNames, BundleVersionModificationData upgradeData, String producerName, String consumerName, boolean wasUTOUsedBefore) { + + if (upgradeData.getAdditionalTopics() != null) { + boolean isUTOUsed = StUtils.isUnidirectionalTopicOperatorUsed(componentsNamespaceNames, eoSelector); + + // Check that topics weren't deleted/duplicated during upgrade procedures + String listedTopics = cmdKubeClient(componentsNamespaceNames).getResources(getResourceApiVersion(KafkaTopic.RESOURCE_PLURAL)); + int additionalTopics = upgradeData.getAdditionalTopics(); + assertThat("KafkaTopic list doesn't have expected size", Long.valueOf(listedTopics.lines().count() - 1).intValue(), greaterThanOrEqualTo(getExpectedTopicCount(isUTOUsed, wasUTOUsedBefore) + additionalTopics)); + assertThat("KafkaTopic " + TOPIC_NAME + " is not in expected Topic list", + listedTopics.contains(TOPIC_NAME), is(true)); + for (int x = 0; x < UPGRADE_TOPIC_COUNT; x++) { + assertThat("KafkaTopic " + TOPIC_NAME + "-" + x + " is not in expected Topic list", listedTopics.contains(TOPIC_NAME + "-" + x), is(true)); + } + } + + if (upgradeData.getContinuousClientsMessages() != 0) { + // ############################## + // Validate that continuous clients finished successfully + // ############################## + ClientUtils.waitForClientsSuccess(componentsNamespaceNames, consumerName, producerName, upgradeData.getContinuousClientsMessages()); + // ############################## + } + } + + /** + * Based on {@param isUTOUsed} and {@param wasUTOUsedBefore} it returns the expected count of KafkaTopics. + * In case that UTO was used before and after, the expected number of KafkaTopics is {@link #UPGRADE_TOPIC_COUNT}. + * In other cases - BTO was used before or after the upgrade/downgrade - the expected number of KafkaTopics is {@link #UPGRADE_TOPIC_COUNT} + * with {@link #BTO_KAFKA_TOPICS_ONLY_COUNT}. + * @param isUTOUsed boolean value determining if UTO is used after upgrade/downgrade of the CO + * @param wasUTOUsedBefore boolean value determining if UTO was used before upgrade/downgrade of the CO + * @return expected number of KafkaTopics + */ + protected int getExpectedTopicCount(boolean isUTOUsed, boolean wasUTOUsedBefore) { + if (isUTOUsed && wasUTOUsedBefore) { + // topics that are just present in Kafka itself are not created as CRs in UTO, thus -3 topics in comparison to regular upgrade + return UPGRADE_TOPIC_COUNT; + } + + return UPGRADE_TOPIC_COUNT + BTO_KAFKA_TOPICS_ONLY_COUNT; + } + + protected String getResourceApiVersion(String resourcePlural) { + return resourcePlural + "." + Constants.V1BETA2 + "." + Constants.RESOURCE_GROUP_NAME; } protected String downloadExamplesAndGetPath(CommonVersionModificationData versionModificationData) throws IOException { @@ -729,6 +695,25 @@ protected String downloadExamplesAndGetPath(CommonVersionModificationData versio } } + /** + * Sets up the namespaces required for the file-based Strimzi upgrade test. + * This method creates and prepares the necessary namespaces if operator is installed from example files + */ + protected void setUpStrimziUpgradeTestNamespaces() { + NamespaceManager.getInstance().createNamespaceAndPrepare(CO_NAMESPACE); + NamespaceManager.getInstance().createNamespaceAndPrepare(TEST_SUITE_NAMESPACE); + } + + /** + * Cleans resources installed to namespaces from example files and namespaces themselves. + */ + protected void cleanUpStrimziUpgradeTestNamespaces() { + cleanUpKafkaTopics(TEST_SUITE_NAMESPACE); + deleteInstalledYamls(CO_NAMESPACE, TEST_SUITE_NAMESPACE, coDir); + NamespaceManager.getInstance().deleteNamespaceWithWait(CO_NAMESPACE); + NamespaceManager.getInstance().deleteNamespaceWithWait(TEST_SUITE_NAMESPACE); + } + protected void cleanUpKafkaTopics(String componentsNamespaceName) { if (CrdUtils.isCrdPresent(KafkaTopic.RESOURCE_PLURAL, KafkaTopic.RESOURCE_GROUP)) { List<KafkaTopic> topics = KafkaTopicResource.kafkaTopicClient().inNamespace(componentsNamespaceName).list().getItems(); @@ -741,28 +726,46 @@ protected void cleanUpKafkaTopics(String componentsNamespaceName) { // delete all topics created in test cmdKubeClient(componentsNamespaceName).deleteAllByResource(KafkaTopic.RESOURCE_KIND); - KafkaTopicUtils.waitForTopicWithPrefixDeletion(componentsNamespaceName, topicName); + KafkaTopicUtils.waitForTopicWithPrefixDeletion(componentsNamespaceName, TOPIC_NAME); } else { LOGGER.info("Kafka Topic CustomResource Definition does not exist, no KafkaTopic is being deleted"); } } - /** - * Sets up the namespaces required for the file-based Strimzi upgrade test. - * This method creates and prepares the necessary namespaces if operator is installed from example files - */ - protected void setUpStrimziUpgradeTestNamespaces() { - NamespaceManager.getInstance().createNamespaceAndPrepare(CO_NAMESPACE); - NamespaceManager.getInstance().createNamespaceAndPrepare(TEST_SUITE_NAMESPACE); - } + protected void deleteInstalledYamls(String clusterOperatorNamespaceName, String componentsNamespaceName, File root) { + if (kafkaUserYaml != null) { + LOGGER.info("Deleting KafkaUser configuration files"); + cmdKubeClient(componentsNamespaceName).delete(kafkaUserYaml); + } + if (kafkaTopicYaml != null) { + LOGGER.info("Deleting KafkaTopic configuration files"); + KafkaTopicUtils.setFinalizersInAllTopicsToNull(componentsNamespaceName); + cmdKubeClient().delete(kafkaTopicYaml); + } + if (kafkaYaml != null) { + LOGGER.info("Deleting Kafka configuration files"); + cmdKubeClient(componentsNamespaceName).delete(kafkaYaml); + } + if (root != null) { + final List<String> watchedNsRoleBindingFilePrefixes = List.of( + "020-RoleBinding", // rb to role for creating KNative resources + "023-RoleBinding", // rb to role for watching Strimzi CustomResources + "031-RoleBinding" // rb to role for entity operator + ); - /** - * Cleans resources installed to namespaces from example files and namespaces themselves. - */ - protected void cleanUpStrimziUpgradeTestNamespaces() { - cleanUpKafkaTopics(TEST_SUITE_NAMESPACE); - deleteInstalledYamls(CO_NAMESPACE, TEST_SUITE_NAMESPACE, coDir); - NamespaceManager.getInstance().deleteNamespaceWithWait(CO_NAMESPACE); - NamespaceManager.getInstance().deleteNamespaceWithWait(TEST_SUITE_NAMESPACE); + Arrays.stream(Objects.requireNonNull(root.listFiles())).sorted().forEach(f -> { + try { + if (watchedNsRoleBindingFilePrefixes.stream().anyMatch((rbFilePrefix) -> f.getName().startsWith(rbFilePrefix))) { + cmdKubeClient(componentsNamespaceName).deleteContent(StUtils.changeRoleBindingSubject(f, clusterOperatorNamespaceName)); + } else if (f.getName().matches(".*RoleBinding.*")) { + cmdKubeClient(clusterOperatorNamespaceName).deleteContent(StUtils.changeRoleBindingSubject(f, clusterOperatorNamespaceName)); + } else { + cmdKubeClient(clusterOperatorNamespaceName).delete(f); + } + } catch (Exception ex) { + LOGGER.warn("Failed to delete resources: {}", f.getName()); + } + }); + } } } diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftKafkaUpgradeDowngradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/KRaftKafkaUpgradeDowngradeST.java similarity index 91% rename from systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftKafkaUpgradeDowngradeST.java rename to systemtest/src/test/java/io/strimzi/systemtest/upgrade/KRaftKafkaUpgradeDowngradeST.java index 2fbd0a081a6..77a5053d096 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftKafkaUpgradeDowngradeST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/KRaftKafkaUpgradeDowngradeST.java @@ -2,7 +2,7 @@ * Copyright Strimzi authors. * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). */ -package io.strimzi.systemtest.upgrade.kraft; +package io.strimzi.systemtest.upgrade; import io.strimzi.api.kafka.Crds; import io.strimzi.api.kafka.model.kafka.KafkaBuilder; @@ -131,10 +131,10 @@ void runVersionChange(TestStorage testStorage, TestKafkaVersion initialVersion, boolean sameMinorVersion = initialVersion.metadataVersion().equals(newVersion.metadataVersion()); - if (KafkaResource.kafkaClient().inNamespace(testStorage.getNamespaceName()).withName(clusterName).get() == null) { + if (KafkaResource.kafkaClient().inNamespace(testStorage.getNamespaceName()).withName(CLUSTER_NAME).get() == null) { LOGGER.info("Deploying initial Kafka version {} with metadataVersion={}", initialVersion.version(), initMetadataVersion); - KafkaBuilder kafka = KafkaTemplates.kafkaPersistent(testStorage.getNamespaceName(), clusterName, controllerReplicas, brokerReplicas) + KafkaBuilder kafka = KafkaTemplates.kafkaPersistent(testStorage.getNamespaceName(), CLUSTER_NAME, controllerReplicas, brokerReplicas) .editMetadata() .addToAnnotations(Annotations.ANNO_STRIMZI_IO_NODE_POOLS, "enabled") .addToAnnotations(Annotations.ANNO_STRIMZI_IO_KRAFT, "enabled") @@ -157,8 +157,8 @@ void runVersionChange(TestStorage testStorage, TestKafkaVersion initialVersion, } resourceManager.createResourceWithWait( - KafkaNodePoolTemplates.controllerPoolPersistentStorage(testStorage.getNamespaceName(), CONTROLLER_NODE_NAME, clusterName, controllerReplicas).build(), - KafkaNodePoolTemplates.brokerPoolPersistentStorage(testStorage.getNamespaceName(), BROKER_NODE_NAME, clusterName, brokerReplicas).build(), + KafkaNodePoolTemplates.controllerPoolPersistentStorage(testStorage.getNamespaceName(), CONTROLLER_NODE_NAME, CLUSTER_NAME, controllerReplicas).build(), + KafkaNodePoolTemplates.brokerPoolPersistentStorage(testStorage.getNamespaceName(), BROKER_NODE_NAME, CLUSTER_NAME, brokerReplicas).build(), kafka.build() ); @@ -166,13 +166,13 @@ void runVersionChange(TestStorage testStorage, TestKafkaVersion initialVersion, // Attach clients which will continuously produce/consume messages to/from Kafka brokers during rolling update // ############################## // Setup topic, which has 3 replicas and 2 min.isr to see if producer will be able to work during rolling update - resourceManager.createResourceWithWait(KafkaTopicTemplates.topic(testStorage.getNamespaceName(), testStorage.getContinuousTopicName(), clusterName, 3, 3, 2).build()); + resourceManager.createResourceWithWait(KafkaTopicTemplates.topic(testStorage.getNamespaceName(), testStorage.getContinuousTopicName(), CLUSTER_NAME, 3, 3, 2).build()); // 40s is used within TF environment to make upgrade/downgrade more stable on slow env String producerAdditionConfiguration = "delivery.timeout.ms=300000\nrequest.timeout.ms=20000"; KafkaClients kafkaBasicClientJob = ClientUtils.getContinuousPlainClientBuilder(testStorage) .withNamespaceName(testStorage.getNamespaceName()) - .withBootstrapAddress(KafkaResources.plainBootstrapAddress(clusterName)) + .withBootstrapAddress(KafkaResources.plainBootstrapAddress(CLUSTER_NAME)) .withMessageCount(continuousClientsMessageCount) .withAdditionalConfig(producerAdditionConfiguration) .build(); @@ -184,7 +184,7 @@ void runVersionChange(TestStorage testStorage, TestKafkaVersion initialVersion, LOGGER.info("Deployment of initial Kafka version (" + initialVersion.version() + ") complete"); - String controllerVersionResult = KafkaResource.kafkaClient().inNamespace(testStorage.getNamespaceName()).withName(clusterName).get().getStatus().getKafkaVersion(); + String controllerVersionResult = KafkaResource.kafkaClient().inNamespace(testStorage.getNamespaceName()).withName(CLUSTER_NAME).get().getStatus().getKafkaVersion(); LOGGER.info("Pre-change Kafka version: " + controllerVersionResult); controllerPods = PodUtils.podSnapshot(testStorage.getNamespaceName(), controllerSelector); @@ -193,7 +193,7 @@ void runVersionChange(TestStorage testStorage, TestKafkaVersion initialVersion, LOGGER.info("Updating Kafka CR version field to " + newVersion.version()); // Change the version in Kafka CR - KafkaResource.replaceKafkaResourceInSpecificNamespace(testStorage.getNamespaceName(), clusterName, kafka -> { + KafkaResource.replaceKafkaResourceInSpecificNamespace(testStorage.getNamespaceName(), CLUSTER_NAME, kafka -> { kafka.getSpec().getKafka().setVersion(newVersion.version()); }); @@ -207,14 +207,14 @@ void runVersionChange(TestStorage testStorage, TestKafkaVersion initialVersion, brokerPods = RollingUpdateUtils.waitTillComponentHasRolled(testStorage.getNamespaceName(), brokerSelector, brokerReplicas, brokerPods); LOGGER.info("1st Brokers roll (image change) is complete"); - String currentMetadataVersion = KafkaResource.kafkaClient().inNamespace(testStorage.getNamespaceName()).withName(clusterName).get().getSpec().getKafka().getMetadataVersion(); + String currentMetadataVersion = KafkaResource.kafkaClient().inNamespace(testStorage.getNamespaceName()).withName(CLUSTER_NAME).get().getSpec().getKafka().getMetadataVersion(); LOGGER.info("Deployment of Kafka (" + newVersion.version() + ") complete"); - PodUtils.verifyThatRunningPodsAreStable(testStorage.getNamespaceName(), clusterName); + PodUtils.verifyThatRunningPodsAreStable(testStorage.getNamespaceName(), CLUSTER_NAME); - String controllerPodName = kubeClient().listPodsByPrefixInName(testStorage.getNamespaceName(), KafkaResource.getStrimziPodSetName(clusterName, CONTROLLER_NODE_NAME)).get(0).getMetadata().getName(); - String brokerPodName = kubeClient().listPodsByPrefixInName(testStorage.getNamespaceName(), KafkaResource.getStrimziPodSetName(clusterName, BROKER_NODE_NAME)).get(0).getMetadata().getName(); + String controllerPodName = kubeClient().listPodsByPrefixInName(testStorage.getNamespaceName(), KafkaResource.getStrimziPodSetName(CLUSTER_NAME, CONTROLLER_NODE_NAME)).get(0).getMetadata().getName(); + String brokerPodName = kubeClient().listPodsByPrefixInName(testStorage.getNamespaceName(), KafkaResource.getStrimziPodSetName(CLUSTER_NAME, BROKER_NODE_NAME)).get(0).getMetadata().getName(); // Extract the Kafka version number from the jars in the lib directory controllerVersionResult = KafkaUtils.getVersionFromKafkaPodLibs(testStorage.getNamespaceName(), controllerPodName); @@ -233,7 +233,7 @@ void runVersionChange(TestStorage testStorage, TestKafkaVersion initialVersion, if (isUpgrade && !sameMinorVersion) { LOGGER.info("Updating Kafka config attribute 'metadataVersion' from '{}' to '{}' version", initialVersion.metadataVersion(), newVersion.metadataVersion()); - KafkaResource.replaceKafkaResourceInSpecificNamespace(testStorage.getNamespaceName(), clusterName, kafka -> { + KafkaResource.replaceKafkaResourceInSpecificNamespace(testStorage.getNamespaceName(), CLUSTER_NAME, kafka -> { LOGGER.info("Kafka config before updating '{}'", kafka.getSpec().getKafka().toString()); kafka.getSpec().getKafka().setMetadataVersion(newVersion.metadataVersion()); @@ -242,24 +242,24 @@ void runVersionChange(TestStorage testStorage, TestKafkaVersion initialVersion, }); LOGGER.info("Metadata version changed, it doesn't require rolling update, so the Pods should be stable"); - PodUtils.verifyThatRunningPodsAreStable(testStorage.getNamespaceName(), clusterName); + PodUtils.verifyThatRunningPodsAreStable(testStorage.getNamespaceName(), CLUSTER_NAME); assertFalse(RollingUpdateUtils.componentHasRolled(testStorage.getNamespaceName(), controllerSelector, controllerPods)); assertFalse(RollingUpdateUtils.componentHasRolled(testStorage.getNamespaceName(), brokerSelector, brokerPods)); } if (!isUpgrade) { LOGGER.info("Verifying that metadataVersion attribute updated correctly to version {}", initMetadataVersion); - assertThat(Crds.kafkaOperation(kubeClient().getClient()).inNamespace(testStorage.getNamespaceName()).withName(clusterName) + assertThat(Crds.kafkaOperation(kubeClient().getClient()).inNamespace(testStorage.getNamespaceName()).withName(CLUSTER_NAME) .get().getStatus().getKafkaMetadataVersion().contains(initMetadataVersion), is(true)); } else { if (currentMetadataVersion != null) { LOGGER.info("Verifying that metadataVersion attribute updated correctly to version {}", newVersion.metadataVersion()); - assertThat(Crds.kafkaOperation(kubeClient().getClient()).inNamespace(testStorage.getNamespaceName()).withName(clusterName) + assertThat(Crds.kafkaOperation(kubeClient().getClient()).inNamespace(testStorage.getNamespaceName()).withName(CLUSTER_NAME) .get().getStatus().getKafkaMetadataVersion().contains(newVersion.metadataVersion()), is(true)); } } - LOGGER.info("Waiting till Kafka Cluster {}/{} with specified version {} has the same version in status and specification", testStorage.getNamespaceName(), clusterName, newVersion.version()); - KafkaUtils.waitUntilStatusKafkaVersionMatchesExpectedVersion(testStorage.getNamespaceName(), clusterName, newVersion.version()); + LOGGER.info("Waiting till Kafka Cluster {}/{} with specified version {} has the same version in status and specification", testStorage.getNamespaceName(), CLUSTER_NAME, newVersion.version()); + KafkaUtils.waitUntilStatusKafkaVersionMatchesExpectedVersion(testStorage.getNamespaceName(), CLUSTER_NAME, newVersion.version()); } } diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/OlmUpgradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/KRaftOlmUpgradeST.java similarity index 91% rename from systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/OlmUpgradeST.java rename to systemtest/src/test/java/io/strimzi/systemtest/upgrade/KRaftOlmUpgradeST.java index 39a031a0eda..f94a76b6338 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/OlmUpgradeST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/KRaftOlmUpgradeST.java @@ -2,7 +2,7 @@ * Copyright Strimzi authors. * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). */ -package io.strimzi.systemtest.upgrade.regular; +package io.strimzi.systemtest.upgrade; import com.fasterxml.jackson.dataformat.yaml.YAMLMapper; import io.strimzi.api.kafka.model.kafka.KafkaResources; @@ -16,9 +16,6 @@ import io.strimzi.systemtest.resources.operator.configuration.OlmConfiguration; import io.strimzi.systemtest.resources.operator.configuration.OlmConfigurationBuilder; import io.strimzi.systemtest.storage.TestStorage; -import io.strimzi.systemtest.upgrade.AbstractUpgradeST; -import io.strimzi.systemtest.upgrade.OlmVersionModificationData; -import io.strimzi.systemtest.upgrade.VersionModificationDataLoader; import io.strimzi.systemtest.upgrade.VersionModificationDataLoader.ModificationType; import io.strimzi.systemtest.utils.ClientUtils; import io.strimzi.systemtest.utils.FileUtils; @@ -28,7 +25,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Tag; import java.io.File; @@ -46,14 +42,13 @@ /** * This test class contains tests for Strimzi downgrade from version X to version X - 1. - * The difference between this class and {@link StrimziUpgradeST} is in cluster operator install type. + * The difference between this class and {@link KRaftStrimziUpgradeST} is in cluster operator install type. * Tests in this class use OLM for install cluster operator. */ @Tag(OLM_UPGRADE) -@Disabled // ZooKeeper is being removed -public class OlmUpgradeST extends AbstractUpgradeST { +public class KRaftOlmUpgradeST extends AbstractKRaftUpgradeST { - private static final Logger LOGGER = LogManager.getLogger(OlmUpgradeST.class); + private static final Logger LOGGER = LogManager.getLogger(KRaftOlmUpgradeST.class); private final OlmVersionModificationData olmUpgradeData = new VersionModificationDataLoader(ModificationType.OLM_UPGRADE).getOlmUpgradeData(); @IsolatedTest @@ -75,7 +70,7 @@ void testStrimziUpgrade() throws IOException { // In this test we intend to setup Kafka once at the beginning and then upgrade it with CO File dir = FileUtils.downloadAndUnzip(olmUpgradeData.getFromUrl()); - File kafkaYaml = new File(dir, olmUpgradeData.getFromExamples() + "/examples/kafka/kafka-persistent.yaml"); + File kafkaYaml = new File(dir, olmUpgradeData.getFromExamples() + olmUpgradeData.getKafkaFilePathBefore()); LOGGER.info("Deploying Kafka in Namespace: {} from file: {}", CO_NAMESPACE, kafkaYaml.getPath()); KubeClusterResource.cmdKubeClient(CO_NAMESPACE).create(kafkaYaml); @@ -104,7 +99,7 @@ void testStrimziUpgrade() throws IOException { .withProducerName(testStorage.getProducerName()) .withConsumerName(testStorage.getConsumerName()) .withNamespaceName(CO_NAMESPACE) - .withBootstrapAddress(KafkaResources.plainBootstrapAddress(clusterName)) + .withBootstrapAddress(KafkaResources.plainBootstrapAddress(CLUSTER_NAME)) .withTopicName(topicUpgradeName) .withMessageCount(testStorage.getMessageCount()) .withDelayMs(1000) @@ -136,7 +131,7 @@ void testStrimziUpgrade() throws IOException { // Wait for Rolling Update to finish controllerPods = RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(CO_NAMESPACE, controllerSelector, 3, controllerPods); brokerPods = RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(CO_NAMESPACE, brokerSelector, 3, brokerPods); - eoPods = DeploymentUtils.waitTillDepHasRolled(CO_NAMESPACE, KafkaResources.entityOperatorDeploymentName(clusterName), 1, eoPods); + eoPods = DeploymentUtils.waitTillDepHasRolled(CO_NAMESPACE, KafkaResources.entityOperatorDeploymentName(CLUSTER_NAME), 1, eoPods); // ======== Cluster Operator upgrade ends ======== // ======== Kafka upgrade starts ======== diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziDowngradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/KRaftStrimziDowngradeST.java similarity index 94% rename from systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziDowngradeST.java rename to systemtest/src/test/java/io/strimzi/systemtest/upgrade/KRaftStrimziDowngradeST.java index 3bc0b02ee3c..44ab69b9d89 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziDowngradeST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/KRaftStrimziDowngradeST.java @@ -2,14 +2,12 @@ * Copyright Strimzi authors. * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). */ -package io.strimzi.systemtest.upgrade.kraft; +package io.strimzi.systemtest.upgrade; import io.strimzi.systemtest.annotations.KindIPv6NotSupported; import io.strimzi.systemtest.annotations.MicroShiftNotSupported; import io.strimzi.systemtest.resources.ResourceManager; import io.strimzi.systemtest.storage.TestStorage; -import io.strimzi.systemtest.upgrade.BundleVersionModificationData; -import io.strimzi.systemtest.upgrade.UpgradeKafkaVersion; import io.strimzi.systemtest.utils.StUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziUpgradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/KRaftStrimziUpgradeST.java similarity index 94% rename from systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziUpgradeST.java rename to systemtest/src/test/java/io/strimzi/systemtest/upgrade/KRaftStrimziUpgradeST.java index d8f98050cb7..8f271d6b9f0 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziUpgradeST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/KRaftStrimziUpgradeST.java @@ -2,7 +2,7 @@ * Copyright Strimzi authors. * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). */ -package io.strimzi.systemtest.upgrade.kraft; +package io.strimzi.systemtest.upgrade; import io.strimzi.api.kafka.model.kafka.KafkaResources; import io.strimzi.systemtest.annotations.IsolatedTest; @@ -11,9 +11,6 @@ import io.strimzi.systemtest.resources.ResourceManager; import io.strimzi.systemtest.resources.crd.KafkaResource; import io.strimzi.systemtest.storage.TestStorage; -import io.strimzi.systemtest.upgrade.BundleVersionModificationData; -import io.strimzi.systemtest.upgrade.UpgradeKafkaVersion; -import io.strimzi.systemtest.upgrade.VersionModificationDataLoader; import io.strimzi.systemtest.utils.RollingUpdateUtils; import io.strimzi.systemtest.utils.StUtils; import io.strimzi.systemtest.utils.kafkaUtils.KafkaUtils; @@ -93,16 +90,16 @@ void testUpgradeKafkaWithoutVersion() throws IOException { RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(testStorage.getNamespaceName(), controllerSelector, 3, controllerSnapshot); RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(testStorage.getNamespaceName(), brokerSelector, 3, brokerSnapshot); - DeploymentUtils.waitTillDepHasRolled(testStorage.getNamespaceName(), KafkaResources.entityOperatorDeploymentName(clusterName), 1, eoSnapshot); + DeploymentUtils.waitTillDepHasRolled(testStorage.getNamespaceName(), KafkaResources.entityOperatorDeploymentName(CLUSTER_NAME), 1, eoSnapshot); checkAllComponentsImages(testStorage.getNamespaceName(), acrossUpgradeData); // Verify that Pods are stable - PodUtils.verifyThatRunningPodsAreStable(testStorage.getNamespaceName(), clusterName); + PodUtils.verifyThatRunningPodsAreStable(testStorage.getNamespaceName(), CLUSTER_NAME); // Verify upgrade verifyProcedure(testStorage.getNamespaceName(), acrossUpgradeData, testStorage.getContinuousProducerName(), testStorage.getContinuousConsumerName(), wasUTOUsedBefore); - String controllerPodName = kubeClient().listPodsByPrefixInName(testStorage.getNamespaceName(), KafkaResource.getStrimziPodSetName(clusterName, CONTROLLER_NODE_NAME)).get(0).getMetadata().getName(); - String brokerPodName = kubeClient().listPodsByPrefixInName(testStorage.getNamespaceName(), KafkaResource.getStrimziPodSetName(clusterName, BROKER_NODE_NAME)).get(0).getMetadata().getName(); + String controllerPodName = kubeClient().listPodsByPrefixInName(testStorage.getNamespaceName(), KafkaResource.getStrimziPodSetName(CLUSTER_NAME, CONTROLLER_NODE_NAME)).get(0).getMetadata().getName(); + String brokerPodName = kubeClient().listPodsByPrefixInName(testStorage.getNamespaceName(), KafkaResource.getStrimziPodSetName(CLUSTER_NAME, BROKER_NODE_NAME)).get(0).getMetadata().getName(); assertThat(KafkaUtils.getVersionFromKafkaPodLibs(testStorage.getNamespaceName(), controllerPodName), containsString(acrossUpgradeData.getProcedures().getVersion())); assertThat(KafkaUtils.getVersionFromKafkaPodLibs(testStorage.getNamespaceName(), brokerPodName), containsString(acrossUpgradeData.getProcedures().getVersion())); @@ -137,7 +134,7 @@ void testUpgradeAcrossVersionsWithUnsupportedKafkaVersion() throws IOException { checkAllComponentsImages(testStorage.getNamespaceName(), acrossUpgradeData); // Verify that Pods are stable - PodUtils.verifyThatRunningPodsAreStable(testStorage.getNamespaceName(), clusterName); + PodUtils.verifyThatRunningPodsAreStable(testStorage.getNamespaceName(), CLUSTER_NAME); // Verify upgrade verifyProcedure(testStorage.getNamespaceName(), acrossUpgradeData, testStorage.getContinuousProducerName(), testStorage.getContinuousConsumerName(), wasUTOUsedBefore); @@ -159,7 +156,7 @@ void testUpgradeAcrossVersionsWithNoKafkaVersion() throws IOException { // Wait till first upgrade finished controllerPods = RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(testStorage.getNamespaceName(), controllerSelector, 3, controllerPods); brokerPods = RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(testStorage.getNamespaceName(), brokerSelector, 3, brokerPods); - eoPods = DeploymentUtils.waitTillDepHasRolled(testStorage.getNamespaceName(), KafkaResources.entityOperatorDeploymentName(clusterName), 1, eoPods); + eoPods = DeploymentUtils.waitTillDepHasRolled(testStorage.getNamespaceName(), KafkaResources.entityOperatorDeploymentName(CLUSTER_NAME), 1, eoPods); LOGGER.info("Rolling to new images has finished!"); logPodImages(CO_NAMESPACE, coSelector); @@ -170,7 +167,7 @@ void testUpgradeAcrossVersionsWithNoKafkaVersion() throws IOException { checkAllComponentsImages(testStorage.getNamespaceName(), acrossUpgradeData); // Verify that Pods are stable - PodUtils.verifyThatRunningPodsAreStable(testStorage.getNamespaceName(), clusterName); + PodUtils.verifyThatRunningPodsAreStable(testStorage.getNamespaceName(), CLUSTER_NAME); // Verify upgrade verifyProcedure(testStorage.getNamespaceName(), acrossUpgradeData, testStorage.getContinuousProducerName(), testStorage.getContinuousConsumerName(), wasUTOUsedBefore); diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/AbstractKRaftUpgradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/AbstractKRaftUpgradeST.java deleted file mode 100644 index 934733dcd8f..00000000000 --- a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/AbstractKRaftUpgradeST.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). - */ -package io.strimzi.systemtest.upgrade.kraft; - -import io.fabric8.kubernetes.api.model.LabelSelector; -import io.strimzi.api.kafka.model.kafka.Kafka; -import io.strimzi.api.kafka.model.kafka.KafkaResources; -import io.strimzi.api.kafka.model.nodepool.ProcessRoles; -import io.strimzi.operator.common.Annotations; -import io.strimzi.systemtest.resources.crd.KafkaNodePoolResource; -import io.strimzi.systemtest.templates.crd.KafkaNodePoolTemplates; -import io.strimzi.systemtest.templates.crd.KafkaTemplates; -import io.strimzi.systemtest.upgrade.AbstractUpgradeST; -import io.strimzi.systemtest.upgrade.BundleVersionModificationData; -import io.strimzi.systemtest.upgrade.CommonVersionModificationData; -import io.strimzi.systemtest.upgrade.UpgradeKafkaVersion; -import io.strimzi.systemtest.utils.RollingUpdateUtils; -import io.strimzi.systemtest.utils.kafkaUtils.KafkaUserUtils; -import io.strimzi.systemtest.utils.kafkaUtils.KafkaUtils; -import io.strimzi.systemtest.utils.kubeUtils.controllers.DeploymentUtils; -import io.strimzi.systemtest.utils.kubeUtils.objects.PodUtils; -import io.strimzi.test.ReadWriteUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.io.File; -import java.io.IOException; -import java.util.Map; - -import static io.strimzi.test.k8s.KubeClusterResource.cmdKubeClient; -import static org.junit.jupiter.api.Assertions.fail; - -public class AbstractKRaftUpgradeST extends AbstractUpgradeST { - - private static final Logger LOGGER = LogManager.getLogger(AbstractKRaftUpgradeST.class); - - protected Map<String, String> brokerPods; - protected Map<String, String> controllerPods; - - protected static final String CONTROLLER_NODE_NAME = "controller"; - protected static final String BROKER_NODE_NAME = "broker"; - - protected final LabelSelector controllerSelector = KafkaNodePoolResource.getLabelSelector(clusterName, CONTROLLER_NODE_NAME, ProcessRoles.CONTROLLER); - protected final LabelSelector brokerSelector = KafkaNodePoolResource.getLabelSelector(clusterName, BROKER_NODE_NAME, ProcessRoles.BROKER); - - @Override - protected void makeComponentsSnapshots(String componentsNamespaceName) { - eoPods = DeploymentUtils.depSnapshot(componentsNamespaceName, KafkaResources.entityOperatorDeploymentName(clusterName)); - controllerPods = PodUtils.podSnapshot(componentsNamespaceName, controllerSelector); - brokerPods = PodUtils.podSnapshot(componentsNamespaceName, brokerSelector); - connectPods = PodUtils.podSnapshot(componentsNamespaceName, connectLabelSelector); - } - - @Override - protected void deployKafkaClusterWithWaitForReadiness(final String componentsNamespaceName, - final BundleVersionModificationData upgradeData, - final UpgradeKafkaVersion upgradeKafkaVersion) { - LOGGER.info("Deploying Kafka: {}/{}", componentsNamespaceName, clusterName); - - if (!cmdKubeClient(componentsNamespaceName).getResources(getResourceApiVersion(Kafka.RESOURCE_PLURAL)).contains(clusterName)) { - // Deploy a Kafka cluster - if (upgradeData.getFromExamples().equals("HEAD")) { - resourceManager.createResourceWithWait( - KafkaNodePoolTemplates.controllerPoolPersistentStorage(componentsNamespaceName, CONTROLLER_NODE_NAME, clusterName, 3).build(), - KafkaNodePoolTemplates.brokerPoolPersistentStorage(componentsNamespaceName, BROKER_NODE_NAME, clusterName, 3).build(), - KafkaTemplates.kafkaPersistentKRaft(componentsNamespaceName, clusterName, 3) - .editMetadata() - .addToAnnotations(Annotations.ANNO_STRIMZI_IO_NODE_POOLS, "enabled") - .addToAnnotations(Annotations.ANNO_STRIMZI_IO_KRAFT, "enabled") - .endMetadata() - .editSpec() - .editKafka() - .withVersion(upgradeKafkaVersion.getVersion()) - .withMetadataVersion(upgradeKafkaVersion.getMetadataVersion()) - .endKafka() - .endSpec() - .build()); - } else { - kafkaYaml = new File(dir, upgradeData.getFromExamples() + upgradeData.getKafkaKRaftFilePathBefore()); - LOGGER.info("Deploying Kafka from: {}", kafkaYaml.getPath()); - // Change kafka version of it's empty (null is for remove the version) - if (upgradeKafkaVersion == null) { - cmdKubeClient(componentsNamespaceName).applyContent(KafkaUtils.changeOrRemoveKafkaInKRaft(kafkaYaml, null)); - } else { - cmdKubeClient(componentsNamespaceName).applyContent(KafkaUtils.changeOrRemoveKafkaConfigurationInKRaft(kafkaYaml, upgradeKafkaVersion.getVersion(), upgradeKafkaVersion.getMetadataVersion())); - } - // Wait for readiness - waitForReadinessOfKafkaCluster(componentsNamespaceName); - } - } - } - - @Override - protected void waitForKafkaClusterRollingUpdate(final String componentsNamespaceName) { - LOGGER.info("Waiting for Kafka Pods with controller role to be rolled"); - controllerPods = RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(componentsNamespaceName, controllerSelector, 3, controllerPods); - LOGGER.info("Waiting for Kafka Pods with broker role to be rolled"); - brokerPods = RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(componentsNamespaceName, brokerSelector, 3, brokerPods); - LOGGER.info("Waiting for EO Deployment to be rolled"); - // Check the TO and UO also got upgraded - eoPods = DeploymentUtils.waitTillDepHasRolled(componentsNamespaceName, KafkaResources.entityOperatorDeploymentName(clusterName), 1, eoPods); - } - - @Override - protected void waitForReadinessOfKafkaCluster(final String componentsNamespaceName) { - LOGGER.info("Waiting for Kafka Pods with controller role to be ready"); - RollingUpdateUtils.waitForComponentAndPodsReady(componentsNamespaceName, controllerSelector, 3); - LOGGER.info("Waiting for Kafka Pods with broker role to be ready"); - RollingUpdateUtils.waitForComponentAndPodsReady(componentsNamespaceName, brokerSelector, 3); - LOGGER.info("Waiting for EO Deployment"); - DeploymentUtils.waitForDeploymentAndPodsReady(componentsNamespaceName, KafkaResources.entityOperatorDeploymentName(clusterName), 1); - } - - protected void changeKafkaVersion(final String componentsNamespaceName, CommonVersionModificationData versionModificationData) throws IOException { - changeKafkaVersion(componentsNamespaceName, versionModificationData, false); - } - - /** - * Method for changing Kafka `version` and `metadataVersion` fields in Kafka CR based on the current scenario - * @param versionModificationData data structure holding information about the desired steps/versions that should be applied - * @param replaceEvenIfMissing current workaround for the situation when `metadataVersion` is not set in Kafka CR -> that's because previous version of operator - * doesn't contain this kind of field, so even if we set this field in the Kafka CR, it is removed by the operator - * this is needed for correct functionality of the `testUpgradeAcrossVersionsWithUnsupportedKafkaVersion` test - * @throws IOException exception during application of YAML files - */ - @SuppressWarnings("CyclomaticComplexity") - protected void changeKafkaVersion(final String componentsNamespaceName, CommonVersionModificationData versionModificationData, boolean replaceEvenIfMissing) throws IOException { - // Get Kafka version - String kafkaVersionFromCR = cmdKubeClient(componentsNamespaceName).getResourceJsonPath(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, ".spec.kafka.version"); - kafkaVersionFromCR = kafkaVersionFromCR.equals("") ? null : kafkaVersionFromCR; - // Get Kafka metadata version - String currentMetadataVersion = cmdKubeClient(componentsNamespaceName).getResourceJsonPath(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, ".spec.kafka.metadataVersion"); - - String kafkaVersionFromProcedure = versionModificationData.getProcedures().getVersion(); - - // ####################################################################### - // ################# Update CRs to latest version ################### - // ####################################################################### - String examplesPath = downloadExamplesAndGetPath(versionModificationData); - String kafkaFilePath = examplesPath + versionModificationData.getKafkaKRaftFilePathAfter(); - - applyCustomResourcesFromPath(componentsNamespaceName, examplesPath, kafkaFilePath, kafkaVersionFromCR, currentMetadataVersion); - - // ####################################################################### - - if (versionModificationData.getProcedures() != null && (!currentMetadataVersion.isEmpty() || replaceEvenIfMissing)) { - - if (kafkaVersionFromProcedure != null && !kafkaVersionFromProcedure.isEmpty() && !kafkaVersionFromCR.contains(kafkaVersionFromProcedure)) { - LOGGER.info("Set Kafka version to " + kafkaVersionFromProcedure); - cmdKubeClient(componentsNamespaceName).patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, "/spec/kafka/version", kafkaVersionFromProcedure); - - waitForKafkaControllersAndBrokersFinishRollingUpdate(componentsNamespaceName); - } - - String metadataVersion = versionModificationData.getProcedures().getMetadataVersion(); - - if (metadataVersion != null && !metadataVersion.isEmpty()) { - LOGGER.info("Set metadata version to {} (current version is {})", metadataVersion, currentMetadataVersion); - cmdKubeClient(componentsNamespaceName).patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, "/spec/kafka/metadataVersion", metadataVersion); - - makeComponentsSnapshots(componentsNamespaceName); - } - } - } - - @Override - protected void checkAllComponentsImages(String componentsNamespaceName, BundleVersionModificationData versionModificationData) { - if (versionModificationData.getImagesAfterOperations().isEmpty()) { - fail("There are no expected images"); - } - - checkContainerImages(componentsNamespaceName, controllerSelector, versionModificationData.getKafkaImage()); - checkContainerImages(componentsNamespaceName, brokerSelector, versionModificationData.getKafkaImage()); - checkContainerImages(componentsNamespaceName, eoSelector, versionModificationData.getTopicOperatorImage()); - checkContainerImages(componentsNamespaceName, eoSelector, 1, versionModificationData.getUserOperatorImage()); - } - - protected void logComponentsPodImagesWithConnect(String componentsNamespaceName) { - logPodImages(componentsNamespaceName, controllerSelector, brokerSelector, eoSelector, connectLabelSelector); - } - - protected void logComponentsPodImages(String componentsNamespaceName) { - logPodImages(componentsNamespaceName, controllerSelector, brokerSelector, eoSelector); - } - - protected void logClusterOperatorPodImage(String clusterOperatorNamespaceName) { - logPodImages(clusterOperatorNamespaceName, coSelector); - } - - protected void waitForKafkaControllersAndBrokersFinishRollingUpdate(String componentsNamespaceName) { - LOGGER.info("Waiting for Kafka rolling update to finish"); - controllerPods = RollingUpdateUtils.waitTillComponentHasRolled(componentsNamespaceName, controllerSelector, 3, controllerPods); - brokerPods = RollingUpdateUtils.waitTillComponentHasRolled(componentsNamespaceName, brokerSelector, 3, brokerPods); - } - - protected void applyKafkaCustomResourceFromPath(String kafkaFilePath, String kafkaVersionFromCR, String kafkaMetadataVersion) { - // Change kafka version of it's empty (null is for remove the version) - String metadataVersion = kafkaVersionFromCR == null ? null : kafkaMetadataVersion; - - kafkaYaml = new File(kafkaFilePath); - LOGGER.info("Deploying Kafka from: {}", kafkaYaml.getPath()); - cmdKubeClient().applyContent(KafkaUtils.changeOrRemoveKafkaConfigurationInKRaft(kafkaYaml, kafkaVersionFromCR, metadataVersion)); - } - - protected void applyCustomResourcesFromPath(String namespaceName, String examplesPath, String kafkaFilePath, String kafkaVersionFromCR, String kafkaMetadataVersion) { - applyKafkaCustomResourceFromPath(kafkaFilePath, kafkaVersionFromCR, kafkaMetadataVersion); - - kafkaUserYaml = new File(examplesPath + "/examples/user/kafka-user.yaml"); - LOGGER.info("Deploying KafkaUser from: {}, in Namespace: {}", kafkaUserYaml.getPath(), namespaceName); - cmdKubeClient(namespaceName).applyContent(KafkaUserUtils.removeKafkaUserPart(kafkaUserYaml, "authorization")); - - kafkaTopicYaml = new File(examplesPath + "/examples/topic/kafka-topic.yaml"); - LOGGER.info("Deploying KafkaTopic from: {}, in Namespace {}", kafkaTopicYaml.getPath(), namespaceName); - cmdKubeClient(namespaceName).applyContent(ReadWriteUtils.readFile(kafkaTopicYaml)); - } -} diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/KafkaUpgradeDowngradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/KafkaUpgradeDowngradeST.java deleted file mode 100644 index d6643380dfa..00000000000 --- a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/KafkaUpgradeDowngradeST.java +++ /dev/null @@ -1,367 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). - */ -package io.strimzi.systemtest.upgrade.regular; - -import io.strimzi.api.kafka.Crds; -import io.strimzi.api.kafka.model.kafka.KafkaBuilder; -import io.strimzi.api.kafka.model.kafka.KafkaResources; -import io.strimzi.systemtest.Environment; -import io.strimzi.systemtest.TestConstants; -import io.strimzi.systemtest.annotations.IsolatedTest; -import io.strimzi.systemtest.kafkaclients.internalClients.KafkaClients; -import io.strimzi.systemtest.resources.ResourceManager; -import io.strimzi.systemtest.resources.crd.KafkaResource; -import io.strimzi.systemtest.storage.TestStorage; -import io.strimzi.systemtest.templates.crd.KafkaNodePoolTemplates; -import io.strimzi.systemtest.templates.crd.KafkaTemplates; -import io.strimzi.systemtest.templates.crd.KafkaTopicTemplates; -import io.strimzi.systemtest.upgrade.AbstractUpgradeST; -import io.strimzi.systemtest.utils.ClientUtils; -import io.strimzi.systemtest.utils.RollingUpdateUtils; -import io.strimzi.systemtest.utils.TestKafkaVersion; -import io.strimzi.systemtest.utils.kafkaUtils.KafkaUtils; -import io.strimzi.systemtest.utils.kubeUtils.objects.PodUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Tag; - -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -import static io.strimzi.systemtest.Environment.TEST_SUITE_NAMESPACE; -import static io.strimzi.systemtest.TestConstants.CO_NAMESPACE; -import static io.strimzi.systemtest.TestTags.UPGRADE; -import static io.strimzi.test.k8s.KubeClusterResource.cmdKubeClient; -import static io.strimzi.test.k8s.KubeClusterResource.kubeClient; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertFalse; - -/** - * This test class contains tests for Kafka upgrade/downgrade from version X to X +/- 1. - * Metadata for upgrade/downgrade procedure are loaded from kafka-versions.yaml in root dir of this repository. - */ -@Tag(UPGRADE) -@Disabled // ZooKeeper is being removed -public class KafkaUpgradeDowngradeST extends AbstractUpgradeST { - - private static final Logger LOGGER = LogManager.getLogger(KafkaUpgradeDowngradeST.class); - private final int continuousClientsMessageCount = 300; - - @IsolatedTest - void testKafkaClusterUpgrade() { - final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext()); - List<TestKafkaVersion> sortedVersions = TestKafkaVersion.getSupportedKafkaVersions(); - - for (int x = 0; x < sortedVersions.size() - 1; x++) { - TestKafkaVersion initialVersion = sortedVersions.get(x); - TestKafkaVersion newVersion = sortedVersions.get(x + 1); - - // If it is an upgrade test we keep the message format as the lower version number - String logMsgFormat = initialVersion.messageVersion(); - String interBrokerProtocol = initialVersion.protocolVersion(); - runVersionChange(testStorage, initialVersion, newVersion, logMsgFormat, interBrokerProtocol, 3, 3); - } - - // ############################## - // Validate that continuous clients finished successfully - // ############################## - ClientUtils.waitForClientsSuccess(testStorage.getNamespaceName(), testStorage.getContinuousConsumerName(), testStorage.getContinuousProducerName(), continuousClientsMessageCount); - // ############################## - } - - @IsolatedTest - void testKafkaClusterDowngrade() { - final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext()); - List<TestKafkaVersion> sortedVersions = TestKafkaVersion.getSupportedKafkaVersions(); - - for (int x = sortedVersions.size() - 1; x > 0; x--) { - TestKafkaVersion initialVersion = sortedVersions.get(x); - TestKafkaVersion newVersion = sortedVersions.get(x - 1); - - // If it is a downgrade then we make sure to use the lower version number for the message format - String logMsgFormat = newVersion.messageVersion(); - String interBrokerProtocol = newVersion.protocolVersion(); - runVersionChange(testStorage, initialVersion, newVersion, logMsgFormat, interBrokerProtocol, 3, 3); - } - - // ############################## - // Validate that continuous clients finished successfully - // ############################## - ClientUtils.waitForClientsSuccess(testStorage.getNamespaceName(), testStorage.getContinuousConsumerName(), testStorage.getContinuousProducerName(), continuousClientsMessageCount); - // ############################## - } - - @IsolatedTest - void testKafkaClusterDowngradeToOlderMessageFormat() { - final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext()); - List<TestKafkaVersion> sortedVersions = TestKafkaVersion.getSupportedKafkaVersions(); - - String initLogMsgFormat = sortedVersions.get(0).messageVersion(); - String initInterBrokerProtocol = sortedVersions.get(0).protocolVersion(); - - for (int x = sortedVersions.size() - 1; x > 0; x--) { - TestKafkaVersion initialVersion = sortedVersions.get(x); - TestKafkaVersion newVersion = sortedVersions.get(x - 1); - - runVersionChange(testStorage, initialVersion, newVersion, initLogMsgFormat, initInterBrokerProtocol, 3, 3); - } - - // ############################## - // Validate that continuous clients finished successfully - // ############################## - ClientUtils.waitForClientsSuccess(testStorage.getNamespaceName(), testStorage.getContinuousConsumerName(), testStorage.getContinuousProducerName(), continuousClientsMessageCount); - // ############################## - } - - @IsolatedTest - void testUpgradeWithNoMessageAndProtocolVersionsSet() { - final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext()); - List<TestKafkaVersion> sortedVersions = TestKafkaVersion.getSupportedKafkaVersions(); - - for (int x = 0; x < sortedVersions.size() - 1; x++) { - TestKafkaVersion initialVersion = sortedVersions.get(x); - TestKafkaVersion newVersion = sortedVersions.get(x + 1); - - runVersionChange(testStorage, initialVersion, newVersion, null, null, 3, 3); - } - - // ############################## - // Validate that continuous clients finished successfully - // ############################## - ClientUtils.waitForClientsSuccess(testStorage.getNamespaceName(), testStorage.getContinuousProducerName(), testStorage.getContinuousProducerName(), continuousClientsMessageCount); - // ############################## - } - - @IsolatedTest - void testUpgradeWithoutLogMessageFormatVersionSet() { - final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext()); - List<TestKafkaVersion> sortedVersions = TestKafkaVersion.getSupportedKafkaVersions(); - - for (int x = 0; x < sortedVersions.size() - 1; x++) { - TestKafkaVersion initialVersion = sortedVersions.get(x); - TestKafkaVersion newVersion = sortedVersions.get(x + 1); - - // If it is an upgrade test we keep the message format as the lower version number - String interBrokerProtocol = initialVersion.protocolVersion(); - runVersionChange(testStorage, initialVersion, newVersion, null, interBrokerProtocol, 3, 3); - } - - // ############################## - // Validate that continuous clients finished successfully - // ############################## - ClientUtils.waitForClientsSuccess(testStorage.getNamespaceName(), testStorage.getContinuousConsumerName(), testStorage.getContinuousProducerName(), continuousClientsMessageCount); - // ############################## - } - - @BeforeAll - void setupEnvironment() { - clusterOperator - .defaultInstallation() - .withNamespace(CO_NAMESPACE) - .withWatchingNamespaces(TEST_SUITE_NAMESPACE) - // necessary as each isolated test removes TEST_SUITE_NAMESPACE and this suite handles creation of new one on its own. - .withBindingsNamespaces(Arrays.asList(TestConstants.CO_NAMESPACE, Environment.TEST_SUITE_NAMESPACE)) - .createInstallation() - .runInstallation(); - } - - - @SuppressWarnings({"checkstyle:MethodLength"}) - void runVersionChange(TestStorage testStorage, TestKafkaVersion initialVersion, TestKafkaVersion newVersion, String initLogMsgFormat, String initInterBrokerProtocol, int kafkaReplicas, int zkReplicas) { - boolean isUpgrade = initialVersion.isUpgrade(newVersion); - Map<String, String> brokerPods; - - boolean sameMinorVersion = initialVersion.protocolVersion().equals(newVersion.protocolVersion()); - - if (KafkaResource.kafkaClient().inNamespace(testStorage.getNamespaceName()).withName(clusterName).get() == null) { - LOGGER.info("Deploying initial Kafka version {} with logMessageFormat={} and interBrokerProtocol={}", initialVersion.version(), initLogMsgFormat, initInterBrokerProtocol); - KafkaBuilder kafka = KafkaTemplates.kafkaPersistentNodePools(testStorage.getNamespaceName(), clusterName, kafkaReplicas, zkReplicas) - .editSpec() - .editKafka() - .withVersion(initialVersion.version()) - .withConfig(null) - .endKafka() - .endSpec(); - - // Do not set log.message.format.version if it's not passed to method - if (initLogMsgFormat != null) { - kafka - .editSpec() - .editKafka() - .addToConfig("log.message.format.version", initLogMsgFormat) - .endKafka() - .endSpec(); - } - // Do not set inter.broker.protocol.version if it's not passed to method - if (initInterBrokerProtocol != null) { - kafka - .editSpec() - .editKafka() - .addToConfig("inter.broker.protocol.version", initInterBrokerProtocol) - .endKafka() - .endSpec(); - } - resourceManager.createResourceWithWait(KafkaNodePoolTemplates.brokerPoolPersistentStorage(testStorage.getNamespaceName(), poolName, clusterName, kafkaReplicas).build()); - resourceManager.createResourceWithWait(kafka.build()); - - // ############################## - // Attach clients which will continuously produce/consume messages to/from Kafka brokers during rolling update - // ############################## - // Setup topic, which has 3 replicas and 2 min.isr to see if producer will be able to work during rolling update - resourceManager.createResourceWithWait(KafkaTopicTemplates.topic(testStorage.getNamespaceName(), testStorage.getContinuousTopicName(), clusterName, 3, 3, 2).build()); - String producerAdditionConfiguration = "delivery.timeout.ms=300000\nrequest.timeout.ms=20000"; - - KafkaClients kafkaBasicClientJob = ClientUtils.getContinuousPlainClientBuilder(testStorage) - .withBootstrapAddress(KafkaResources.plainBootstrapAddress(clusterName)) - .withNamespaceName(testStorage.getNamespaceName()) - .withMessageCount(continuousClientsMessageCount) - .withAdditionalConfig(producerAdditionConfiguration) - .build(); - - resourceManager.createResourceWithWait(kafkaBasicClientJob.producerStrimzi()); - resourceManager.createResourceWithWait(kafkaBasicClientJob.consumerStrimzi()); - // ############################## - - } else { - LOGGER.info("Initial Kafka version (" + initialVersion.version() + ") is already ready"); - brokerPods = PodUtils.podSnapshot(testStorage.getNamespaceName(), brokerSelector); - - // Wait for log.message.format.version and inter.broker.protocol.version change - if (!sameMinorVersion - && !isUpgrade - && !ResourceManager.getTestContext().getDisplayName().contains("DowngradeToOlderMessageFormat")) { - - // In case that init config was set, which means that CR was updated and CO won't do any changes - KafkaResource.replaceKafkaResourceInSpecificNamespace(testStorage.getNamespaceName(), clusterName, kafka -> { - LOGGER.info("Kafka config before updating '{}'", kafka.getSpec().getKafka().getConfig().toString()); - Map<String, Object> config = kafka.getSpec().getKafka().getConfig(); - config.put("log.message.format.version", newVersion.messageVersion()); - config.put("inter.broker.protocol.version", newVersion.protocolVersion()); - kafka.getSpec().getKafka().setConfig(config); - LOGGER.info("Kafka config after updating '{}'", kafka.getSpec().getKafka().getConfig().toString()); - }); - - RollingUpdateUtils.waitTillComponentHasRolled(testStorage.getNamespaceName(), brokerSelector, kafkaReplicas, brokerPods); - } - } - - LOGGER.info("Deployment of initial Kafka version (" + initialVersion.version() + ") complete"); - - String zkVersionCommand = "ls libs | grep -Po 'zookeeper-\\K\\d+.\\d+.\\d+' | head -1"; - String zkResult = cmdKubeClient(testStorage.getNamespaceName()).execInPodContainer(KafkaResources.zookeeperPodName(clusterName, 0), - "zookeeper", "/bin/bash", "-c", zkVersionCommand).out().trim(); - LOGGER.info("Pre-change ZooKeeper version query returned: " + zkResult); - - String kafkaVersionResult = KafkaResource.kafkaClient().inNamespace(testStorage.getNamespaceName()).withName(clusterName).get().getStatus().getKafkaVersion(); - LOGGER.info("Pre-change Kafka version: " + kafkaVersionResult); - - Map<String, String> controllerPods = PodUtils.podSnapshot(testStorage.getNamespaceName(), controllerSelector); - brokerPods = PodUtils.podSnapshot(testStorage.getNamespaceName(), brokerSelector); - LOGGER.info("Updating Kafka CR version field to " + newVersion.version()); - - // Change the version in Kafka CR - KafkaResource.replaceKafkaResourceInSpecificNamespace(testStorage.getNamespaceName(), clusterName, kafka -> { - kafka.getSpec().getKafka().setVersion(newVersion.version()); - }); - - LOGGER.info("Waiting for readiness of new Kafka version (" + newVersion.version() + ") to complete"); - - // Wait for the zk version change roll - RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(testStorage.getNamespaceName(), controllerSelector, zkReplicas, controllerPods); - LOGGER.info("1st ZooKeeper roll (image change) is complete"); - - // Wait for the kafka broker version change roll - brokerPods = RollingUpdateUtils.waitTillComponentHasRolled(testStorage.getNamespaceName(), brokerSelector, brokerPods); - LOGGER.info("1st Kafka roll (image change) is complete"); - - Object currentLogMessageFormat = KafkaResource.kafkaClient().inNamespace(testStorage.getNamespaceName()).withName(clusterName).get().getSpec().getKafka().getConfig().get("log.message.format.version"); - Object currentInterBrokerProtocol = KafkaResource.kafkaClient().inNamespace(testStorage.getNamespaceName()).withName(clusterName).get().getSpec().getKafka().getConfig().get("inter.broker.protocol.version"); - - if (isUpgrade && !sameMinorVersion) { - LOGGER.info("Kafka version is increased, two RUs remaining for increasing IBPV and LMFV"); - - if (currentInterBrokerProtocol == null) { - brokerPods = RollingUpdateUtils.waitTillComponentHasRolled(testStorage.getNamespaceName(), brokerSelector, brokerPods); - LOGGER.info("Kafka roll (inter.broker.protocol.version) is complete"); - } - - // Only Kafka versions before 3.0.0 require the second roll - if (currentLogMessageFormat == null && TestKafkaVersion.compareDottedVersions(newVersion.protocolVersion(), "3.0") < 0) { - brokerPods = RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(testStorage.getNamespaceName(), brokerSelector, kafkaReplicas, brokerPods); - LOGGER.info("Kafka roll (log.message.format.version) is complete"); - } - } - - LOGGER.info("Deployment of Kafka (" + newVersion.version() + ") complete"); - - PodUtils.verifyThatRunningPodsAreStable(testStorage.getNamespaceName(), KafkaResources.kafkaComponentName(clusterName)); - - // Extract the zookeeper version number from the jars in the lib directory - zkResult = cmdKubeClient(testStorage.getNamespaceName()).execInPodContainer(KafkaResources.zookeeperPodName(clusterName, 0), - "zookeeper", "/bin/bash", "-c", zkVersionCommand).out().trim(); - LOGGER.info("Post-change ZooKeeper version query returned: " + zkResult); - - assertThat("ZooKeeper container had version " + zkResult + " where " + newVersion.zookeeperVersion() + - " was expected", zkResult, is(newVersion.zookeeperVersion())); - - // Extract the Kafka version number from the jars in the lib directory - String brokerPodName = kubeClient().listPods(testStorage.getNamespaceName(), brokerSelector).get(0).getMetadata().getName(); - kafkaVersionResult = KafkaUtils.getVersionFromKafkaPodLibs(testStorage.getNamespaceName(), brokerPodName); - LOGGER.info("Post-change Kafka version query returned: " + kafkaVersionResult); - - assertThat("Kafka container had version " + kafkaVersionResult + " where " + newVersion.version() + - " was expected", kafkaVersionResult, is(newVersion.version())); - - if (isUpgrade && !sameMinorVersion) { - LOGGER.info("Updating Kafka config attribute 'log.message.format.version' from '{}' to '{}' version", initialVersion.messageVersion(), newVersion.messageVersion()); - LOGGER.info("Updating Kafka config attribute 'inter.broker.protocol.version' from '{}' to '{}' version", initialVersion.protocolVersion(), newVersion.protocolVersion()); - - KafkaResource.replaceKafkaResourceInSpecificNamespace(testStorage.getNamespaceName(), clusterName, kafka -> { - LOGGER.info("Kafka config before updating '{}'", kafka.getSpec().getKafka().getConfig().toString()); - Map<String, Object> config = kafka.getSpec().getKafka().getConfig(); - config.put("log.message.format.version", newVersion.messageVersion()); - config.put("inter.broker.protocol.version", newVersion.protocolVersion()); - kafka.getSpec().getKafka().setConfig(config); - LOGGER.info("Kafka config after updating '{}'", kafka.getSpec().getKafka().getConfig().toString()); - }); - - if (currentLogMessageFormat != null || currentInterBrokerProtocol != null) { - LOGGER.info("Change of configuration is done manually - rolling update"); - // Wait for the kafka broker version of log.message.format.version change roll - RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(testStorage.getNamespaceName(), brokerSelector, kafkaReplicas, brokerPods); - LOGGER.info("Kafka roll (log.message.format.version change) is complete"); - } else { - LOGGER.info("Cluster Operator already changed the configuration, there should be no rolling update"); - PodUtils.verifyThatRunningPodsAreStable(testStorage.getNamespaceName(), KafkaResources.kafkaComponentName(clusterName)); - assertFalse(RollingUpdateUtils.componentHasRolled(testStorage.getNamespaceName(), brokerSelector, brokerPods)); - } - } - - if (!isUpgrade) { - LOGGER.info("Verifying that log.message.format attribute updated correctly to version {}", initLogMsgFormat); - assertThat(Crds.kafkaOperation(kubeClient().getClient()).inNamespace(testStorage.getNamespaceName()).withName(clusterName) - .get().getSpec().getKafka().getConfig().get("log.message.format.version"), is(initLogMsgFormat)); - LOGGER.info("Verifying that inter.broker.protocol.version attribute updated correctly to version {}", initInterBrokerProtocol); - assertThat(Crds.kafkaOperation(kubeClient().getClient()).inNamespace(testStorage.getNamespaceName()).withName(clusterName) - .get().getSpec().getKafka().getConfig().get("inter.broker.protocol.version"), is(initInterBrokerProtocol)); - } else { - if (currentLogMessageFormat != null && currentInterBrokerProtocol != null) { - LOGGER.info("Verifying that log.message.format attribute updated correctly to version {}", newVersion.messageVersion()); - assertThat(Crds.kafkaOperation(kubeClient().getClient()).inNamespace(testStorage.getNamespaceName()).withName(clusterName) - .get().getSpec().getKafka().getConfig().get("log.message.format.version"), is(newVersion.messageVersion())); - LOGGER.info("Verifying that inter.broker.protocol.version attribute updated correctly to version {}", newVersion.protocolVersion()); - assertThat(Crds.kafkaOperation(kubeClient().getClient()).inNamespace(testStorage.getNamespaceName()).withName(clusterName) - .get().getSpec().getKafka().getConfig().get("inter.broker.protocol.version"), is(newVersion.protocolVersion())); - } - } - - LOGGER.info("Waiting till Kafka Cluster {}/{} with specified version {} has the same version in status and specification", testStorage.getNamespaceName(), clusterName, newVersion.version()); - KafkaUtils.waitUntilStatusKafkaVersionMatchesExpectedVersion(testStorage.getNamespaceName(), clusterName, newVersion.version()); - } -} diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziDowngradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziDowngradeST.java deleted file mode 100644 index 0d28be7b2a7..00000000000 --- a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziDowngradeST.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). - */ -package io.strimzi.systemtest.upgrade.regular; - -import io.strimzi.systemtest.annotations.KindIPv6NotSupported; -import io.strimzi.systemtest.annotations.MicroShiftNotSupported; -import io.strimzi.systemtest.resources.ResourceManager; -import io.strimzi.systemtest.storage.TestStorage; -import io.strimzi.systemtest.upgrade.AbstractUpgradeST; -import io.strimzi.systemtest.upgrade.BundleVersionModificationData; -import io.strimzi.systemtest.upgrade.UpgradeKafkaVersion; -import io.strimzi.systemtest.utils.StUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; - -import java.io.IOException; - -import static io.strimzi.systemtest.TestConstants.CO_NAMESPACE; -import static io.strimzi.systemtest.TestTags.UPGRADE; -import static org.junit.jupiter.api.Assumptions.assumeTrue; - -/** - * This test class contains tests for Strimzi downgrade from version X to version X - 1. - * Metadata for downgrade procedure are available in resource file StrimziDowngrade.json - * Kafka upgrade is done as part of those tests as well, but the tests for Kafka upgrade/downgrade are in {@link KafkaUpgradeDowngradeST}. - */ -@Tag(UPGRADE) -@Disabled // ZooKeeper is being removed -public class StrimziDowngradeST extends AbstractUpgradeST { - - private static final Logger LOGGER = LogManager.getLogger(StrimziDowngradeST.class); - - @MicroShiftNotSupported("Due to lack of Kafka Connect build feature") - @KindIPv6NotSupported("Our current CI setup doesn't allow pushing into internal registries that is needed in this test") - @ParameterizedTest(name = "from: {0} (using FG <{2}>) to: {1} (using FG <{3}>)") - @MethodSource("io.strimzi.systemtest.upgrade.VersionModificationDataLoader#loadYamlDowngradeData") - void testDowngradeOfKafkaKafkaConnectAndKafkaConnector(String from, String to, String fgBefore, String fgAfter, BundleVersionModificationData downgradeData) throws IOException { - final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext()); - UpgradeKafkaVersion downgradeKafkaVersion = UpgradeKafkaVersion.getKafkaWithVersionFromUrl(downgradeData.getFromKafkaVersionsUrl(), downgradeData.getDeployKafkaVersion()); - - assumeTrue(StUtils.isAllowOnCurrentEnvironment(downgradeData.getEnvFlakyVariable())); - assumeTrue(StUtils.isAllowedOnCurrentK8sVersion(downgradeData.getEnvMaxK8sVersion())); - - LOGGER.debug("Running downgrade test from version {} to {} (FG: {} -> {})", from, to, fgBefore, fgAfter); - - doKafkaConnectAndKafkaConnectorUpgradeOrDowngradeProcedure(CO_NAMESPACE, testStorage, downgradeData, downgradeKafkaVersion); - } - - @BeforeEach - void setupEnvironment() { - setUpStrimziUpgradeTestNamespaces(); - } - - @AfterEach - void afterEach() { - cleanUpStrimziUpgradeTestNamespaces(); - } -} diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziUpgradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziUpgradeST.java deleted file mode 100644 index 41171b38dac..00000000000 --- a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziUpgradeST.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). - */ -package io.strimzi.systemtest.upgrade.regular; - -import io.strimzi.api.kafka.model.kafka.KafkaResources; -import io.strimzi.systemtest.annotations.IsolatedTest; -import io.strimzi.systemtest.annotations.KindIPv6NotSupported; -import io.strimzi.systemtest.annotations.MicroShiftNotSupported; -import io.strimzi.systemtest.resources.ResourceManager; -import io.strimzi.systemtest.storage.TestStorage; -import io.strimzi.systemtest.upgrade.AbstractUpgradeST; -import io.strimzi.systemtest.upgrade.BundleVersionModificationData; -import io.strimzi.systemtest.upgrade.UpgradeKafkaVersion; -import io.strimzi.systemtest.upgrade.VersionModificationDataLoader; -import io.strimzi.systemtest.upgrade.VersionModificationDataLoader.ModificationType; -import io.strimzi.systemtest.utils.RollingUpdateUtils; -import io.strimzi.systemtest.utils.StUtils; -import io.strimzi.systemtest.utils.kafkaUtils.KafkaUtils; -import io.strimzi.systemtest.utils.kubeUtils.controllers.DeploymentUtils; -import io.strimzi.systemtest.utils.kubeUtils.objects.PodUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; - -import java.io.IOException; -import java.util.Map; -import java.util.Optional; - -import static io.strimzi.systemtest.Environment.TEST_SUITE_NAMESPACE; -import static io.strimzi.systemtest.TestConstants.CO_NAMESPACE; -import static io.strimzi.systemtest.TestTags.UPGRADE; -import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assumptions.assumeTrue; - -/** - * This test class contains tests for Strimzi upgrade from version X to version X + 1. - * Metadata for upgrade procedure are available in resource file StrimziUpgrade.json - * Kafka upgrade is done as part of those tests as well, but the tests for Kafka upgrade/downgrade are in {@link KafkaUpgradeDowngradeST}. - */ -@Tag(UPGRADE) -@Disabled // ZooKeeper is being removed -public class StrimziUpgradeST extends AbstractUpgradeST { - - private static final Logger LOGGER = LogManager.getLogger(StrimziUpgradeST.class); - private final BundleVersionModificationData acrossUpgradeData = new VersionModificationDataLoader(ModificationType.BUNDLE_UPGRADE).buildDataForUpgradeAcrossVersions(); - - @MicroShiftNotSupported("Due to lack of Kafka Connect build feature") - @KindIPv6NotSupported("Our current CI setup doesn't allow pushing into internal registries that is needed in this test") - @ParameterizedTest(name = "from: {0} (using FG <{2}>) to: {1} (using FG <{3}>)") - @MethodSource("io.strimzi.systemtest.upgrade.VersionModificationDataLoader#loadYamlUpgradeData") - void testUpgradeOfKafkaKafkaConnectAndKafkaConnector(String fromVersion, String toVersion, String fgBefore, String fgAfter, BundleVersionModificationData upgradeData) throws IOException { - final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext()); - UpgradeKafkaVersion upgradeKafkaVersion = new UpgradeKafkaVersion(upgradeData.getOldestKafka()); - // setting log message version to null, similarly to the examples, which are not configuring LMFV - upgradeKafkaVersion.setLogMessageVersion(null); - - assumeTrue(StUtils.isAllowOnCurrentEnvironment(upgradeData.getEnvFlakyVariable())); - assumeTrue(StUtils.isAllowedOnCurrentK8sVersion(upgradeData.getEnvMaxK8sVersion())); - - LOGGER.debug("Running upgrade test from version {} to {} (FG: {} -> {})", - fromVersion, toVersion, fgBefore, fgAfter); - doKafkaConnectAndKafkaConnectorUpgradeOrDowngradeProcedure(CO_NAMESPACE, testStorage, upgradeData, upgradeKafkaVersion); - } - - @IsolatedTest - void testUpgradeKafkaWithoutVersion() throws IOException { - UpgradeKafkaVersion upgradeKafkaVersion = UpgradeKafkaVersion.getKafkaWithVersionFromUrl(acrossUpgradeData.getFromKafkaVersionsUrl(), acrossUpgradeData.getStartingKafkaVersion()); - upgradeKafkaVersion.setVersion(null); - - final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext()); - - // Setup env - setupEnvAndUpgradeClusterOperator(CO_NAMESPACE, testStorage, acrossUpgradeData, upgradeKafkaVersion); - - final Map<String, String> zooSnapshot = PodUtils.podSnapshot(testStorage.getNamespaceName(), controllerSelector); - final Map<String, String> kafkaSnapshot = PodUtils.podSnapshot(testStorage.getNamespaceName(), brokerSelector); - final Map<String, String> eoSnapshot = DeploymentUtils.depSnapshot(testStorage.getNamespaceName(), KafkaResources.entityOperatorDeploymentName(clusterName)); - - // Make snapshots of all Pods - makeComponentsSnapshots(testStorage.getNamespaceName()); - - // Check if UTO is used before changing the CO -> used for check for KafkaTopics - boolean wasUTOUsedBefore = StUtils.isUnidirectionalTopicOperatorUsed(testStorage.getNamespaceName(), eoSelector); - - // Upgrade CO - changeClusterOperator(CO_NAMESPACE, testStorage.getNamespaceName(), acrossUpgradeData); - - logPodImages(CO_NAMESPACE, coSelector); - - RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(testStorage.getNamespaceName(), controllerSelector, 3, zooSnapshot); - RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(testStorage.getNamespaceName(), brokerSelector, 3, kafkaSnapshot); - DeploymentUtils.waitTillDepHasRolled(testStorage.getNamespaceName(), KafkaResources.entityOperatorDeploymentName(clusterName), 1, eoSnapshot); - - logPodImages(CO_NAMESPACE, coSelector); - checkAllComponentsImages(testStorage.getNamespaceName(), acrossUpgradeData); - - // Verify that Pods are stable - PodUtils.verifyThatRunningPodsAreStable(testStorage.getNamespaceName(), clusterName); - // Verify upgrade - verifyProcedure(testStorage.getNamespaceName(), acrossUpgradeData, testStorage.getContinuousProducerName(), testStorage.getContinuousConsumerName(), wasUTOUsedBefore); - assertThat(KafkaUtils.getVersionFromKafkaPodLibs(testStorage.getNamespaceName(), KafkaResources.kafkaPodName(clusterName, 0)), containsString(acrossUpgradeData.getProcedures().getVersion())); - } - - @IsolatedTest - void testUpgradeAcrossVersionsWithUnsupportedKafkaVersion() throws IOException { - final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext()); - Optional<UpgradeKafkaVersion> upgradeKafkaVersion = UpgradeKafkaVersion.getKafkaVersionSupportedBeforeUnsupportedAfterUpgrade(acrossUpgradeData.getFromKafkaVersionsUrl()); - assumeTrue(upgradeKafkaVersion.isPresent(), "Supported Kafka versions after upgrade contains all supported Kafka versions before upgrade so test is skipped"); - - // Setup env - setupEnvAndUpgradeClusterOperator(CO_NAMESPACE, testStorage, acrossUpgradeData, upgradeKafkaVersion.get()); - - // Make snapshots of all Pods - makeComponentsSnapshots(TEST_SUITE_NAMESPACE); - - // Check if UTO is used before changing the CO -> used for check for KafkaTopics - boolean wasUTOUsedBefore = StUtils.isUnidirectionalTopicOperatorUsed(TEST_SUITE_NAMESPACE, eoSelector); - - // Upgrade CO - changeClusterOperator(CO_NAMESPACE, TEST_SUITE_NAMESPACE, acrossUpgradeData); - logPodImages(CO_NAMESPACE, coSelector); - // Upgrade kafka - changeKafkaVersion(TEST_SUITE_NAMESPACE, acrossUpgradeData); - logPodImages(TEST_SUITE_NAMESPACE, coSelector); - checkAllComponentsImages(TEST_SUITE_NAMESPACE, acrossUpgradeData); - // Verify that Pods are stable - PodUtils.verifyThatRunningPodsAreStable(TEST_SUITE_NAMESPACE, clusterName); - // Verify upgrade - verifyProcedure(TEST_SUITE_NAMESPACE, acrossUpgradeData, testStorage.getContinuousProducerName(), testStorage.getContinuousConsumerName(), wasUTOUsedBefore); - } - - @IsolatedTest - void testUpgradeAcrossVersionsWithNoKafkaVersion() throws IOException { - final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext()); - // Setup env - setupEnvAndUpgradeClusterOperator(CO_NAMESPACE, testStorage, acrossUpgradeData, null); - - // Check if UTO is used before changing the CO -> used for check for KafkaTopics - boolean wasUTOUsedBefore = StUtils.isUnidirectionalTopicOperatorUsed(TEST_SUITE_NAMESPACE, eoSelector); - - // Upgrade CO - changeClusterOperator(CO_NAMESPACE, TEST_SUITE_NAMESPACE, acrossUpgradeData); - // Wait till first upgrade finished - controllerPods = RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(TEST_SUITE_NAMESPACE, controllerSelector, 3, controllerPods); - brokerPods = RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(TEST_SUITE_NAMESPACE, brokerSelector, 3, brokerPods); - eoPods = DeploymentUtils.waitTillDepHasRolled(TEST_SUITE_NAMESPACE, KafkaResources.entityOperatorDeploymentName(clusterName), 1, eoPods); - - LOGGER.info("Rolling to new images has finished!"); - logPodImages(CO_NAMESPACE, coSelector); - // Upgrade kafka - changeKafkaVersion(testStorage.getNamespaceName(), acrossUpgradeData); - logPodImages(CO_NAMESPACE, coSelector); - checkAllComponentsImages(TEST_SUITE_NAMESPACE, acrossUpgradeData); - // Verify that Pods are stable - PodUtils.verifyThatRunningPodsAreStable(TEST_SUITE_NAMESPACE, clusterName); - // Verify upgrade - verifyProcedure(TEST_SUITE_NAMESPACE, acrossUpgradeData, testStorage.getContinuousProducerName(), testStorage.getContinuousConsumerName(), wasUTOUsedBefore); - } - - @BeforeEach - void setupEnvironment() { - setUpStrimziUpgradeTestNamespaces(); - } - - @AfterEach - void afterEach() { - cleanUpStrimziUpgradeTestNamespaces(); - } -} diff --git a/systemtest/src/test/resources/upgrade/BundleDowngrade.yaml b/systemtest/src/test/resources/upgrade/BundleDowngrade.yaml index 4e8b73f417a..5c4df299774 100644 --- a/systemtest/src/test/resources/upgrade/BundleDowngrade.yaml +++ b/systemtest/src/test/resources/upgrade/BundleDowngrade.yaml @@ -24,10 +24,8 @@ # featureGatesBefore: String - FG added to `STRIMZI_FEATURE_GATES` environment variable, on initial deploy of CO # featureGatesAfter: String - FG added to `STRIMZI_FEATURE_GATES` environment variable, on upgrade of CO # filePaths: path to example files for particular resources -# kafkaBefore: path to Kafka resource (ZK mode) in the version of Strimzi, from which we are doing the downgrade -# kafkaAfter: path to Kafka resource (ZK mode) in the version of Strimzi, to which we are doing the downgrade -# kafkaKRaftBefore: path to Kafka and KafkaNodePool resources (KRaft mode), collected in one file, in the version of Strimzi, from which we are doing the downgrade -# kafkaKRaftAfter: path to Kafka and KafkaNodePool resources (KRaft mode), collected in one file, in the version of Strimzi, to which we are doing the downgrade +# kafkaBefore: path to Kafka and KafkaNodePool resources, collected in one file, in the version of Strimzi, from which we are doing the downgrade +# kafkaAfter: path to Kafka and KafkaNodePool resources, collected in one file, in the version of Strimzi, to which we are doing the downgrade # --- Structure --- - fromVersion: HEAD toVersion: 0.45.0 @@ -54,6 +52,4 @@ featureGatesAfter: "" filePaths: kafkaBefore: "/examples/kafka/kafka-persistent.yaml" - kafkaAfter: "/examples/kafka/kafka-persistent.yaml" - kafkaKRaftBefore: "/examples/kafka/kafka-persistent.yaml" - kafkaKRaftAfter: "/examples/kafka/kraft/kafka.yaml" \ No newline at end of file + kafkaAfter: "/examples/kafka/kraft/kafka.yaml" \ No newline at end of file diff --git a/systemtest/src/test/resources/upgrade/BundleUpgrade.yaml b/systemtest/src/test/resources/upgrade/BundleUpgrade.yaml index ee780c19689..3d21735c840 100644 --- a/systemtest/src/test/resources/upgrade/BundleUpgrade.yaml +++ b/systemtest/src/test/resources/upgrade/BundleUpgrade.yaml @@ -23,10 +23,8 @@ # featureGatesBefore: String - FG added to `STRIMZI_FEATURE_GATES` environment variable, on initial deploy of CO # featureGatesAfter: String - FG added to `STRIMZI_FEATURE_GATES` environment variable, on upgrade of CO # filePaths: path to example files for particular resources -# kafkaBefore: path to Kafka resource (ZK mode) in the version of Strimzi, from which we are doing the upgrade -# kafkaAfter: path to Kafka resource (ZK mode) in the version of Strimzi, to which we are doing the upgrade -# kafkaKRaftBefore: path to Kafka and KafkaNodePool resources (KRaft mode), collected in one file, in the version of Strimzi, from which we are doing the upgrade -# kafkaKRaftAfter: path to Kafka and KafkaNodePool resources (KRaft mode), collected in one file, in the version of Strimzi, to which we are doing the upgrade +# kafkaBefore: path to Kafka and KafkaNodePool resources, collected in one file, in the version of Strimzi, from which we are doing the upgrade +# kafkaAfter: path to Kafka and KafkaNodePool resources, collected in one file, in the version of Strimzi, to which we are doing the upgrade # --- Structure --- - fromVersion: 0.45.0 @@ -50,7 +48,5 @@ featureGatesBefore: "" featureGatesAfter: "" filePaths: - kafkaBefore: "/examples/kafka/kafka-persistent.yaml" + kafkaBefore: "/examples/kafka/kraft/kafka.yaml" kafkaAfter: "/examples/kafka/kafka-persistent.yaml" - kafkaKRaftBefore: "/examples/kafka/kraft/kafka.yaml" - kafkaKRaftAfter: "/examples/kafka/kafka-persistent.yaml" diff --git a/systemtest/src/test/resources/upgrade/OlmUpgrade.yaml b/systemtest/src/test/resources/upgrade/OlmUpgrade.yaml index 17563fde327..e41a17abdf8 100644 --- a/systemtest/src/test/resources/upgrade/OlmUpgrade.yaml +++ b/systemtest/src/test/resources/upgrade/OlmUpgrade.yaml @@ -5,10 +5,8 @@ # fromUrl: String - url, from which examples are downloaded # fromKafkaVersionsUrl: String - url, from which is kafka-versions.yaml file downloaded # filePaths: path to example files for particular resources -# kafkaBefore: path to Kafka resource (ZK mode) in the version of Strimzi, from which we are doing the upgrade -# kafkaAfter: path to Kafka resource (ZK mode) in the version of Strimzi, to which we are doing the upgrade -# kafkaKRaftBefore: path to Kafka and KafkaNodePool resources (KRaft mode), collected in one file, in the version of Strimzi, from which we are doing the upgrade -# kafkaKRaftAfter: path to Kafka and KafkaNodePool resources (KRaft mode), collected in one file, in the version of Strimzi, to which we are doing the upgrade +# kafkaBefore: path to Kafka and KafkaNodePool resources, collected in one file, in the version of Strimzi, from which we are doing the upgrade +# kafkaAfter: path to Kafka and KafkaNodePool resources, collected in one file, in the version of Strimzi, to which we are doing the upgrade # --- Structure --- # # --- Prerequisites --- @@ -24,7 +22,5 @@ fromExamples: strimzi-0.45.0 fromUrl: https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.45.0/strimzi-0.45.0.zip fromKafkaVersionsUrl: https://raw.githubusercontent.com/strimzi/strimzi-kafka-operator/0.45.0/kafka-versions.yaml filePaths: - kafkaBefore: "/examples/kafka/kafka-persistent.yaml" - kafkaAfter: "/examples/kafka/kafka-persistent.yaml" - kafkaKRaftBefore: "/examples/kafka/nodepools/kafka-with-kraft.yaml" - kafkaKRaftAfter: "/examples/kafka/nodepools/kafka-with-kraft.yaml" \ No newline at end of file + kafkaBefore: "/examples/kafka/kraft/kafka.yaml" + kafkaAfter: "/examples/kafka/kafka-persistent.yaml" \ No newline at end of file