From cb45f8d6c3473525e8d96188b49e43c15174ccfe Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pablo=20D=C3=ADaz-L=C3=B3pez?= <padilo@gmail.com>
Date: Sat, 31 Aug 2024 13:59:12 +0200
Subject: [PATCH] Fix topic operator loop for unmanaged topics (#10451)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Pablo Díaz <padilo@gmail.com>
---
 .../topic/BatchingTopicController.java        | 60 ++------------
 .../operator/topic/TopicControllerIT.java     | 83 +++++++++++++++++--
 2 files changed, 85 insertions(+), 58 deletions(-)

diff --git a/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java b/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java
index 0e37ae20442..bfc33bc33e0 100644
--- a/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java
+++ b/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java
@@ -11,10 +11,10 @@
 import io.strimzi.api.kafka.model.common.ConditionBuilder;
 import io.strimzi.api.kafka.model.topic.KafkaTopic;
 import io.strimzi.api.kafka.model.topic.KafkaTopicBuilder;
-import io.strimzi.api.kafka.model.topic.KafkaTopicStatus;
 import io.strimzi.api.kafka.model.topic.KafkaTopicStatusBuilder;
 import io.strimzi.operator.common.Reconciliation;
 import io.strimzi.operator.common.ReconciliationLogger;
+import io.strimzi.operator.common.model.StatusDiff;
 import io.strimzi.operator.common.model.StatusUtils;
 import io.strimzi.operator.topic.metrics.TopicOperatorMetricsHolder;
 import io.strimzi.operator.topic.model.Either;
@@ -186,7 +186,7 @@ static String resourceVersion(KafkaTopic kt) {
     private List<ReconcilableTopic> addOrRemoveFinalizer(boolean useFinalizer, List<ReconcilableTopic> reconcilableTopics) {
         List<ReconcilableTopic> collect = reconcilableTopics.stream()
                 .map(reconcilableTopic ->
-                        new ReconcilableTopic(reconcilableTopic.reconciliation(), useFinalizer 
+                        new ReconcilableTopic(reconcilableTopic.reconciliation(), useFinalizer
                             ? addFinalizer(reconcilableTopic) : removeFinalizer(reconcilableTopic), reconcilableTopic.topicName()))
                 .collect(Collectors.toList());
         LOGGER.traceOp("{} {} topics", useFinalizer ? "Added finalizers to" : "Removed finalizers from", reconcilableTopics.size());
@@ -1132,7 +1132,7 @@ private void updateStatusForSuccess(ReconcilableTopic reconcilableTopic) {
         } else if (TopicOperatorUtil.isPaused(reconcilableTopic.kt())) {
             conditionType = "ReconciliationPaused";
         }
-        
+
         conditions.add(new ConditionBuilder()
               .withType(conditionType)
               .withStatus("True")
@@ -1210,10 +1210,10 @@ private void updateStatus(ReconcilableTopic reconcilableTopic) {
         var oldStatus = Crds.topicOperation(kubeClient)
             .inNamespace(reconcilableTopic.kt().getMetadata().getNamespace())
             .withName(reconcilableTopic.kt().getMetadata().getName()).get().getStatus();
-        
+
         // the observedGeneration is a marker that shows that the operator works and that it saw the last update to the resource
         reconcilableTopic.kt().getStatus().setObservedGeneration(reconcilableTopic.kt().getMetadata().getGeneration());
-        
+
         // set or reset the topicName
         reconcilableTopic.kt().getStatus().setTopicName(
             !TopicOperatorUtil.isManaged(reconcilableTopic.kt())
@@ -1222,8 +1222,9 @@ private void updateStatus(ReconcilableTopic reconcilableTopic) {
                 ? oldStatus.getTopicName()
                 : TopicOperatorUtil.topicName(reconcilableTopic.kt())
         );
-        
-        if (statusChanged(reconcilableTopic.kt(), oldStatus)) {
+
+        StatusDiff statusDiff = new StatusDiff(oldStatus, reconcilableTopic.kt().getStatus());
+        if (!statusDiff.isEmpty()) {
             try {
                 var updatedTopic = new KafkaTopicBuilder(reconcilableTopic.kt())
                     .editOrNewMetadata()
@@ -1242,49 +1243,4 @@ private void updateStatus(ReconcilableTopic reconcilableTopic) {
             }
         }
     }
-
-    private boolean statusChanged(KafkaTopic kt, KafkaTopicStatus oldStatus) {
-        return oldStatusOrTopicNameMissing(oldStatus)
-            || nonPausedAndDifferentGenerations(kt, oldStatus)
-            || differentConditions(kt.getStatus().getConditions(), oldStatus.getConditions())
-            || replicasChangesDiffer(kt, oldStatus);
-    }
-
-    private boolean oldStatusOrTopicNameMissing(KafkaTopicStatus oldStatus) {
-        return oldStatus == null || oldStatus.getTopicName() == null;
-    }
-
-    private boolean nonPausedAndDifferentGenerations(KafkaTopic kt, KafkaTopicStatus oldStatus) {
-        return !TopicOperatorUtil.isPaused(kt) && oldStatus.getObservedGeneration() != kt.getMetadata().getGeneration();
-    }
-    
-    private boolean differentConditions(List<Condition> newConditions, List<Condition> oldConditions) {
-        if (Objects.equals(newConditions, oldConditions)) {
-            return false;
-        } else if (newConditions == null || oldConditions == null || newConditions.size() != oldConditions.size()) {
-            return true;
-        } else {
-            for (int i = 0; i < newConditions.size(); i++) {
-                if (conditionsDiffer(newConditions.get(i), oldConditions.get(i))) {
-                    return true;
-                }
-            }
-        }
-        return false;
-    }
-
-    private boolean conditionsDiffer(Condition newCondition, Condition oldCondition) {
-        return !Objects.equals(newCondition.getType(), oldCondition.getType())
-            || !Objects.equals(newCondition.getStatus(), oldCondition.getStatus())
-            || !Objects.equals(newCondition.getReason(), oldCondition.getReason())
-            || !Objects.equals(newCondition.getMessage(), oldCondition.getMessage());
-    }
-
-    @SuppressWarnings("BooleanExpressionComplexity")
-    private boolean replicasChangesDiffer(KafkaTopic kt, KafkaTopicStatus oldStatus) {
-        return kt.getStatus().getReplicasChange() == null && oldStatus.getReplicasChange() != null
-            || kt.getStatus().getReplicasChange() != null && oldStatus.getReplicasChange() == null
-            || (kt.getStatus().getReplicasChange() != null && oldStatus.getReplicasChange() != null 
-                && !Objects.equals(kt.getStatus().getReplicasChange(), oldStatus.getReplicasChange()));
-    }
 }
diff --git a/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java b/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java
index 23cee8bc9de..9d634f31a01 100644
--- a/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java
+++ b/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java
@@ -244,11 +244,23 @@ private static Predicate<KafkaTopic> readyIsFalseAndReasonIs(String requiredReas
     }
 
     private static Predicate<KafkaTopic> readyIsTrueOrFalse() {
+        return typeIsTrueOrFalse("Ready");
+    }
+
+    private static Predicate<KafkaTopic> unmanagedIsTrueOrFalse() {
+        return typeIsTrueOrFalse("Unmanaged");
+    }
+
+    private static Predicate<KafkaTopic> typeIsTrueOrFalse(String type) {
         Predicate<Condition> conditionPredicate = condition ->
-            "Ready".equals(condition.getType())
-                && "True".equals(condition.getStatus())
-                ||  "False".equals(condition.getStatus());
-        return isReconcilatedAndHasConditionMatching("Ready=True or False", conditionPredicate);
+            type.equals(condition.getType())
+                    && "True".equals(condition.getStatus())
+                    || "False".equals(condition.getStatus());
+        return isReconcilatedAndHasConditionMatching(type + "=True or False", conditionPredicate);
+    }
+
+    private static Predicate<KafkaTopic> unmanagedStatusTrue() {
+        return typeIsTrueOrFalse("Unmanaged");
     }
 
     private static Predicate<KafkaTopic> unmanagedIsTrue() {
@@ -665,7 +677,7 @@ public void shouldNotCreateTopicInKafkaWhenUnmanagedTopicCreatedInKube(
         // given
 
         // when
-        var reconciled = createTopic(kafkaCluster, kt);
+        var reconciled = createTopic(kafkaCluster, kt, unmanagedStatusTrue());
 
         // then
         assertNull(reconciled.getStatus().getTopicName());
@@ -1137,7 +1149,7 @@ public void shouldRestoreFinalizerIfRemoved(
         @BrokerConfig(name = "auto.create.topics.enable", value = "false")
         KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException, TimeoutException {
         // given
-        var created = createTopic(kafkaCluster, kt);
+        var created = createTopic(kafkaCluster, kt, TopicOperatorUtil.isManaged(kt) ? readyIsTrueOrFalse() : unmanagedIsTrueOrFalse());
         if (TopicOperatorUtil.isManaged(kt)) {
             assertCreateSuccess(kt, created);
         }
@@ -2153,4 +2165,63 @@ public void shouldReconcileOnTopicExistsException(
         KafkaTopic kafkaTopic = createTopic(kafkaCluster, kafkaTopic(NAMESPACE, topicName, true, topicName, 2, 1));
         assertTrue(readyIsTrue().test(kafkaTopic));
     }
+
+    @Test
+    public void shouldUpdateAnUnmanagedTopic(
+            @BrokerConfig(name = "auto.create.topics.enable", value = "false")
+            KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException {
+        var topicName = "my-topic";
+
+        // create the topic
+        var topic = createTopic(kafkaCluster,
+                kafkaTopic(NAMESPACE, topicName, SELECTOR, null, null, topicName, 1, 1,
+                        Map.of(TopicConfig.RETENTION_MS_CONFIG, "1000")));
+        topic = Crds.topicOperation(kubernetesClient).resource(topic).get();
+
+        TopicOperatorTestUtil.waitUntilCondition(Crds.topicOperation(kubernetesClient).resource(topic), kt ->
+                Optional.of(kt)
+                    .map(KafkaTopic::getStatus)
+                    .map(KafkaTopicStatus::getConditions)
+                    .flatMap(c -> Optional.of(c.get(0)))
+                    .map(Condition::getType)
+                    .filter("Ready"::equals)
+                    .isPresent()
+        );
+
+        // set unmanaged
+        topic = Crds.topicOperation(kubernetesClient).resource(topic).get();
+        topic.setStatus(null);
+        topic.getMetadata().getAnnotations().put(TopicOperatorUtil.MANAGED, "false");
+        topic = Crds.topicOperation(kubernetesClient).resource(topic).update();
+
+        TopicOperatorTestUtil.waitUntilCondition(Crds.topicOperation(kubernetesClient).resource(topic), kt ->
+            Optional.of(kt)
+                    .map(KafkaTopic::getStatus)
+                    .map(KafkaTopicStatus::getConditions)
+                    .flatMap(c -> Optional.of(c.get(0)))
+                    .map(Condition::getType)
+                    .filter("Unmanaged"::equals)
+                    .isPresent()
+        );
+
+        // apply a change to the unmanaged topic
+        topic = Crds.topicOperation(kubernetesClient).resource(topic).get();
+        topic.setStatus(null);
+        topic.getSpec().getConfig().put(TopicConfig.RETENTION_MS_CONFIG, "1001");
+        topic = Crds.topicOperation(kubernetesClient).resource(topic).update();
+        var resourceVersionOnUpdate = topic.getMetadata().getResourceVersion();
+
+        TopicOperatorTestUtil.waitUntilCondition(Crds.topicOperation(kubernetesClient).resource(topic), kt ->
+                !resourceVersionOnUpdate.equals(kt.getMetadata().getResourceVersion())
+        );
+        topic = Crds.topicOperation(kubernetesClient).resource(topic).get();
+        var resourceVersionAfterUpdate = topic.getMetadata().getResourceVersion();
+
+        // Wait a bit to check the resource is not getting updated continuously
+        Thread.sleep(500L);
+        TopicOperatorTestUtil.waitUntilCondition(Crds.topicOperation(kubernetesClient).resource(topic), kt ->
+                resourceVersionAfterUpdate.equals(kt.getMetadata().getResourceVersion())
+        );
+    }
+
 }