Skip to content

Commit

Permalink
Minor refactoring of how the Kubernetes Client is created and used in…
Browse files Browse the repository at this point in the history
… the Topic Operator (#10901)

Signed-off-by: Jakub Scholz <[email protected]>
  • Loading branch information
scholzj authored Dec 5, 2024
1 parent 4364690 commit c079854
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import io.strimzi.operator.topic.model.TopicEvent.TopicUpsert;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
* Handler for {@link KafkaTopic} events.
Expand All @@ -24,8 +23,6 @@ class TopicEventHandler implements ResourceEventHandler<KafkaTopic> {
private final TopicOperatorConfig config;
private final BatchingLoop queue;
private final MetricsHolder metrics;

private long lastPeriodicTimestampMs;

public TopicEventHandler(TopicOperatorConfig config, BatchingLoop queue, MetricsHolder metrics) {
this.config = config;
Expand All @@ -48,9 +45,8 @@ public void onAdd(KafkaTopic obj) {
@Override
public void onUpdate(KafkaTopic oldObj, KafkaTopic newObj) {
String trigger = Objects.equals(oldObj, newObj) ? "resync" : "update";
if (trigger.equals("resync") && (TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - lastPeriodicTimestampMs) > config.fullReconciliationIntervalMs()) {
if (trigger.equals("resync")) {
LOGGER.infoOp("Triggering periodic reconciliation of {} resources for namespace {}", KafkaTopic.RESOURCE_KIND, config.namespace());
this.lastPeriodicTimestampMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
}
if (trigger.equals("update")) {
LOGGER.debugOp("Informed about update event for topic {}", TopicOperatorUtil.topicName(newObj));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
*/
public class TopicOperatorMain implements Liveness, Readiness {
private final static ReconciliationLogger LOGGER = ReconciliationLogger.create(TopicOperatorMain.class);
private final static long INFORMER_PERIOD_MS = 2_000;
private final static long INFORMER_RESYNC_CHECK_PERIOD_MS = 30_000;

private final TopicOperatorConfig config;
private final KubernetesClient kubernetesClient;
Expand All @@ -61,7 +61,7 @@ public class TopicOperatorMain implements Liveness, Readiness {
Objects.requireNonNull(config.labelSelector());
this.config = config;
var selector = config.labelSelector().toMap();
this.kubernetesClient = TopicOperatorUtil.createKubernetesClient("main");
this.kubernetesClient = new OperatorKubernetesClientBuilder("strimzi-topic-operator", TopicOperatorMain.class.getPackage().getImplementationVersion()).build();
this.kafkaAdminClient = kafkaAdminClient;
this.cruiseControlClient = TopicOperatorUtil.createCruiseControlClient(config);

Expand Down Expand Up @@ -96,13 +96,13 @@ synchronized void start() {
// Do NOT use withLabels to filter the informer, since the controller is stateful
// (topics need to be added to removed from TopicController.topics if KafkaTopics transition between
// selected and unselected).
.runnableInformer(INFORMER_PERIOD_MS)
// The informer interval acts like a heartbeat, then each handler interval will cause a resync at
.runnableInformer(INFORMER_RESYNC_CHECK_PERIOD_MS)
// The informer resync check interval acts like a heartbeat, then each handler interval will cause a resync at
// some interval of the overall heartbeat. The closer these values are together the more likely it
// is that the handler skips one informer intervals. Setting both intervals to the same value generates
// just enough skew that when the informer checks if the handler is ready for resync it sees that
// it still needs another couple of micro-seconds and skips to the next informer level resync.
.addEventHandlerWithResyncPeriod(resourceEventHandler, config.fullReconciliationIntervalMs() + INFORMER_PERIOD_MS)
.addEventHandlerWithResyncPeriod(resourceEventHandler, config.fullReconciliationIntervalMs())
.itemStore(itemStore);
LOGGER.infoOp("Starting informer");
informer.run();
Expand Down Expand Up @@ -162,13 +162,6 @@ static TopicOperatorMain operator(TopicOperatorConfig config, Admin kafkaAdmin)
return new TopicOperatorMain(config, kafkaAdmin);
}

static KubernetesClient kubeClient() {
return new OperatorKubernetesClientBuilder(
"strimzi-topic-operator",
TopicOperatorMain.class.getPackage().getImplementationVersion())
.build();
}

@Override
public boolean isAlive() {
boolean running;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@
*/
package io.strimzi.operator.topic;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.micrometer.core.instrument.Timer;
import io.strimzi.api.kafka.model.topic.KafkaTopic;
import io.strimzi.operator.common.Annotations;
import io.strimzi.operator.common.OperatorKubernetesClientBuilder;
import io.strimzi.operator.common.model.InvalidResourceException;
import io.strimzi.operator.topic.cruisecontrol.CruiseControlClient;
import io.strimzi.operator.topic.metrics.TopicOperatorMetricsHolder;
Expand Down Expand Up @@ -39,19 +37,6 @@ public class TopicOperatorUtil {

private TopicOperatorUtil() { }

/**
* Create a new Kubernetes client instance.
*
* @param id Caller id.
* @return Kubernetes client.
*/
public static KubernetesClient createKubernetesClient(String id) {
return new OperatorKubernetesClientBuilder(
"strimzi-topic-operator-" + id,
TopicOperatorMain.class.getPackage().getImplementationVersion())
.build();
}

/**
* Create a new Kafka admin client instance.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.strimzi.operator.topic;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.strimzi.api.ResourceAnnotations;
import io.strimzi.api.kafka.Crds;
Expand Down Expand Up @@ -117,7 +118,7 @@ class TopicControllerIT implements TestSeparator {

@BeforeAll
public static void beforeAll() {
kubernetesClient = TopicOperatorUtil.createKubernetesClient("test");
kubernetesClient = new KubernetesClientBuilder().build();
TopicOperatorTestUtil.setupKubeCluster(kubernetesClient, NAMESPACE);
}

Expand Down

0 comments on commit c079854

Please sign in to comment.