Skip to content

Commit

Permalink
[ST] Use the KafkaConnect with Connector upgrade/downgrade scenario a…
Browse files Browse the repository at this point in the history
…s the main one (#10544)

Signed-off-by: Lukas Kral <[email protected]>
  • Loading branch information
im-konge authored Sep 6, 2024
1 parent 11628c3 commit 8f14b5e
Show file tree
Hide file tree
Showing 11 changed files with 107 additions and 250 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ public void setVersion(String version) {
this.version = version;
}

/**
 * Sets the {@code log.message.format.version} value used by this version modification data.
 *
 * @param logMessageVersion the log message format version to use
 */
public void setLogMessageVersion(String logMessageVersion) {
this.logMessageVersion = logMessageVersion;
}

/**
 * Sets the KRaft {@code metadataVersion} value used by this version modification data.
 *
 * @param metadataVersion the metadata version to use
 */
public void setMetadataVersion(String metadataVersion) {
this.metadataVersion = metadataVersion;
}

/**
 * @return the version configured in this modification data
 */
public String getVersion() {
return version;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public static Stream<Arguments> loadYamlDowngradeDataWithFeatureGates(String fea

downgradeData = updateUpgradeDataWithFeatureGates(downgradeData, featureGates);

parameters.add(Arguments.of(downgradeData.getFromVersion(), downgradeData.getToVersion(), downgradeData));
parameters.add(Arguments.of(downgradeData.getFromVersion(), downgradeData.getToVersion(), downgradeData.getFeatureGatesBefore(), downgradeData.getFeatureGatesAfter(), downgradeData));
});

return parameters.stream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ protected void makeComponentsSnapshots(String componentsNamespaceName) {
}

@SuppressWarnings("CyclomaticComplexity")
protected void changeKafkaAndLogFormatVersion(String componentsNamespaceName, CommonVersionModificationData versionModificationData) throws IOException {
protected void changeKafkaVersion(String componentsNamespaceName, CommonVersionModificationData versionModificationData) throws IOException {
// Get Kafka configurations
String currentLogMessageFormat = cmdKubeClient(componentsNamespaceName).getResourceJsonPath(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, ".spec.kafka.config.log\\.message\\.format\\.version");
String currentInterBrokerProtocol = cmdKubeClient(componentsNamespaceName).getResourceJsonPath(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, ".spec.kafka.config.inter\\.broker\\.protocol\\.version");
Expand Down Expand Up @@ -212,6 +212,16 @@ protected void changeKafkaAndLogFormatVersion(String componentsNamespaceName, Co
}
}

/**
 * Patches the KafkaConnect custom resource's {@code /spec/version} to the Kafka version
 * taken from the modification data's procedures, then waits until the Connect pods have
 * rolled to pick up the change.
 *
 * @param componentsNamespaceName  namespace containing the KafkaConnect resource
 * @param versionModificationData  carries the target Kafka version in its procedures
 */
protected void changeKafkaVersionInKafkaConnect(String componentsNamespaceName, CommonVersionModificationData versionModificationData) {
    UpgradeKafkaVersion upgradeToKafkaVersion = new UpgradeKafkaVersion(versionModificationData.getProcedures().getVersion());

    // Parameterized logging instead of eager String.format(): the message is only
    // rendered when INFO is enabled, matching the log4j2 idiom used elsewhere
    // in this class (e.g. the "Running downgrade test from version {} to {}" call).
    LOGGER.info("Setting Kafka version to {} in Kafka Connect", upgradeToKafkaVersion.getVersion());
    cmdKubeClient(componentsNamespaceName).patchResource(getResourceApiVersion(KafkaConnect.RESOURCE_PLURAL), clusterName, "/spec/version", upgradeToKafkaVersion.getVersion());

    LOGGER.info("Waiting for KafkaConnect rolling update to finish");
    // Remember the new pod snapshot so later rolling-update waits compare against it
    connectPods = RollingUpdateUtils.waitTillComponentHasRolled(componentsNamespaceName, connectLabelSelector, 1, connectPods);
}

protected void logComponentsPodImagesWithConnect(String componentsNamespaceName) {
logPodImages(componentsNamespaceName, controllerSelector, brokerSelector, eoSelector, connectLabelSelector);
}
Expand Down Expand Up @@ -550,7 +560,11 @@ protected void deployKafkaTopicWithWaitForReadiness(final String componentsNames
}
}

protected void deployKafkaConnectAndKafkaConnectorWithWaitForReadiness(final TestStorage testStorage, final BundleVersionModificationData acrossUpgradeData, final UpgradeKafkaVersion upgradeKafkaVersion) {
protected void deployKafkaConnectAndKafkaConnectorWithWaitForReadiness(
final TestStorage testStorage,
final BundleVersionModificationData acrossUpgradeData,
final UpgradeKafkaVersion upgradeKafkaVersion
) {
// setup KafkaConnect + KafkaConnector
if (!cmdKubeClient(testStorage.getNamespaceName()).getResources(getResourceApiVersion(KafkaConnect.RESOURCE_PLURAL)).contains(clusterName)) {
if (acrossUpgradeData.getFromVersion().equals("HEAD")) {
Expand Down Expand Up @@ -625,14 +639,17 @@ protected void deployKafkaConnectAndKafkaConnectorWithWaitForReadiness(final Tes
}
}

protected void doKafkaConnectAndKafkaConnectorUpgradeOrDowngradeProcedure(final String clusterOperatorNamespaceName,
final TestStorage testStorage,
final BundleVersionModificationData bundleDowngradeDataWithFeatureGates,
final UpgradeKafkaVersion upgradeKafkaVersion) throws IOException {
this.deployCoWithWaitForReadiness(clusterOperatorNamespaceName, testStorage.getNamespaceName(), bundleDowngradeDataWithFeatureGates);
this.deployKafkaClusterWithWaitForReadiness(testStorage.getNamespaceName(), bundleDowngradeDataWithFeatureGates, upgradeKafkaVersion);
this.deployKafkaConnectAndKafkaConnectorWithWaitForReadiness(testStorage, bundleDowngradeDataWithFeatureGates, upgradeKafkaVersion);
this.deployKafkaUserWithWaitForReadiness(testStorage.getNamespaceName(), bundleDowngradeDataWithFeatureGates);
protected void doKafkaConnectAndKafkaConnectorUpgradeOrDowngradeProcedure(
final String clusterOperatorNamespaceName,
final TestStorage testStorage,
final BundleVersionModificationData upgradeDowngradeData,
final UpgradeKafkaVersion upgradeKafkaVersion
) throws IOException {
setupEnvAndUpgradeClusterOperator(clusterOperatorNamespaceName, testStorage, upgradeDowngradeData, upgradeKafkaVersion);
this.deployKafkaConnectAndKafkaConnectorWithWaitForReadiness(testStorage, upgradeDowngradeData, upgradeKafkaVersion);

// Check if UTO is used before changing the CO -> used for check for KafkaTopics
boolean wasUTOUsedBefore = StUtils.isUnidirectionalTopicOperatorUsed(testStorage.getNamespaceName(), eoSelector);

final KafkaClients clients = ClientUtils.getInstantTlsClientBuilder(testStorage, KafkaResources.tlsBootstrapAddress(clusterName))
.withNamespaceName(testStorage.getNamespaceName())
Expand All @@ -651,23 +668,39 @@ protected void doKafkaConnectAndKafkaConnectorUpgradeOrDowngradeProcedure(final
KafkaConnectUtils.waitForMessagesInKafkaConnectFileSink(testStorage.getNamespaceName(), connectorPodName, DEFAULT_SINK_FILE_PATH, testStorage.getMessageCount());

// Upgrade CO to HEAD and wait for readiness of ClusterOperator
changeClusterOperator(clusterOperatorNamespaceName, testStorage.getNamespaceName(), bundleDowngradeDataWithFeatureGates);
changeClusterOperator(clusterOperatorNamespaceName, testStorage.getNamespaceName(), upgradeDowngradeData);

if (TestKafkaVersion.supportedVersionsContainsVersion(upgradeKafkaVersion.getVersion())) {
// Verify that Kafka and Connect Pods Rolled
waitForKafkaClusterRollingUpdate(testStorage.getNamespaceName());
connectPods = RollingUpdateUtils.waitTillComponentHasRolled(testStorage.getNamespaceName(), connectLabelSelector, 1, connectPods);
KafkaConnectorUtils.waitForConnectorReady(testStorage.getNamespaceName(), clusterName);
}

// Verify that Kafka and Connect Pods Rolled
waitForKafkaClusterRollingUpdate(testStorage.getNamespaceName());
RollingUpdateUtils.waitTillComponentHasRolled(testStorage.getNamespaceName(), connectLabelSelector, 1, connectPods);
KafkaConnectorUtils.waitForConnectorReady(testStorage.getNamespaceName(), clusterName);
logComponentsPodImagesWithConnect(testStorage.getNamespaceName());

// Upgrade/Downgrade kafka
changeKafkaVersion(testStorage.getNamespaceName(), upgradeDowngradeData);
changeKafkaVersionInKafkaConnect(testStorage.getNamespaceName(), upgradeDowngradeData);

logComponentsPodImagesWithConnect(testStorage.getNamespaceName());
checkAllComponentsImages(testStorage.getNamespaceName(), upgradeDowngradeData);

// send again new messages
resourceManager.createResourceWithWait(clients.producerTlsStrimzi(clusterName));

// Verify that Producer finish successfully
ClientUtils.waitForInstantProducerClientSuccess(testStorage.getNamespaceName(), testStorage);

// Verify FileSink KafkaConnector
connectorPodName = kubeClient().listPods(testStorage.getNamespaceName(), Collections.singletonMap(Labels.STRIMZI_KIND_LABEL, KafkaConnect.RESOURCE_KIND)).get(0).getMetadata().getName();
KafkaConnectUtils.waitForMessagesInKafkaConnectFileSink(testStorage.getNamespaceName(), connectorPodName, DEFAULT_SINK_FILE_PATH, testStorage.getMessageCount());

// Verify that pods are stable
PodUtils.verifyThatRunningPodsAreStable(testStorage.getNamespaceName(), clusterName);

// Verify upgrade
verifyProcedure(testStorage.getNamespaceName(), upgradeDowngradeData, testStorage.getContinuousProducerName(), testStorage.getContinuousConsumerName(), wasUTOUsedBefore);
}

protected String downloadExamplesAndGetPath(CommonVersionModificationData versionModificationData) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ protected void waitForReadinessOfKafkaCluster(final String componentsNamespaceNa
DeploymentUtils.waitForDeploymentAndPodsReady(componentsNamespaceName, KafkaResources.entityOperatorDeploymentName(clusterName), 1);
}

protected void changeKafkaAndMetadataVersion(final String componentsNamespaceName, CommonVersionModificationData versionModificationData) throws IOException {
changeKafkaAndMetadataVersion(componentsNamespaceName, versionModificationData, false);
/**
 * Changes the Kafka version according to the given modification data.
 * Convenience overload that delegates to the three-argument variant with
 * {@code replaceEvenIfMissing = false}.
 *
 * @param componentsNamespaceName  namespace containing the Kafka resource
 * @param versionModificationData  version data describing the change to apply
 * @throws IOException exception during application of YAML files
 */
protected void changeKafkaVersion(final String componentsNamespaceName, CommonVersionModificationData versionModificationData) throws IOException {
changeKafkaVersion(componentsNamespaceName, versionModificationData, false);
}

/**
Expand All @@ -131,7 +131,7 @@ protected void changeKafkaAndMetadataVersion(final String componentsNamespaceNam
* @throws IOException exception during application of YAML files
*/
@SuppressWarnings("CyclomaticComplexity")
protected void changeKafkaAndMetadataVersion(final String componentsNamespaceName, CommonVersionModificationData versionModificationData, boolean replaceEvenIfMissing) throws IOException {
protected void changeKafkaVersion(final String componentsNamespaceName, CommonVersionModificationData versionModificationData, boolean replaceEvenIfMissing) throws IOException {
// Get Kafka version
String kafkaVersionFromCR = cmdKubeClient(componentsNamespaceName).getResourceJsonPath(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, ".spec.kafka.version");
kafkaVersionFromCR = kafkaVersionFromCR.equals("") ? null : kafkaVersionFromCR;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,12 @@
import io.strimzi.systemtest.storage.TestStorage;
import io.strimzi.systemtest.upgrade.BundleVersionModificationData;
import io.strimzi.systemtest.upgrade.UpgradeKafkaVersion;
import io.strimzi.systemtest.upgrade.VersionModificationDataLoader;
import io.strimzi.systemtest.utils.StUtils;
import io.strimzi.systemtest.utils.kubeUtils.objects.PodUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

Expand All @@ -38,61 +35,23 @@
@Tag(KRAFT_UPGRADE)
public class KRaftStrimziDowngradeST extends AbstractKRaftUpgradeST {
private static final Logger LOGGER = LogManager.getLogger(KRaftStrimziDowngradeST.class);
private final BundleVersionModificationData bundleDowngradeVersionData = new VersionModificationDataLoader(VersionModificationDataLoader.ModificationType.BUNDLE_DOWNGRADE).buildDataForDowngradeUsingFirstScenarioForKRaft();

// NOTE(review): shown on the deletion side of this diff hunk — presumably removed in
// favour of the combined Kafka/Connect/Connector downgrade scenario; confirm against
// the full commit before relying on it.
// Parameterized KRaft downgrade test: one invocation per from→to pair loaded from YAML.
@ParameterizedTest(name = "testDowngradeStrimziVersion-{0}-{1}")
@MethodSource("io.strimzi.systemtest.upgrade.VersionModificationDataLoader#loadYamlDowngradeDataForKRaft")
void testDowngradeStrimziVersion(String from, String to, BundleVersionModificationData parameters) throws Exception {
final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext());
// Skip this combination when the environment flags or the K8s version disallow it
assumeTrue(StUtils.isAllowOnCurrentEnvironment(parameters.getEnvFlakyVariable()));
assumeTrue(StUtils.isAllowedOnCurrentK8sVersion(parameters.getEnvMaxK8sVersion()));

LOGGER.debug("Running downgrade test from version {} to {}", from, to);
performDowngrade(CO_NAMESPACE, testStorage.getNamespaceName(), parameters);
}

@MicroShiftNotSupported("Due to lack of Kafka Connect build feature")
@KindIPv6NotSupported("Our current CI setup doesn't allow pushing into internal registries that is needed in this test")
@Test
void testDowngradeOfKafkaConnectAndKafkaConnector() throws IOException {
final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext());
UpgradeKafkaVersion upgradeKafkaVersion = new UpgradeKafkaVersion(bundleDowngradeVersionData.getDeployKafkaVersion());

doKafkaConnectAndKafkaConnectorUpgradeOrDowngradeProcedure(CO_NAMESPACE, testStorage, bundleDowngradeVersionData, upgradeKafkaVersion);
}

private void performDowngrade(String clusterOperatorNamespaceName, String componentsNamespaceName, BundleVersionModificationData downgradeData) throws IOException {
@ParameterizedTest(name = "from: {0} (using FG <{2}>) to: {1} (using FG <{3}>)")
@MethodSource("io.strimzi.systemtest.upgrade.VersionModificationDataLoader#loadYamlDowngradeDataForKRaft")
void testDowngradeOfKafkaKafkaConnectAndKafkaConnector(String from, String to, String fgBefore, String fgAfter, BundleVersionModificationData downgradeData) throws IOException {
final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext());

String lowerMetadataVersion = downgradeData.getProcedures().getMetadataVersion();
UpgradeKafkaVersion testUpgradeKafkaVersion = new UpgradeKafkaVersion(downgradeData.getDeployKafkaVersion(), lowerMetadataVersion);

// Setup env
// We support downgrade only when you didn't upgrade to new inter.broker.protocol.version and log.message.format.version
// https://strimzi.io/docs/operators/latest/full/deploying.html#con-target-downgrade-version-str

setupEnvAndUpgradeClusterOperator(clusterOperatorNamespaceName, testStorage, downgradeData, testUpgradeKafkaVersion);
logClusterOperatorPodImage(clusterOperatorNamespaceName);

boolean wasUTOUsedBefore = StUtils.isUnidirectionalTopicOperatorUsed(componentsNamespaceName, eoSelector);

// Downgrade CO
changeClusterOperator(clusterOperatorNamespaceName, componentsNamespaceName, downgradeData);

// Wait for Kafka cluster rolling update
waitForKafkaClusterRollingUpdate(componentsNamespaceName);
logComponentsPodImages(componentsNamespaceName);

// Downgrade kafka
changeKafkaAndMetadataVersion(componentsNamespaceName, downgradeData);
UpgradeKafkaVersion downgradeKafkaVersion = new UpgradeKafkaVersion(downgradeData.getDeployKafkaVersion(), lowerMetadataVersion);

// Verify that pods are stable
PodUtils.verifyThatRunningPodsAreStable(componentsNamespaceName, clusterName);
assumeTrue(StUtils.isAllowOnCurrentEnvironment(downgradeData.getEnvFlakyVariable()));
assumeTrue(StUtils.isAllowedOnCurrentK8sVersion(downgradeData.getEnvMaxK8sVersion()));

checkAllComponentsImages(componentsNamespaceName, downgradeData);
LOGGER.debug("Running downgrade test from version {} to {} (FG: {} -> {})", from, to, fgBefore, fgAfter);

// Verify upgrade
verifyProcedure(componentsNamespaceName, downgradeData, testStorage.getContinuousProducerName(), testStorage.getContinuousConsumerName(), wasUTOUsedBefore);
doKafkaConnectAndKafkaConnectorUpgradeOrDowngradeProcedure(CO_NAMESPACE, testStorage, downgradeData, downgradeKafkaVersion);
}

@BeforeEach
Expand Down
Loading

0 comments on commit 8f14b5e

Please sign in to comment.