From fa459d9d032b093c84e2009d0aa4887475d5e4bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 22 Jul 2025 14:13:05 +0200 Subject: [PATCH 01/18] fix: startup resource cache access MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../junit/AbstractOperatorExtension.java | 4 + .../junit/LocallyRunOperatorExtension.java | 22 +++++- .../StartupSecondaryAccessCustomResource.java | 13 ++++ .../StartupSecondaryAccessIT.java | 53 +++++++++++++ .../StartupSecondaryAccessReconciler.java | 75 +++++++++++++++++++ 5 files changed, 165 insertions(+), 2 deletions(-) create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/startsecondaryaccess/StartupSecondaryAccessCustomResource.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/startsecondaryaccess/StartupSecondaryAccessIT.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/startsecondaryaccess/StartupSecondaryAccessReconciler.java diff --git a/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/AbstractOperatorExtension.java b/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/AbstractOperatorExtension.java index 00bf7e8380..794bc11d9a 100644 --- a/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/AbstractOperatorExtension.java +++ b/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/AbstractOperatorExtension.java @@ -111,6 +111,10 @@ public T create(T resource) { return kubernetesClient.resource(resource).inNamespace(namespace).create(); } + public T serverSideApply(T resource) { + return kubernetesClient.resource(resource).inNamespace(namespace).serverSideApply(); + } + public T replace(T resource) { return kubernetesClient.resource(resource).inNamespace(namespace).replace(); } diff --git a/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocallyRunOperatorExtension.java b/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocallyRunOperatorExtension.java index 3e6ad35e52..54cb57544d 100644 --- a/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocallyRunOperatorExtension.java +++ b/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocallyRunOperatorExtension.java @@ -54,6 +54,7 @@ public class LocallyRunOperatorExtension extends AbstractOperatorExtension { private final List> additionalCustomResourceDefinitions; private final Map registeredControllers; private final Map crdMappings; + private final Consumer beforeStartHook; private LocallyRunOperatorExtension( List reconcilers, @@ -68,7 +69,8 @@ private LocallyRunOperatorExtension( Consumer configurationServiceOverrider, Function namespaceNameSupplier, Function perClassNamespaceNameSupplier, - List additionalCrds) { + List additionalCrds, + Consumer beforeStartHook) { super( infrastructure, infrastructureTimeout, @@ -82,6 +84,7 @@ private LocallyRunOperatorExtension( this.portForwards = portForwards; this.localPortForwards = new ArrayList<>(portForwards.size()); this.additionalCustomResourceDefinitions = additionalCustomResourceDefinitions; + this.beforeStartHook = beforeStartHook; configurationServiceOverrider = configurationServiceOverrider != null ? configurationServiceOverrider.andThen( @@ -298,6 +301,10 @@ protected void before(ExtensionContext context) { }); crdMappings.clear(); + if (beforeStartHook != null) { + beforeStartHook.accept(this); + } + LOGGER.debug("Starting the operator locally"); this.operator.start(); } @@ -356,6 +363,7 @@ public static class Builder extends AbstractBuilder { private final List portForwards; private final List> additionalCustomResourceDefinitions; private final List additionalCRDs = new ArrayList<>(); + private Consumer beforeStartHook; private KubernetesClient kubernetesClient; protected Builder() { @@ -424,6 +432,15 @@ public Builder withAdditionalCRD(String... paths) { return this; } + /** + * Used to initialize resources when the namespace is generated but the operator is not started + * yet. + */ + public Builder withBeforeStartHook(Consumer beforeStartHook) { + this.beforeStartHook = beforeStartHook; + return this; + } + public LocallyRunOperatorExtension build() { return new LocallyRunOperatorExtension( reconcilers, @@ -438,7 +455,8 @@ public LocallyRunOperatorExtension build() { configurationServiceOverrider, namespaceNameSupplier, perClassNamespaceNameSupplier, - additionalCRDs); + additionalCRDs, + beforeStartHook); } } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/startsecondaryaccess/StartupSecondaryAccessCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/startsecondaryaccess/StartupSecondaryAccessCustomResource.java new file mode 100644 index 0000000000..b9701c94bd --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/startsecondaryaccess/StartupSecondaryAccessCustomResource.java @@ -0,0 +1,13 @@ +package io.javaoperatorsdk.operator.baseapi.startsecondaryaccess; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("ssac") +public class StartupSecondaryAccessCustomResource extends CustomResource + implements Namespaced {} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/startsecondaryaccess/StartupSecondaryAccessIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/startsecondaryaccess/StartupSecondaryAccessIT.java new file mode 100644 index 0000000000..eb6b4c0ca3 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/startsecondaryaccess/StartupSecondaryAccessIT.java @@ -0,0 +1,53 @@ +package io.javaoperatorsdk.operator.baseapi.startsecondaryaccess; + +import java.util.Map; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; + +import static io.javaoperatorsdk.operator.baseapi.startsecondaryaccess.StartupSecondaryAccessReconciler.LABEL_KEY; +import static io.javaoperatorsdk.operator.baseapi.startsecondaryaccess.StartupSecondaryAccessReconciler.LABEL_VALUE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +class StartupSecondaryAccessIT { + + public static final int SECONDARY_NUMBER = 100; + + @RegisterExtension + static LocallyRunOperatorExtension extension = + LocallyRunOperatorExtension.builder() + .withReconciler(new StartupSecondaryAccessReconciler()) + .withBeforeStartHook( + ex -> { + var primary = new StartupSecondaryAccessCustomResource(); + primary.setMetadata(new ObjectMetaBuilder().withName("test1").build()); + primary = ex.serverSideApply(primary); + + for (int i = 0; i < SECONDARY_NUMBER; i++) { + ConfigMap cm = new ConfigMap(); + cm.setMetadata( + new ObjectMetaBuilder() + .withLabels(Map.of(LABEL_KEY, LABEL_VALUE)) + .withNamespace(ex.getNamespace()) + .withName("cm" + i) + .build()); + cm.addOwnerReference(primary); + ex.serverSideApply(cm); + } + }) + .build(); + + @Test + void reconcilerSeeAllSecondaryResources() { + var reconciler = extension.getReconcilerOfType(StartupSecondaryAccessReconciler.class); + + await().untilAsserted(() -> assertThat(reconciler.isReconciled()).isTrue()); + + assertThat(reconciler.isSecondaryAndCacheSameAmount()).isTrue(); + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/startsecondaryaccess/StartupSecondaryAccessReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/startsecondaryaccess/StartupSecondaryAccessReconciler.java new file mode 100644 index 0000000000..a2c51fdafd --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/startsecondaryaccess/StartupSecondaryAccessReconciler.java @@ -0,0 +1,75 @@ +package io.javaoperatorsdk.operator.baseapi.startsecondaryaccess; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; + +import static io.javaoperatorsdk.operator.baseapi.startsecondaryaccess.StartupSecondaryAccessIT.SECONDARY_NUMBER; + +@ControllerConfiguration +public class StartupSecondaryAccessReconciler + implements Reconciler { + + private static final Logger log = LoggerFactory.getLogger(StartupSecondaryAccessReconciler.class); + + public static final String LABEL_KEY = "app"; + public static final String LABEL_VALUE = "secondary-test"; + + private InformerEventSource cmInformer; + + private boolean secondaryAndCacheSameAmount = true; + private boolean reconciled = false; + + @Override + public UpdateControl reconcile( + StartupSecondaryAccessCustomResource resource, + Context context) { + + var secondary = context.getSecondaryResources(ConfigMap.class); + var cached = cmInformer.list().toList(); + + log.info( + "Secondary number: {}, cached: {}, expected: {}", + secondary.size(), + cached.size(), + SECONDARY_NUMBER); + + if (secondary.size() != cached.size()) { + secondaryAndCacheSameAmount = false; + } + reconciled = true; + return UpdateControl.noUpdate(); + } + + @Override + public List> prepareEventSources( + EventSourceContext context) { + cmInformer = + new InformerEventSource<>( + InformerEventSourceConfiguration.from( + ConfigMap.class, StartupSecondaryAccessCustomResource.class) + .withLabelSelector(LABEL_KEY + "=" + LABEL_VALUE) + .build(), + context); + return List.of(cmInformer); + } + + public boolean isSecondaryAndCacheSameAmount() { + return secondaryAndCacheSameAmount; + } + + public boolean isReconciled() { + return reconciled; + } +} From 12c216845876e84da999cab07ba071d74a00aee0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 22 Jul 2025 14:51:45 +0200 Subject: [PATCH 02/18] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../source/informer/InformerEventSource.java | 68 +++++++++++++------ .../StartupSecondaryAccessIT.java | 2 +- 2 files changed, 47 insertions(+), 23 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index b52dc278f2..5c4ab471c2 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -1,5 +1,6 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -58,10 +59,11 @@ public class InformerEventSource extends ManagedInformerEventSource> implements ResourceEventHandler { + public static final String PRIMARY_TO_SECONDARY_INDEX_NAME = "primaryToSecondary"; + public static final String PREVIOUS_ANNOTATION_KEY = "javaoperatorsdk.io/previous"; private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class); // we need direct control for the indexer to propagate the just update resource also to the index - private final PrimaryToSecondaryIndex primaryToSecondaryIndex; private final PrimaryToSecondaryMapper

primaryToSecondaryMapper; private final String id = UUID.randomUUID().toString(); @@ -96,11 +98,13 @@ private InformerEventSource( // If there is a primary to secondary mapper there is no need for primary to secondary index. primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper(); if (primaryToSecondaryMapper == null) { - primaryToSecondaryIndex = - // The index uses the secondary to primary mapper (always present) to build the index - new DefaultPrimaryToSecondaryIndex<>(configuration.getSecondaryToPrimaryMapper()); - } else { - primaryToSecondaryIndex = NOOPPrimaryToSecondaryIndex.getInstance(); + addIndexers( + Map.of( + PRIMARY_TO_SECONDARY_INDEX_NAME, + (R r) -> + configuration.getSecondaryToPrimaryMapper().toPrimaryResourceIDs(r).stream() + .map(InformerEventSource::resourceIdToString) + .toList())); } final var informerConfig = configuration.getInformerConfig(); @@ -119,7 +123,7 @@ public void onAdd(R newResource) { resourceType().getSimpleName(), newResource.getMetadata().getResourceVersion()); } - primaryToSecondaryIndex.onAddOrUpdate(newResource); + onAddOrUpdate( Operation.ADD, newResource, null, () -> InformerEventSource.super.onAdd(newResource)); } @@ -134,7 +138,7 @@ public void onUpdate(R oldObject, R newObject) { newObject.getMetadata().getResourceVersion(), oldObject.getMetadata().getResourceVersion()); } - primaryToSecondaryIndex.onAddOrUpdate(newObject); + onAddOrUpdate( Operation.UPDATE, newObject, @@ -150,7 +154,7 @@ public void onDelete(R resource, boolean b) { ResourceID.fromResource(resource), resourceType().getSimpleName()); } - primaryToSecondaryIndex.onDelete(resource); + super.onDelete(resource, b); if (acceptedByDeleteFilters(resource, b)) { propagateEvent(resource); @@ -244,27 +248,42 @@ private void propagateEvent(R object) { @Override public Set getSecondaryResources(P primary) { - Set secondaryIDs; + if (useSecondaryToPrimaryIndex()) { - var primaryResourceID = ResourceID.fromResource(primary); - secondaryIDs = primaryToSecondaryIndex.getSecondaryResources(primaryResourceID); + + var resources = + byIndex( + PRIMARY_TO_SECONDARY_INDEX_NAME, + resourceIdToString(ResourceID.fromResource(primary))); + log.debug( - "Using PrimaryToSecondaryIndex to find secondary resources for primary: {}. Found" - + " secondary ids: {} ", - primaryResourceID, - secondaryIDs); + "Using informer primary to secondary index to find secondary resources for primary name:" + + " {} namespace: {}. Found {}", + primary.getMetadata().getName(), + primary.getMetadata().getNamespace(), + resources.size()); + + return resources.stream() + .map( + r -> { + Optional resource = + temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(r)); + return resource.orElse(r); + }) + .collect(Collectors.toSet()); + } else { - secondaryIDs = primaryToSecondaryMapper.toSecondaryResourceIDs(primary); + Set secondaryIDs = primaryToSecondaryMapper.toSecondaryResourceIDs(primary); log.debug( "Using PrimaryToSecondaryMapper to find secondary resources for primary: {}. Found" + " secondary ids: {} ", primary, secondaryIDs); + return secondaryIDs.stream() + .map(this::get) + .flatMap(Optional::stream) + .collect(Collectors.toSet()); } - return secondaryIDs.stream() - .map(this::get) - .flatMap(Optional::stream) - .collect(Collectors.toSet()); } @Override @@ -279,7 +298,8 @@ public synchronized void handleRecentResourceCreate(ResourceID resourceID, R res } private void handleRecentCreateOrUpdate(Operation operation, R newResource, R oldResource) { - primaryToSecondaryIndex.onAddOrUpdate(newResource); + // todo + // primaryToSecondaryIndex.onAddOrUpdate(newResource); temporaryResourceCache.putResource( newResource, Optional.ofNullable(oldResource) @@ -332,4 +352,8 @@ private enum Operation { ADD, UPDATE } + + private static String resourceIdToString(ResourceID resourceID) { + return resourceID.getName() + "#" + resourceID.getNamespace().orElse("$na"); + } } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/startsecondaryaccess/StartupSecondaryAccessIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/startsecondaryaccess/StartupSecondaryAccessIT.java index eb6b4c0ca3..61fc40803c 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/startsecondaryaccess/StartupSecondaryAccessIT.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/startsecondaryaccess/StartupSecondaryAccessIT.java @@ -16,7 +16,7 @@ class StartupSecondaryAccessIT { - public static final int SECONDARY_NUMBER = 100; + public static final int SECONDARY_NUMBER = 200; @RegisterExtension static LocallyRunOperatorExtension extension = From abde7f6b5165daa91dd4c973073451e76474bc67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 22 Jul 2025 17:38:02 +0200 Subject: [PATCH 03/18] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../ComplementaryPrimaryToSecondaryIndex.java | 15 ++++++++++++ ...tComplementaryPrimaryToSecondaryIndex.java | 24 +++++++++++++++++++ 2 files changed, 39 insertions(+) create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ComplementaryPrimaryToSecondaryIndex.java create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndex.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ComplementaryPrimaryToSecondaryIndex.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ComplementaryPrimaryToSecondaryIndex.java new file mode 100644 index 0000000000..b8491ce281 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ComplementaryPrimaryToSecondaryIndex.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.processing.event.source.informer; + +import java.util.Set; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +public interface ComplementaryPrimaryToSecondaryIndex { + + void explicitAddOrUpdate(R resource); + + void onCreateOrUpdateEvent(R resourceID); + + Set getComplementarySecondaryResources(ResourceID primary); +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndex.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndex.java new file mode 100644 index 0000000000..7c535f1681 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndex.java @@ -0,0 +1,24 @@ +package io.javaoperatorsdk.operator.processing.event.source.informer; + +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +public class DefaultComplementaryPrimaryToSecondaryIndex + implements ComplementaryPrimaryToSecondaryIndex { + + private final ConcurrentHashMap> index = new ConcurrentHashMap<>(); + + @Override + public void explicitAddOrUpdate(R resource) {} + + @Override + public void onCreateOrUpdateEvent(R resourceID) {} + + @Override + public Set getComplementarySecondaryResources(ResourceID primary) { + return Set.of(); + } +} From 6f5e6e3f55e2047d1cf9bd90803f6be3c3cc0afa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 23 Jul 2025 11:58:35 +0200 Subject: [PATCH 04/18] complementary index MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../ComplementaryPrimaryToSecondaryIndex.java | 4 +- ...tComplementaryPrimaryToSecondaryIndex.java | 43 +++++++++++++++-- .../source/informer/InformerEventSource.java | 48 ++++++++++++------- ...PComplementaryPrimaryToSecondaryIndex.java | 21 ++++++++ 4 files changed, 93 insertions(+), 23 deletions(-) create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPComplementaryPrimaryToSecondaryIndex.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ComplementaryPrimaryToSecondaryIndex.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ComplementaryPrimaryToSecondaryIndex.java index b8491ce281..8ca9fdf69a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ComplementaryPrimaryToSecondaryIndex.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ComplementaryPrimaryToSecondaryIndex.java @@ -7,9 +7,9 @@ public interface ComplementaryPrimaryToSecondaryIndex { - void explicitAddOrUpdate(R resource); + void explicitAdd(R resource); - void onCreateOrUpdateEvent(R resourceID); + void cleanupForResource(R resourceID); Set getComplementarySecondaryResources(ResourceID primary); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndex.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndex.java index 7c535f1681..346c3302db 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndex.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndex.java @@ -1,24 +1,61 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; +import java.util.Collections; +import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper; public class DefaultComplementaryPrimaryToSecondaryIndex implements ComplementaryPrimaryToSecondaryIndex { private final ConcurrentHashMap> index = new ConcurrentHashMap<>(); + private final SecondaryToPrimaryMapper secondaryToPrimaryMapper; + + public DefaultComplementaryPrimaryToSecondaryIndex( + SecondaryToPrimaryMapper secondaryToPrimaryMapper) { + this.secondaryToPrimaryMapper = secondaryToPrimaryMapper; + } @Override - public void explicitAddOrUpdate(R resource) {} + public void explicitAdd(R resource) { + Set primaryResources = secondaryToPrimaryMapper.toPrimaryResourceIDs(resource); + primaryResources.forEach( + primaryResource -> { + var resourceSet = + index.computeIfAbsent(primaryResource, pr -> ConcurrentHashMap.newKeySet()); + resourceSet.add(ResourceID.fromResource(resource)); + }); + } @Override - public void onCreateOrUpdateEvent(R resourceID) {} + public void cleanupForResource(R resource) { + Set primaryResources = secondaryToPrimaryMapper.toPrimaryResourceIDs(resource); + primaryResources.forEach( + primaryResource -> { + var secondaryResources = index.get(primaryResource); + // this can be null in just very special cases, like when the secondaryToPrimaryMapper is + // changing dynamically. Like if a list of ResourceIDs mapped dynamically extended in the + // mapper between the onAddOrUpdate and onDelete is called. + if (secondaryResources != null) { + secondaryResources.remove(ResourceID.fromResource(resource)); + if (secondaryResources.isEmpty()) { + index.remove(primaryResource); + } + } + }); + } @Override public Set getComplementarySecondaryResources(ResourceID primary) { - return Set.of(); + var resourceIDs = index.get(primary); + if (resourceIDs == null) { + return Collections.emptySet(); + } else { + return new HashSet<>(resourceIDs); + } } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 5c4ab471c2..4535e4235e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -65,6 +65,7 @@ public class InformerEventSource private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class); // we need direct control for the indexer to propagate the just update resource also to the index private final PrimaryToSecondaryMapper

primaryToSecondaryMapper; + private final ComplementaryPrimaryToSecondaryIndex complementaryPrimaryToSecondaryIndex; private final String id = UUID.randomUUID().toString(); public InformerEventSource( @@ -98,6 +99,9 @@ private InformerEventSource( // If there is a primary to secondary mapper there is no need for primary to secondary index. primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper(); if (primaryToSecondaryMapper == null) { + complementaryPrimaryToSecondaryIndex = + new DefaultComplementaryPrimaryToSecondaryIndex<>( + configuration.getSecondaryToPrimaryMapper()); addIndexers( Map.of( PRIMARY_TO_SECONDARY_INDEX_NAME, @@ -105,6 +109,8 @@ private InformerEventSource( configuration.getSecondaryToPrimaryMapper().toPrimaryResourceIDs(r).stream() .map(InformerEventSource::resourceIdToString) .toList())); + } else { + complementaryPrimaryToSecondaryIndex = new NOOPComplementaryPrimaryToSecondaryIndex(); } final var informerConfig = configuration.getInformerConfig(); @@ -123,7 +129,7 @@ public void onAdd(R newResource) { resourceType().getSimpleName(), newResource.getMetadata().getResourceVersion()); } - + complementaryPrimaryToSecondaryIndex.cleanupForResource(newResource); onAddOrUpdate( Operation.ADD, newResource, null, () -> InformerEventSource.super.onAdd(newResource)); } @@ -250,11 +256,8 @@ private void propagateEvent(R object) { public Set getSecondaryResources(P primary) { if (useSecondaryToPrimaryIndex()) { - - var resources = - byIndex( - PRIMARY_TO_SECONDARY_INDEX_NAME, - resourceIdToString(ResourceID.fromResource(primary))); + var primaryID = ResourceID.fromResource(primary); + var resources = byIndex(PRIMARY_TO_SECONDARY_INDEX_NAME, resourceIdToString(primaryID)); log.debug( "Using informer primary to secondary index to find secondary resources for primary name:" @@ -262,16 +265,24 @@ public Set getSecondaryResources(P primary) { primary.getMetadata().getName(), primary.getMetadata().getNamespace(), resources.size()); - - return resources.stream() - .map( - r -> { - Optional resource = - temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(r)); - return resource.orElse(r); - }) - .collect(Collectors.toSet()); - + var complementaryIds = + complementaryPrimaryToSecondaryIndex.getComplementarySecondaryResources(primaryID); + var res = + resources.stream() + .map( + r -> { + var resourceId = ResourceID.fromResource(r); + Optional resource = temporaryResourceCache.getResourceFromCache(resourceId); + complementaryIds.remove(resourceId); + return resource.orElse(r); + }) + .collect(Collectors.toSet()); + complementaryIds.forEach( + id -> { + Optional resource = temporaryResourceCache.getResourceFromCache(id); + resource.ifPresent(res::add); + }); + return res; } else { Set secondaryIDs = primaryToSecondaryMapper.toSecondaryResourceIDs(primary); log.debug( @@ -298,8 +309,9 @@ public synchronized void handleRecentResourceCreate(ResourceID resourceID, R res } private void handleRecentCreateOrUpdate(Operation operation, R newResource, R oldResource) { - // todo - // primaryToSecondaryIndex.onAddOrUpdate(newResource); + if (operation == Operation.ADD) { + complementaryPrimaryToSecondaryIndex.explicitAdd(newResource); + } temporaryResourceCache.putResource( newResource, Optional.ofNullable(oldResource) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPComplementaryPrimaryToSecondaryIndex.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPComplementaryPrimaryToSecondaryIndex.java new file mode 100644 index 0000000000..e8318315cb --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPComplementaryPrimaryToSecondaryIndex.java @@ -0,0 +1,21 @@ +package io.javaoperatorsdk.operator.processing.event.source.informer; + +import java.util.Set; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +public class NOOPComplementaryPrimaryToSecondaryIndex + implements ComplementaryPrimaryToSecondaryIndex { + + @Override + public void explicitAdd(R resource) {} + + @Override + public void cleanupForResource(R resourceID) {} + + @Override + public Set getComplementarySecondaryResources(ResourceID primary) { + return Set.of(); + } +} From 37fc1e0e7ceb901bb26fdbf8afd5636fe6303b2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 23 Jul 2025 13:59:03 +0200 Subject: [PATCH 05/18] test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../event/source/informer/InformerEventSource.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 4535e4235e..969fd9002b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -129,7 +129,6 @@ public void onAdd(R newResource) { resourceType().getSimpleName(), newResource.getMetadata().getResourceVersion()); } - complementaryPrimaryToSecondaryIndex.cleanupForResource(newResource); onAddOrUpdate( Operation.ADD, newResource, null, () -> InformerEventSource.super.onAdd(newResource)); } @@ -160,7 +159,7 @@ public void onDelete(R resource, boolean b) { ResourceID.fromResource(resource), resourceType().getSimpleName()); } - + complementaryPrimaryToSecondaryIndex.cleanupForResource(resource); super.onDelete(resource, b); if (acceptedByDeleteFilters(resource, b)) { propagateEvent(resource); @@ -170,7 +169,7 @@ public void onDelete(R resource, boolean b) { private synchronized void onAddOrUpdate( Operation operation, R newObject, R oldObject, Runnable superOnOp) { var resourceID = ResourceID.fromResource(newObject); - + complementaryPrimaryToSecondaryIndex.cleanupForResource(newObject); if (canSkipEvent(newObject, oldObject, resourceID)) { log.debug( "Skipping event propagation for {}, since was a result of a reconcile action. Resource" @@ -309,9 +308,7 @@ public synchronized void handleRecentResourceCreate(ResourceID resourceID, R res } private void handleRecentCreateOrUpdate(Operation operation, R newResource, R oldResource) { - if (operation == Operation.ADD) { - complementaryPrimaryToSecondaryIndex.explicitAdd(newResource); - } + complementaryPrimaryToSecondaryIndex.explicitAdd(newResource); temporaryResourceCache.putResource( newResource, Optional.ofNullable(oldResource) From 240099afc1a57851fd557cf3c7696681ed2c024c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 23 Jul 2025 14:36:11 +0200 Subject: [PATCH 06/18] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../processing/event/source/informer/InformerEventSource.java | 2 +- .../dependentresourcecrossref/DependentResourceCrossRefIT.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 969fd9002b..e23f39c611 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -278,7 +278,7 @@ public Set getSecondaryResources(P primary) { .collect(Collectors.toSet()); complementaryIds.forEach( id -> { - Optional resource = temporaryResourceCache.getResourceFromCache(id); + Optional resource = get(id); resource.ifPresent(res::add); }); return res; diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java index ae5cd25895..b82fe27415 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java @@ -16,7 +16,7 @@ class DependentResourceCrossRefIT { public static final String TEST_RESOURCE_NAME = "test"; - public static final int EXECUTION_NUMBER = 50; + public static final int EXECUTION_NUMBER = 150; @RegisterExtension LocallyRunOperatorExtension operator = From 1e771e25add26c1c29ce5b88facc866056711ac7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 23 Jul 2025 15:12:58 +0200 Subject: [PATCH 07/18] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../ComplementaryPrimaryToSecondaryIndex.java | 2 +- ...tComplementaryPrimaryToSecondaryIndex.java | 2 +- .../source/informer/InformerEventSource.java | 5 +- ...PComplementaryPrimaryToSecondaryIndex.java | 2 +- ...plementaryPrimaryToSecondaryIndexTest.java | 134 ++++++++++++++++++ .../DependentResourceCrossRefIT.java | 2 +- .../DependentResourceCrossRefReconciler.java | 6 + 7 files changed, 146 insertions(+), 7 deletions(-) create mode 100644 operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndexTest.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ComplementaryPrimaryToSecondaryIndex.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ComplementaryPrimaryToSecondaryIndex.java index 8ca9fdf69a..06e1b9c560 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ComplementaryPrimaryToSecondaryIndex.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ComplementaryPrimaryToSecondaryIndex.java @@ -11,5 +11,5 @@ public interface ComplementaryPrimaryToSecondaryIndex { void cleanupForResource(R resourceID); - Set getComplementarySecondaryResources(ResourceID primary); + Set getSecondaryResources(ResourceID primary); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndex.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndex.java index 346c3302db..ab1fb3d0fe 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndex.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndex.java @@ -50,7 +50,7 @@ public void cleanupForResource(R resource) { } @Override - public Set getComplementarySecondaryResources(ResourceID primary) { + public Set getSecondaryResources(ResourceID primary) { var resourceIDs = index.get(primary); if (resourceIDs == null) { return Collections.emptySet(); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index e23f39c611..c61b5b873d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -98,7 +98,7 @@ private InformerEventSource( parseResourceVersions); // If there is a primary to secondary mapper there is no need for primary to secondary index. primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper(); - if (primaryToSecondaryMapper == null) { + if (useSecondaryToPrimaryIndex()) { complementaryPrimaryToSecondaryIndex = new DefaultComplementaryPrimaryToSecondaryIndex<>( configuration.getSecondaryToPrimaryMapper()); @@ -264,8 +264,7 @@ public Set getSecondaryResources(P primary) { primary.getMetadata().getName(), primary.getMetadata().getNamespace(), resources.size()); - var complementaryIds = - complementaryPrimaryToSecondaryIndex.getComplementarySecondaryResources(primaryID); + var complementaryIds = complementaryPrimaryToSecondaryIndex.getSecondaryResources(primaryID); var res = resources.stream() .map( diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPComplementaryPrimaryToSecondaryIndex.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPComplementaryPrimaryToSecondaryIndex.java index e8318315cb..49e535de2b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPComplementaryPrimaryToSecondaryIndex.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPComplementaryPrimaryToSecondaryIndex.java @@ -15,7 +15,7 @@ public void explicitAdd(R resource) {} public void cleanupForResource(R resourceID) {} @Override - public Set getComplementarySecondaryResources(ResourceID primary) { + public Set getSecondaryResources(ResourceID primary) { return Set.of(); } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndexTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndexTest.java new file mode 100644 index 0000000000..2690ca6713 --- /dev/null +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndexTest.java @@ -0,0 +1,134 @@ +package io.javaoperatorsdk.operator.processing.event.source.informer; + +import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; +import java.util.Set; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ConfigMapBuilder; +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class DefaultComplementaryPrimaryToSecondaryIndexTest { + + @SuppressWarnings("unchecked") + private final SecondaryToPrimaryMapper secondaryToPrimaryMapperMock = + mock(SecondaryToPrimaryMapper.class); + + private final DefaultComplementaryPrimaryToSecondaryIndex primaryToSecondaryIndex = + new DefaultComplementaryPrimaryToSecondaryIndex<>(secondaryToPrimaryMapperMock); + + private final ResourceID primaryID1 = new ResourceID("id1", "default"); + private final ResourceID primaryID2 = new ResourceID("id2", "default"); + private final ConfigMap secondary1 = secondary("secondary1"); + private final ConfigMap secondary2 = secondary("secondary2"); + + @BeforeEach + void setup() { + when(secondaryToPrimaryMapperMock.toPrimaryResourceIDs(any())) + .thenReturn(Set.of(primaryID1, primaryID2)); + } + + @Test + void returnsEmptySetOnEmptyIndex() { + var res = primaryToSecondaryIndex.getSecondaryResources(ResourceID.fromResource(secondary1)); + assertThat(res).isEmpty(); + } + + @Test + void indexesNewResources() { + primaryToSecondaryIndex.explicitAdd(secondary1); + + var secondaryResources1 = primaryToSecondaryIndex.getSecondaryResources(primaryID1); + var secondaryResources2 = primaryToSecondaryIndex.getSecondaryResources(primaryID2); + + assertThat(secondaryResources1).containsOnly(ResourceID.fromResource(secondary1)); + assertThat(secondaryResources2).containsOnly(ResourceID.fromResource(secondary1)); + } + + @Test + void indexesAdditionalResources() { + primaryToSecondaryIndex.explicitAdd(secondary1); + primaryToSecondaryIndex.explicitAdd(secondary2); + + var secondaryResources1 = primaryToSecondaryIndex.getSecondaryResources(primaryID1); + var secondaryResources2 = primaryToSecondaryIndex.getSecondaryResources(primaryID2); + + assertThat(secondaryResources1) + .containsOnly(ResourceID.fromResource(secondary1), ResourceID.fromResource(secondary2)); + assertThat(secondaryResources2) + .containsOnly(ResourceID.fromResource(secondary1), ResourceID.fromResource(secondary2)); + } + + @Test + void removingResourceFromIndex() { + primaryToSecondaryIndex.explicitAdd(secondary1); + primaryToSecondaryIndex.explicitAdd(secondary2); + primaryToSecondaryIndex.cleanupForResource(secondary1); + + var secondaryResources1 = primaryToSecondaryIndex.getSecondaryResources(primaryID1); + var secondaryResources2 = primaryToSecondaryIndex.getSecondaryResources(primaryID2); + + assertThat(secondaryResources1).containsOnly(ResourceID.fromResource(secondary2)); + assertThat(secondaryResources2).containsOnly(ResourceID.fromResource(secondary2)); + + primaryToSecondaryIndex.cleanupForResource(secondary2); + + secondaryResources1 = primaryToSecondaryIndex.getSecondaryResources(primaryID1); + secondaryResources2 = primaryToSecondaryIndex.getSecondaryResources(primaryID2); + + assertThat(secondaryResources1).isEmpty(); + assertThat(secondaryResources2).isEmpty(); + } + + @Test + void testPerformance() { + var primaryToSecondaryIndex = + new DefaultComplementaryPrimaryToSecondaryIndex<>( + new SecondaryToPrimaryMapper() { + @Override + public Set toPrimaryResourceIDs(HasMetadata resource) { + return Set.of( + new ResourceID( + resource.getMetadata().getName(), resource.getMetadata().getNamespace())); + } + }); + var start = LocalDateTime.now(); + for (int i = 0; i < 1_000_000; i++) { + primaryToSecondaryIndex.explicitAdd(cm(i)); + } + System.out.println(ChronoUnit.MILLIS.between(start, LocalDateTime.now())); + + start = LocalDateTime.now(); + for (int i = 0; i < 1_000_000; i++) { + primaryToSecondaryIndex.cleanupForResource(cm(i)); + } + System.out.println(ChronoUnit.MILLIS.between(start, LocalDateTime.now())); + System.out.println("ok"); + } + + private static ConfigMap cm(int i) { + return new ConfigMapBuilder() + .withMetadata(new ObjectMetaBuilder().withName("test" + i).withNamespace("default").build()) + .build(); + } + + ConfigMap secondary(String name) { + ConfigMap configMap = new ConfigMap(); + configMap.setMetadata(new ObjectMeta()); + configMap.getMetadata().setName(name); + configMap.getMetadata().setNamespace("default"); + return configMap; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java index b82fe27415..e595f15279 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java @@ -16,7 +16,7 @@ class DependentResourceCrossRefIT { public static final String TEST_RESOURCE_NAME = "test"; - public static final int EXECUTION_NUMBER = 150; + public static final int EXECUTION_NUMBER = 250; @RegisterExtension LocallyRunOperatorExtension operator = diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefReconciler.java index 5d54ecdabe..247174838c 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefReconciler.java @@ -5,6 +5,9 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; import io.fabric8.kubernetes.api.model.Secret; @@ -26,6 +29,8 @@ @ControllerConfiguration public class DependentResourceCrossRefReconciler implements Reconciler { + private static final Logger log = + LoggerFactory.getLogger(DependentResourceCrossRefReconciler.class); public static final String SECRET_NAME = "secret"; private final AtomicInteger numberOfExecutions = new AtomicInteger(0); @@ -48,6 +53,7 @@ public ErrorStatusUpdateControl updateErrorSt DependentResourceCrossRefResource resource, Context context, Exception e) { + log.error("Status update on error", e); errorHappened = true; return ErrorStatusUpdateControl.noStatusUpdate(); } From 1da8b911095fa5a13fb2c8f5dc31f24a98a900ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 23 Jul 2025 15:19:25 +0200 Subject: [PATCH 08/18] logging MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../ComplementaryPrimaryToSecondaryIndex.java | 2 +- .../DefaultComplementaryPrimaryToSecondaryIndex.java | 2 +- .../event/source/informer/InformerEventSource.java | 7 ++++--- .../NOOPComplementaryPrimaryToSecondaryIndex.java | 2 +- ...aultComplementaryPrimaryToSecondaryIndexTest.java | 12 ++++++------ 5 files changed, 13 insertions(+), 12 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ComplementaryPrimaryToSecondaryIndex.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ComplementaryPrimaryToSecondaryIndex.java index 06e1b9c560..7db4337410 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ComplementaryPrimaryToSecondaryIndex.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ComplementaryPrimaryToSecondaryIndex.java @@ -7,7 +7,7 @@ public interface ComplementaryPrimaryToSecondaryIndex { - void explicitAdd(R resource); + void explicitAddOrUpdate(R resource); void cleanupForResource(R resourceID); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndex.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndex.java index ab1fb3d0fe..b455e66790 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndex.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndex.java @@ -21,7 +21,7 @@ public DefaultComplementaryPrimaryToSecondaryIndex( } @Override - public void explicitAdd(R resource) { + public void explicitAddOrUpdate(R resource) { Set primaryResources = secondaryToPrimaryMapper.toPrimaryResourceIDs(resource); primaryResources.forEach( primaryResource -> { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index c61b5b873d..2f56809dd3 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -257,7 +257,7 @@ public Set getSecondaryResources(P primary) { if (useSecondaryToPrimaryIndex()) { var primaryID = ResourceID.fromResource(primary); var resources = byIndex(PRIMARY_TO_SECONDARY_INDEX_NAME, resourceIdToString(primaryID)); - + log.debug("Resources in cache: {}", resources); log.debug( "Using informer primary to secondary index to find secondary resources for primary name:" + " {} namespace: {}. Found {}", @@ -265,6 +265,7 @@ public Set getSecondaryResources(P primary) { primary.getMetadata().getNamespace(), resources.size()); var complementaryIds = complementaryPrimaryToSecondaryIndex.getSecondaryResources(primaryID); + log.debug("Complementary ids: {}", complementaryIds); var res = resources.stream() .map( @@ -278,7 +279,7 @@ public Set getSecondaryResources(P primary) { complementaryIds.forEach( id -> { Optional resource = get(id); - resource.ifPresent(res::add); + resource.ifPresentOrElse(res::add, () -> log.debug("Resource not found: {}", id)); }); return res; } else { @@ -307,7 +308,7 @@ public synchronized void handleRecentResourceCreate(ResourceID resourceID, R res } private void handleRecentCreateOrUpdate(Operation operation, R newResource, R oldResource) { - complementaryPrimaryToSecondaryIndex.explicitAdd(newResource); + complementaryPrimaryToSecondaryIndex.explicitAddOrUpdate(newResource); temporaryResourceCache.putResource( newResource, Optional.ofNullable(oldResource) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPComplementaryPrimaryToSecondaryIndex.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPComplementaryPrimaryToSecondaryIndex.java index 49e535de2b..4a478faeae 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPComplementaryPrimaryToSecondaryIndex.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPComplementaryPrimaryToSecondaryIndex.java @@ -9,7 +9,7 @@ public class NOOPComplementaryPrimaryToSecondaryIndex implements ComplementaryPrimaryToSecondaryIndex { @Override - public void explicitAdd(R resource) {} + public void explicitAddOrUpdate(R resource) {} @Override public void cleanupForResource(R resourceID) {} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndexTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndexTest.java index 2690ca6713..d13a4c496d 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndexTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndexTest.java @@ -48,7 +48,7 @@ void returnsEmptySetOnEmptyIndex() { @Test void indexesNewResources() { - primaryToSecondaryIndex.explicitAdd(secondary1); + primaryToSecondaryIndex.explicitAddOrUpdate(secondary1); var secondaryResources1 = primaryToSecondaryIndex.getSecondaryResources(primaryID1); var secondaryResources2 = primaryToSecondaryIndex.getSecondaryResources(primaryID2); @@ -59,8 +59,8 @@ void indexesNewResources() { @Test void indexesAdditionalResources() { - primaryToSecondaryIndex.explicitAdd(secondary1); - primaryToSecondaryIndex.explicitAdd(secondary2); + primaryToSecondaryIndex.explicitAddOrUpdate(secondary1); + primaryToSecondaryIndex.explicitAddOrUpdate(secondary2); var secondaryResources1 = primaryToSecondaryIndex.getSecondaryResources(primaryID1); var secondaryResources2 = primaryToSecondaryIndex.getSecondaryResources(primaryID2); @@ -73,8 +73,8 @@ void indexesAdditionalResources() { @Test void removingResourceFromIndex() { - primaryToSecondaryIndex.explicitAdd(secondary1); - primaryToSecondaryIndex.explicitAdd(secondary2); + primaryToSecondaryIndex.explicitAddOrUpdate(secondary1); + primaryToSecondaryIndex.explicitAddOrUpdate(secondary2); primaryToSecondaryIndex.cleanupForResource(secondary1); var secondaryResources1 = primaryToSecondaryIndex.getSecondaryResources(primaryID1); @@ -106,7 +106,7 @@ public Set toPrimaryResourceIDs(HasMetadata resource) { }); var start = LocalDateTime.now(); for (int i = 0; i < 1_000_000; i++) { - primaryToSecondaryIndex.explicitAdd(cm(i)); + primaryToSecondaryIndex.explicitAddOrUpdate(cm(i)); } System.out.println(ChronoUnit.MILLIS.between(start, LocalDateTime.now())); From 53c2155baed43b98f638380c61f0e966de2c85a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 23 Jul 2025 15:47:46 +0200 Subject: [PATCH 09/18] cleanup on test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../DependentResourceCrossRefIT.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java index e595f15279..d078356a35 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java @@ -44,6 +44,22 @@ void dependentResourceCanReferenceEachOther() { assertThat(operator.get(Secret.class, TEST_RESOURCE_NAME + i)).isNotNull(); } }); + + for (int i = 0; i < EXECUTION_NUMBER; i++) { + operator.delete(testResource(i)); + } + await() + .timeout(Duration.ofSeconds(30)) + .untilAsserted( + () -> { + for (int i = 0; i < EXECUTION_NUMBER; i++) { + assertThat( + operator.get( + DependentResourceCrossRefResource.class, + testResource(i).getMetadata().getName())) + .isNull(); + } + }); } DependentResourceCrossRefResource testResource(int n) { From 08c0d5ec56f75c475eb6fe75b06ee3b70f839a57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 23 Jul 2025 15:47:59 +0200 Subject: [PATCH 10/18] lower limit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../dependentresourcecrossref/DependentResourceCrossRefIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java index d078356a35..1b71c79448 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java @@ -16,7 +16,7 @@ class DependentResourceCrossRefIT { public static final String TEST_RESOURCE_NAME = "test"; - public static final int EXECUTION_NUMBER = 250; + public static final int EXECUTION_NUMBER = 50; @RegisterExtension LocallyRunOperatorExtension operator = From e5aef3018f7ded68e09d62a38bdc053079abd415 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 23 Jul 2025 17:11:13 +0200 Subject: [PATCH 11/18] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../event/source/informer/InformerEventSource.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 2f56809dd3..69864da957 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -256,15 +256,16 @@ public Set getSecondaryResources(P primary) { if (useSecondaryToPrimaryIndex()) { var primaryID = ResourceID.fromResource(primary); + var complementaryIds = complementaryPrimaryToSecondaryIndex.getSecondaryResources(primaryID); var resources = byIndex(PRIMARY_TO_SECONDARY_INDEX_NAME, resourceIdToString(primaryID)); - log.debug("Resources in cache: {}", resources); + log.debug("Resources in cache: {} kind: {}", resources, resourceType().getSimpleName()); log.debug( "Using informer primary to secondary index to find secondary resources for primary name:" + " {} namespace: {}. Found {}", primary.getMetadata().getName(), primary.getMetadata().getNamespace(), resources.size()); - var complementaryIds = complementaryPrimaryToSecondaryIndex.getSecondaryResources(primaryID); + log.debug("Complementary ids: {}", complementaryIds); var res = resources.stream() @@ -279,7 +280,7 @@ public Set getSecondaryResources(P primary) { complementaryIds.forEach( id -> { Optional resource = get(id); - resource.ifPresentOrElse(res::add, () -> log.debug("Resource not found: {}", id)); + resource.ifPresentOrElse(res::add, () -> log.warn("Resource not found: {}", id)); }); return res; } else { From 0c1b21053a31c68e0781cb0e413a2543135784d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 23 Jul 2025 17:37:45 +0200 Subject: [PATCH 12/18] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../dependentresourcecrossref/DependentResourceCrossRefIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java index 1b71c79448..59aca0db8e 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java @@ -16,7 +16,7 @@ class DependentResourceCrossRefIT { public static final String TEST_RESOURCE_NAME = "test"; - public static final int EXECUTION_NUMBER = 50; + public static final int EXECUTION_NUMBER = 100; @RegisterExtension LocallyRunOperatorExtension operator = From 3d797ce35ca6450e57121a903f32d15d588caf5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 24 Jul 2025 08:42:38 +0200 Subject: [PATCH 13/18] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../dependentresourcecrossref/DependentResourceCrossRefIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java index 59aca0db8e..d078356a35 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java @@ -16,7 +16,7 @@ class DependentResourceCrossRefIT { public static final String TEST_RESOURCE_NAME = "test"; - public static final int EXECUTION_NUMBER = 100; + public static final int EXECUTION_NUMBER = 250; @RegisterExtension LocallyRunOperatorExtension operator = From e003db3fc153ea89cd77dbb9676f240ee6db58aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 24 Jul 2025 13:38:41 +0200 Subject: [PATCH 14/18] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- ...tComplementaryPrimaryToSecondaryIndex.java | 61 -------- ...faultTemporalPrimaryToSecondaryIndex.java} | 10 +- .../source/informer/InformerEventSource.java | 17 ++- ...PComplementaryPrimaryToSecondaryIndex.java | 21 --- ... NOOPTemporalPrimaryToSecondaryIndex.java} | 14 +- .../informer/PrimaryToSecondaryIndex.java | 15 -- ...a => TemporalPrimaryToSecondaryIndex.java} | 4 +- ...plementaryPrimaryToSecondaryIndexTest.java | 134 ------------------ ... TemporalPrimaryToSecondaryIndexTest.java} | 39 ++--- 9 files changed, 44 insertions(+), 271 deletions(-) delete mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndex.java rename operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/{DefaultPrimaryToSecondaryIndex.java => DefaultTemporalPrimaryToSecondaryIndex.java} (84%) delete mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPComplementaryPrimaryToSecondaryIndex.java rename operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/{NOOPPrimaryToSecondaryIndex.java => NOOPTemporalPrimaryToSecondaryIndex.java} (54%) delete mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/PrimaryToSecondaryIndex.java rename operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/{ComplementaryPrimaryToSecondaryIndex.java => TemporalPrimaryToSecondaryIndex.java} (72%) delete mode 100644 operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndexTest.java rename operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/{PrimaryToSecondaryIndexTest.java => TemporalPrimaryToSecondaryIndexTest.java} (60%) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndex.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndex.java deleted file mode 100644 index b455e66790..0000000000 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndex.java +++ /dev/null @@ -1,61 +0,0 @@ -package io.javaoperatorsdk.operator.processing.event.source.informer; - -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -import io.fabric8.kubernetes.api.model.HasMetadata; -import io.javaoperatorsdk.operator.processing.event.ResourceID; -import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper; - -public class DefaultComplementaryPrimaryToSecondaryIndex - implements ComplementaryPrimaryToSecondaryIndex { - - private final ConcurrentHashMap> index = new ConcurrentHashMap<>(); - private final SecondaryToPrimaryMapper secondaryToPrimaryMapper; - - public DefaultComplementaryPrimaryToSecondaryIndex( - SecondaryToPrimaryMapper secondaryToPrimaryMapper) { - this.secondaryToPrimaryMapper = secondaryToPrimaryMapper; - } - - @Override - public void explicitAddOrUpdate(R resource) { - Set primaryResources = secondaryToPrimaryMapper.toPrimaryResourceIDs(resource); - primaryResources.forEach( - primaryResource -> { - var resourceSet = - index.computeIfAbsent(primaryResource, pr -> ConcurrentHashMap.newKeySet()); - resourceSet.add(ResourceID.fromResource(resource)); - }); - } - - @Override - public void cleanupForResource(R resource) { - Set primaryResources = secondaryToPrimaryMapper.toPrimaryResourceIDs(resource); - primaryResources.forEach( - primaryResource -> { - var secondaryResources = index.get(primaryResource); - // this can be null in just very special cases, like when the secondaryToPrimaryMapper is - // changing dynamically. Like if a list of ResourceIDs mapped dynamically extended in the - // mapper between the onAddOrUpdate and onDelete is called. - if (secondaryResources != null) { - secondaryResources.remove(ResourceID.fromResource(resource)); - if (secondaryResources.isEmpty()) { - index.remove(primaryResource); - } - } - }); - } - - @Override - public Set getSecondaryResources(ResourceID primary) { - var resourceIDs = index.get(primary); - if (resourceIDs == null) { - return Collections.emptySet(); - } else { - return new HashSet<>(resourceIDs); - } - } -} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultPrimaryToSecondaryIndex.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultTemporalPrimaryToSecondaryIndex.java similarity index 84% rename from operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultPrimaryToSecondaryIndex.java rename to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultTemporalPrimaryToSecondaryIndex.java index a1a5a96d36..dc1512339f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultPrimaryToSecondaryIndex.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultTemporalPrimaryToSecondaryIndex.java @@ -7,17 +7,19 @@ import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper; -class DefaultPrimaryToSecondaryIndex implements PrimaryToSecondaryIndex { +class DefaultTemporalPrimaryToSecondaryIndex + implements TemporalPrimaryToSecondaryIndex { private final SecondaryToPrimaryMapper secondaryToPrimaryMapper; private final Map> index = new HashMap<>(); - public DefaultPrimaryToSecondaryIndex(SecondaryToPrimaryMapper secondaryToPrimaryMapper) { + public DefaultTemporalPrimaryToSecondaryIndex( + SecondaryToPrimaryMapper secondaryToPrimaryMapper) { this.secondaryToPrimaryMapper = secondaryToPrimaryMapper; } @Override - public synchronized void onAddOrUpdate(R resource) { + public synchronized void explicitAddOrUpdate(R resource) { Set primaryResources = secondaryToPrimaryMapper.toPrimaryResourceIDs(resource); primaryResources.forEach( primaryResource -> { @@ -28,7 +30,7 @@ public synchronized void onAddOrUpdate(R resource) { } @Override - public synchronized void onDelete(R resource) { + public synchronized void cleanupForResource(R resource) { Set primaryResources = secondaryToPrimaryMapper.toPrimaryResourceIDs(resource); primaryResources.forEach( primaryResource -> { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 69864da957..02623b0d1c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -65,7 +65,7 @@ public class InformerEventSource private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class); // we need direct control for the indexer to propagate the just update resource also to the index private final PrimaryToSecondaryMapper

primaryToSecondaryMapper; - private final ComplementaryPrimaryToSecondaryIndex complementaryPrimaryToSecondaryIndex; + private final TemporalPrimaryToSecondaryIndex temporalPrimaryToSecondaryIndex; private final String id = UUID.randomUUID().toString(); public InformerEventSource( @@ -99,9 +99,8 @@ private InformerEventSource( // If there is a primary to secondary mapper there is no need for primary to secondary index. primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper(); if (useSecondaryToPrimaryIndex()) { - complementaryPrimaryToSecondaryIndex = - new DefaultComplementaryPrimaryToSecondaryIndex<>( - configuration.getSecondaryToPrimaryMapper()); + temporalPrimaryToSecondaryIndex = + new DefaultTemporalPrimaryToSecondaryIndex<>(configuration.getSecondaryToPrimaryMapper()); addIndexers( Map.of( PRIMARY_TO_SECONDARY_INDEX_NAME, @@ -110,7 +109,7 @@ private InformerEventSource( .map(InformerEventSource::resourceIdToString) .toList())); } else { - complementaryPrimaryToSecondaryIndex = new NOOPComplementaryPrimaryToSecondaryIndex(); + temporalPrimaryToSecondaryIndex = NOOPTemporalPrimaryToSecondaryIndex.getInstance(); } final var informerConfig = configuration.getInformerConfig(); @@ -159,7 +158,7 @@ public void onDelete(R resource, boolean b) { ResourceID.fromResource(resource), resourceType().getSimpleName()); } - complementaryPrimaryToSecondaryIndex.cleanupForResource(resource); + temporalPrimaryToSecondaryIndex.cleanupForResource(resource); super.onDelete(resource, b); if (acceptedByDeleteFilters(resource, b)) { propagateEvent(resource); @@ -169,7 +168,7 @@ public void onDelete(R resource, boolean b) { private synchronized void onAddOrUpdate( Operation operation, R newObject, R oldObject, Runnable superOnOp) { var resourceID = ResourceID.fromResource(newObject); - complementaryPrimaryToSecondaryIndex.cleanupForResource(newObject); + temporalPrimaryToSecondaryIndex.cleanupForResource(newObject); if (canSkipEvent(newObject, oldObject, resourceID)) { log.debug( "Skipping event propagation for {}, since was a result of a reconcile action. Resource" @@ -256,7 +255,7 @@ public Set getSecondaryResources(P primary) { if (useSecondaryToPrimaryIndex()) { var primaryID = ResourceID.fromResource(primary); - var complementaryIds = complementaryPrimaryToSecondaryIndex.getSecondaryResources(primaryID); + var complementaryIds = temporalPrimaryToSecondaryIndex.getSecondaryResources(primaryID); var resources = byIndex(PRIMARY_TO_SECONDARY_INDEX_NAME, resourceIdToString(primaryID)); log.debug("Resources in cache: {} kind: {}", resources, resourceType().getSimpleName()); log.debug( @@ -309,7 +308,7 @@ public synchronized void handleRecentResourceCreate(ResourceID resourceID, R res } private void handleRecentCreateOrUpdate(Operation operation, R newResource, R oldResource) { - complementaryPrimaryToSecondaryIndex.explicitAddOrUpdate(newResource); + temporalPrimaryToSecondaryIndex.explicitAddOrUpdate(newResource); temporaryResourceCache.putResource( newResource, Optional.ofNullable(oldResource) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPComplementaryPrimaryToSecondaryIndex.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPComplementaryPrimaryToSecondaryIndex.java deleted file mode 100644 index 4a478faeae..0000000000 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPComplementaryPrimaryToSecondaryIndex.java +++ /dev/null @@ -1,21 +0,0 @@ -package io.javaoperatorsdk.operator.processing.event.source.informer; - -import java.util.Set; - -import io.fabric8.kubernetes.api.model.HasMetadata; -import io.javaoperatorsdk.operator.processing.event.ResourceID; - -public class NOOPComplementaryPrimaryToSecondaryIndex - implements ComplementaryPrimaryToSecondaryIndex { - - @Override - public void explicitAddOrUpdate(R resource) {} - - @Override - public void cleanupForResource(R resourceID) {} - - @Override - public Set getSecondaryResources(ResourceID primary) { - return Set.of(); - } -} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPPrimaryToSecondaryIndex.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPTemporalPrimaryToSecondaryIndex.java similarity index 54% rename from operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPPrimaryToSecondaryIndex.java rename to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPTemporalPrimaryToSecondaryIndex.java index abefbba638..bf837c7507 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPPrimaryToSecondaryIndex.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPTemporalPrimaryToSecondaryIndex.java @@ -5,25 +5,27 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.processing.event.ResourceID; -class NOOPPrimaryToSecondaryIndex implements PrimaryToSecondaryIndex { +class NOOPTemporalPrimaryToSecondaryIndex + implements TemporalPrimaryToSecondaryIndex { @SuppressWarnings("rawtypes") - private static final NOOPPrimaryToSecondaryIndex instance = new NOOPPrimaryToSecondaryIndex(); + private static final NOOPTemporalPrimaryToSecondaryIndex instance = + new NOOPTemporalPrimaryToSecondaryIndex(); @SuppressWarnings("unchecked") - public static NOOPPrimaryToSecondaryIndex getInstance() { + public static NOOPTemporalPrimaryToSecondaryIndex getInstance() { return instance; } - private NOOPPrimaryToSecondaryIndex() {} + private NOOPTemporalPrimaryToSecondaryIndex() {} @Override - public void onAddOrUpdate(R resource) { + public void explicitAddOrUpdate(R resource) { // empty method because of noop implementation } @Override - public void onDelete(R resource) { + public void cleanupForResource(R resource) { // empty method because of noop implementation } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/PrimaryToSecondaryIndex.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/PrimaryToSecondaryIndex.java deleted file mode 100644 index 7a87b23272..0000000000 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/PrimaryToSecondaryIndex.java +++ /dev/null @@ -1,15 +0,0 @@ -package io.javaoperatorsdk.operator.processing.event.source.informer; - -import java.util.Set; - -import io.fabric8.kubernetes.api.model.HasMetadata; -import io.javaoperatorsdk.operator.processing.event.ResourceID; - -public interface PrimaryToSecondaryIndex { - - void onAddOrUpdate(R resource); - - void onDelete(R resource); - - Set getSecondaryResources(ResourceID primary); -} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ComplementaryPrimaryToSecondaryIndex.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporalPrimaryToSecondaryIndex.java similarity index 72% rename from operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ComplementaryPrimaryToSecondaryIndex.java rename to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporalPrimaryToSecondaryIndex.java index 7db4337410..e6059d0983 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ComplementaryPrimaryToSecondaryIndex.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporalPrimaryToSecondaryIndex.java @@ -5,11 +5,11 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.processing.event.ResourceID; -public interface ComplementaryPrimaryToSecondaryIndex { +public interface TemporalPrimaryToSecondaryIndex { void explicitAddOrUpdate(R resource); - void cleanupForResource(R resourceID); + void cleanupForResource(R resource); Set getSecondaryResources(ResourceID primary); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndexTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndexTest.java deleted file mode 100644 index d13a4c496d..0000000000 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultComplementaryPrimaryToSecondaryIndexTest.java +++ /dev/null @@ -1,134 +0,0 @@ -package io.javaoperatorsdk.operator.processing.event.source.informer; - -import java.time.LocalDateTime; -import java.time.temporal.ChronoUnit; -import java.util.Set; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import io.fabric8.kubernetes.api.model.ConfigMap; -import io.fabric8.kubernetes.api.model.ConfigMapBuilder; -import io.fabric8.kubernetes.api.model.HasMetadata; -import io.fabric8.kubernetes.api.model.ObjectMeta; -import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; -import io.javaoperatorsdk.operator.processing.event.ResourceID; -import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -class DefaultComplementaryPrimaryToSecondaryIndexTest { - - @SuppressWarnings("unchecked") - private final SecondaryToPrimaryMapper secondaryToPrimaryMapperMock = - mock(SecondaryToPrimaryMapper.class); - - private final DefaultComplementaryPrimaryToSecondaryIndex primaryToSecondaryIndex = - new DefaultComplementaryPrimaryToSecondaryIndex<>(secondaryToPrimaryMapperMock); - - private final ResourceID primaryID1 = new ResourceID("id1", "default"); - private final ResourceID primaryID2 = new ResourceID("id2", "default"); - private final ConfigMap secondary1 = secondary("secondary1"); - private final ConfigMap secondary2 = secondary("secondary2"); - - @BeforeEach - void setup() { - when(secondaryToPrimaryMapperMock.toPrimaryResourceIDs(any())) - .thenReturn(Set.of(primaryID1, primaryID2)); - } - - @Test - void returnsEmptySetOnEmptyIndex() { - var res = primaryToSecondaryIndex.getSecondaryResources(ResourceID.fromResource(secondary1)); - assertThat(res).isEmpty(); - } - - @Test - void indexesNewResources() { - primaryToSecondaryIndex.explicitAddOrUpdate(secondary1); - - var secondaryResources1 = primaryToSecondaryIndex.getSecondaryResources(primaryID1); - var secondaryResources2 = primaryToSecondaryIndex.getSecondaryResources(primaryID2); - - assertThat(secondaryResources1).containsOnly(ResourceID.fromResource(secondary1)); - assertThat(secondaryResources2).containsOnly(ResourceID.fromResource(secondary1)); - } - - @Test - void indexesAdditionalResources() { - primaryToSecondaryIndex.explicitAddOrUpdate(secondary1); - primaryToSecondaryIndex.explicitAddOrUpdate(secondary2); - - var secondaryResources1 = primaryToSecondaryIndex.getSecondaryResources(primaryID1); - var secondaryResources2 = primaryToSecondaryIndex.getSecondaryResources(primaryID2); - - assertThat(secondaryResources1) - .containsOnly(ResourceID.fromResource(secondary1), ResourceID.fromResource(secondary2)); - assertThat(secondaryResources2) - .containsOnly(ResourceID.fromResource(secondary1), ResourceID.fromResource(secondary2)); - } - - @Test - void removingResourceFromIndex() { - primaryToSecondaryIndex.explicitAddOrUpdate(secondary1); - primaryToSecondaryIndex.explicitAddOrUpdate(secondary2); - primaryToSecondaryIndex.cleanupForResource(secondary1); - - var secondaryResources1 = primaryToSecondaryIndex.getSecondaryResources(primaryID1); - var secondaryResources2 = primaryToSecondaryIndex.getSecondaryResources(primaryID2); - - assertThat(secondaryResources1).containsOnly(ResourceID.fromResource(secondary2)); - assertThat(secondaryResources2).containsOnly(ResourceID.fromResource(secondary2)); - - primaryToSecondaryIndex.cleanupForResource(secondary2); - - secondaryResources1 = primaryToSecondaryIndex.getSecondaryResources(primaryID1); - secondaryResources2 = primaryToSecondaryIndex.getSecondaryResources(primaryID2); - - assertThat(secondaryResources1).isEmpty(); - assertThat(secondaryResources2).isEmpty(); - } - - @Test - void testPerformance() { - var primaryToSecondaryIndex = - new DefaultComplementaryPrimaryToSecondaryIndex<>( - new SecondaryToPrimaryMapper() { - @Override - public Set toPrimaryResourceIDs(HasMetadata resource) { - return Set.of( - new ResourceID( - resource.getMetadata().getName(), resource.getMetadata().getNamespace())); - } - }); - var start = LocalDateTime.now(); - for (int i = 0; i < 1_000_000; i++) { - primaryToSecondaryIndex.explicitAddOrUpdate(cm(i)); - } - System.out.println(ChronoUnit.MILLIS.between(start, LocalDateTime.now())); - - start = LocalDateTime.now(); - for (int i = 0; i < 1_000_000; i++) { - primaryToSecondaryIndex.cleanupForResource(cm(i)); - } - System.out.println(ChronoUnit.MILLIS.between(start, LocalDateTime.now())); - System.out.println("ok"); - } - - private static ConfigMap cm(int i) { - return new ConfigMapBuilder() - .withMetadata(new ObjectMetaBuilder().withName("test" + i).withNamespace("default").build()) - .build(); - } - - ConfigMap secondary(String name) { - ConfigMap configMap = new ConfigMap(); - configMap.setMetadata(new ObjectMeta()); - configMap.getMetadata().setName(name); - configMap.getMetadata().setNamespace("default"); - return configMap; - } -} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/PrimaryToSecondaryIndexTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporalPrimaryToSecondaryIndexTest.java similarity index 60% rename from operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/PrimaryToSecondaryIndexTest.java rename to operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporalPrimaryToSecondaryIndexTest.java index 7343b1e581..2cbce08c51 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/PrimaryToSecondaryIndexTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporalPrimaryToSecondaryIndexTest.java @@ -15,14 +15,14 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -class PrimaryToSecondaryIndexTest { +class TemporalPrimaryToSecondaryIndexTest { @SuppressWarnings("unchecked") private final SecondaryToPrimaryMapper secondaryToPrimaryMapperMock = mock(SecondaryToPrimaryMapper.class); - private final PrimaryToSecondaryIndex primaryToSecondaryIndex = - new DefaultPrimaryToSecondaryIndex<>(secondaryToPrimaryMapperMock); + private final TemporalPrimaryToSecondaryIndex temporalPrimaryToSecondaryIndex = + new DefaultTemporalPrimaryToSecondaryIndex<>(secondaryToPrimaryMapperMock); private final ResourceID primaryID1 = new ResourceID("id1", "default"); private final ResourceID primaryID2 = new ResourceID("id2", "default"); @@ -37,16 +37,17 @@ void setup() { @Test void returnsEmptySetOnEmptyIndex() { - var res = primaryToSecondaryIndex.getSecondaryResources(ResourceID.fromResource(secondary1)); + var res = + temporalPrimaryToSecondaryIndex.getSecondaryResources(ResourceID.fromResource(secondary1)); assertThat(res).isEmpty(); } @Test void indexesNewResources() { - primaryToSecondaryIndex.onAddOrUpdate(secondary1); + temporalPrimaryToSecondaryIndex.explicitAddOrUpdate(secondary1); - var secondaryResources1 = primaryToSecondaryIndex.getSecondaryResources(primaryID1); - var secondaryResources2 = primaryToSecondaryIndex.getSecondaryResources(primaryID2); + var secondaryResources1 = temporalPrimaryToSecondaryIndex.getSecondaryResources(primaryID1); + var secondaryResources2 = temporalPrimaryToSecondaryIndex.getSecondaryResources(primaryID2); assertThat(secondaryResources1).containsOnly(ResourceID.fromResource(secondary1)); assertThat(secondaryResources2).containsOnly(ResourceID.fromResource(secondary1)); @@ -54,11 +55,11 @@ void indexesNewResources() { @Test void indexesAdditionalResources() { - primaryToSecondaryIndex.onAddOrUpdate(secondary1); - primaryToSecondaryIndex.onAddOrUpdate(secondary2); + temporalPrimaryToSecondaryIndex.explicitAddOrUpdate(secondary1); + temporalPrimaryToSecondaryIndex.explicitAddOrUpdate(secondary2); - var secondaryResources1 = primaryToSecondaryIndex.getSecondaryResources(primaryID1); - var secondaryResources2 = primaryToSecondaryIndex.getSecondaryResources(primaryID2); + var secondaryResources1 = temporalPrimaryToSecondaryIndex.getSecondaryResources(primaryID1); + var secondaryResources2 = temporalPrimaryToSecondaryIndex.getSecondaryResources(primaryID2); assertThat(secondaryResources1) .containsOnly(ResourceID.fromResource(secondary1), ResourceID.fromResource(secondary2)); @@ -68,20 +69,20 @@ void indexesAdditionalResources() { @Test void removingResourceFromIndex() { - primaryToSecondaryIndex.onAddOrUpdate(secondary1); - primaryToSecondaryIndex.onAddOrUpdate(secondary2); - primaryToSecondaryIndex.onDelete(secondary1); + temporalPrimaryToSecondaryIndex.explicitAddOrUpdate(secondary1); + temporalPrimaryToSecondaryIndex.explicitAddOrUpdate(secondary2); + temporalPrimaryToSecondaryIndex.cleanupForResource(secondary1); - var secondaryResources1 = primaryToSecondaryIndex.getSecondaryResources(primaryID1); - var secondaryResources2 = primaryToSecondaryIndex.getSecondaryResources(primaryID2); + var secondaryResources1 = temporalPrimaryToSecondaryIndex.getSecondaryResources(primaryID1); + var secondaryResources2 = temporalPrimaryToSecondaryIndex.getSecondaryResources(primaryID2); assertThat(secondaryResources1).containsOnly(ResourceID.fromResource(secondary2)); assertThat(secondaryResources2).containsOnly(ResourceID.fromResource(secondary2)); - primaryToSecondaryIndex.onDelete(secondary2); + temporalPrimaryToSecondaryIndex.cleanupForResource(secondary2); - secondaryResources1 = primaryToSecondaryIndex.getSecondaryResources(primaryID1); - secondaryResources2 = primaryToSecondaryIndex.getSecondaryResources(primaryID2); + secondaryResources1 = temporalPrimaryToSecondaryIndex.getSecondaryResources(primaryID1); + secondaryResources2 = temporalPrimaryToSecondaryIndex.getSecondaryResources(primaryID2); assertThat(secondaryResources1).isEmpty(); assertThat(secondaryResources2).isEmpty(); From 17d40cc1343ab03c39fd5fb5bc90420a6dacb058 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 24 Jul 2025 13:49:21 +0200 Subject: [PATCH 15/18] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../source/informer/InformerEventSource.java | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 02623b0d1c..c6ef90d32b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -255,28 +255,33 @@ public Set getSecondaryResources(P primary) { if (useSecondaryToPrimaryIndex()) { var primaryID = ResourceID.fromResource(primary); - var complementaryIds = temporalPrimaryToSecondaryIndex.getSecondaryResources(primaryID); + // Note that the order matter is these lines. This method is not synchronized + // because of performance reasons. If it was in reverse order, it could happen + // that we did not receive yet an event in the informer so the index would not + // be updated. However, before reading it from temp IDs the event arrives and erases + // the temp index. So in case of Add not id would be found. + var temporalIds = temporalPrimaryToSecondaryIndex.getSecondaryResources(primaryID); var resources = byIndex(PRIMARY_TO_SECONDARY_INDEX_NAME, resourceIdToString(primaryID)); - log.debug("Resources in cache: {} kind: {}", resources, resourceType().getSimpleName()); + log.debug( "Using informer primary to secondary index to find secondary resources for primary name:" - + " {} namespace: {}. Found {}", + + " {} namespace: {}. Found number {}", primary.getMetadata().getName(), primary.getMetadata().getNamespace(), resources.size()); - log.debug("Complementary ids: {}", complementaryIds); + log.debug("Complementary ids: {}", temporalIds); var res = resources.stream() .map( r -> { var resourceId = ResourceID.fromResource(r); Optional resource = temporaryResourceCache.getResourceFromCache(resourceId); - complementaryIds.remove(resourceId); + temporalIds.remove(resourceId); return resource.orElse(r); }) .collect(Collectors.toSet()); - complementaryIds.forEach( + temporalIds.forEach( id -> { Optional resource = get(id); resource.ifPresentOrElse(res::add, () -> log.warn("Resource not found: {}", id)); @@ -299,15 +304,15 @@ public Set getSecondaryResources(P primary) { @Override public synchronized void handleRecentResourceUpdate( ResourceID resourceID, R resource, R previousVersionOfResource) { - handleRecentCreateOrUpdate(Operation.UPDATE, resource, previousVersionOfResource); + handleRecentCreateOrUpdate(resource, previousVersionOfResource); } @Override public synchronized void handleRecentResourceCreate(ResourceID resourceID, R resource) { - handleRecentCreateOrUpdate(Operation.ADD, resource, null); + handleRecentCreateOrUpdate(resource, null); } - private void handleRecentCreateOrUpdate(Operation operation, R newResource, R oldResource) { + private void handleRecentCreateOrUpdate(R newResource, R oldResource) { temporalPrimaryToSecondaryIndex.explicitAddOrUpdate(newResource); temporaryResourceCache.putResource( newResource, From 9e41f508e8621b751ef123dc43a0bbbb520514ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 24 Jul 2025 14:06:51 +0200 Subject: [PATCH 16/18] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../source/informer/DefaultTemporalPrimaryToSecondaryIndex.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultTemporalPrimaryToSecondaryIndex.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultTemporalPrimaryToSecondaryIndex.java index dc1512339f..48a45dd489 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultTemporalPrimaryToSecondaryIndex.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultTemporalPrimaryToSecondaryIndex.java @@ -53,7 +53,7 @@ public synchronized Set getSecondaryResources(ResourceID primary) { if (resourceIDs == null) { return Collections.emptySet(); } else { - return Collections.unmodifiableSet(resourceIDs); + return new HashSet<>(resourceIDs); } } } From faf9bdf0dc341fff41dd0931399db9bae6b6ed77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 24 Jul 2025 14:51:07 +0200 Subject: [PATCH 17/18] improve: MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../source/informer/InformerEventSource.java | 25 +++-- .../informer/ManagedInformerEventSource.java | 9 +- .../informer/TemporaryResourceCache.java | 102 ++++++++++-------- .../TemporaryPrimaryResourceCacheTest.java | 8 +- 4 files changed, 86 insertions(+), 58 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index c6ef90d32b..8b34437f24 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -65,7 +65,6 @@ public class InformerEventSource private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class); // we need direct control for the indexer to propagate the just update resource also to the index private final PrimaryToSecondaryMapper

primaryToSecondaryMapper; - private final TemporalPrimaryToSecondaryIndex temporalPrimaryToSecondaryIndex; private final String id = UUID.randomUUID().toString(); public InformerEventSource( @@ -99,8 +98,6 @@ private InformerEventSource( // If there is a primary to secondary mapper there is no need for primary to secondary index. primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper(); if (useSecondaryToPrimaryIndex()) { - temporalPrimaryToSecondaryIndex = - new DefaultTemporalPrimaryToSecondaryIndex<>(configuration.getSecondaryToPrimaryMapper()); addIndexers( Map.of( PRIMARY_TO_SECONDARY_INDEX_NAME, @@ -108,8 +105,6 @@ private InformerEventSource( configuration.getSecondaryToPrimaryMapper().toPrimaryResourceIDs(r).stream() .map(InformerEventSource::resourceIdToString) .toList())); - } else { - temporalPrimaryToSecondaryIndex = NOOPTemporalPrimaryToSecondaryIndex.getInstance(); } final var informerConfig = configuration.getInformerConfig(); @@ -158,7 +153,6 @@ public void onDelete(R resource, boolean b) { ResourceID.fromResource(resource), resourceType().getSimpleName()); } - temporalPrimaryToSecondaryIndex.cleanupForResource(resource); super.onDelete(resource, b); if (acceptedByDeleteFilters(resource, b)) { propagateEvent(resource); @@ -168,7 +162,6 @@ public void onDelete(R resource, boolean b) { private synchronized void onAddOrUpdate( Operation operation, R newObject, R oldObject, Runnable superOnOp) { var resourceID = ResourceID.fromResource(newObject); - temporalPrimaryToSecondaryIndex.cleanupForResource(newObject); if (canSkipEvent(newObject, oldObject, resourceID)) { log.debug( "Skipping event propagation for {}, since was a result of a reconcile action. Resource" @@ -260,7 +253,10 @@ public Set getSecondaryResources(P primary) { // that we did not receive yet an event in the informer so the index would not // be updated. However, before reading it from temp IDs the event arrives and erases // the temp index. So in case of Add not id would be found. - var temporalIds = temporalPrimaryToSecondaryIndex.getSecondaryResources(primaryID); + var temporalIds = + temporaryResourceCache + .getTemporalPrimaryToSecondaryIndex() + .getSecondaryResources(primaryID); var resources = byIndex(PRIMARY_TO_SECONDARY_INDEX_NAME, resourceIdToString(primaryID)); log.debug( @@ -313,7 +309,6 @@ public synchronized void handleRecentResourceCreate(ResourceID resourceID, R res } private void handleRecentCreateOrUpdate(R newResource, R oldResource) { - temporalPrimaryToSecondaryIndex.explicitAddOrUpdate(newResource); temporaryResourceCache.putResource( newResource, Optional.ofNullable(oldResource) @@ -370,4 +365,16 @@ private enum Operation { private static String resourceIdToString(ResourceID resourceID) { return resourceID.getName() + "#" + resourceID.getNamespace().orElse("$na"); } + + @Override + @SuppressWarnings({"unchecked", "rawtypes"}) + protected TemporaryResourceCache temporaryResourceCache() { + return new TemporaryResourceCache<>( + this, + useSecondaryToPrimaryIndex() + ? new DefaultTemporalPrimaryToSecondaryIndex( + configuration().getSecondaryToPrimaryMapper()) + : NOOPTemporalPrimaryToSecondaryIndex.getInstance(), + parseResourceVersions); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 549d2236cd..63318cd04e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -40,7 +40,7 @@ public abstract class ManagedInformerEventSource< private static final Logger log = LoggerFactory.getLogger(ManagedInformerEventSource.class); private InformerManager cache; - private final boolean parseResourceVersions; + protected final boolean parseResourceVersions; private ControllerConfiguration controllerConfiguration; private final C configuration; private final Map>> indexers = new HashMap<>(); @@ -87,7 +87,7 @@ public synchronized void start() { if (isRunning()) { return; } - temporaryResourceCache = new TemporaryResourceCache<>(this, parseResourceVersions); + temporaryResourceCache = temporaryResourceCache(); this.cache = new InformerManager<>(client, configuration, this); cache.setControllerConfiguration(controllerConfiguration); cache.addIndexers(indexers); @@ -133,6 +133,11 @@ public Optional get(ResourceID resourceID) { } } + protected TemporaryResourceCache temporaryResourceCache() { + return new TemporaryResourceCache<>( + this, NOOPTemporalPrimaryToSecondaryIndex.getInstance(), parseResourceVersions); + } + @SuppressWarnings("unused") public Optional getCachedValue(ResourceID resourceID) { return get(resourceID); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index af75a5abc4..5abde92814 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -31,47 +31,6 @@ */ public class TemporaryResourceCache { - static class ExpirationCache { - private final LinkedHashMap cache; - private final int ttlMs; - - public ExpirationCache(int maxEntries, int ttlMs) { - this.ttlMs = ttlMs; - this.cache = - new LinkedHashMap<>() { - @Override - protected boolean removeEldestEntry(Map.Entry eldest) { - return size() > maxEntries; - } - }; - } - - public void add(K key) { - clean(); - cache.putIfAbsent(key, System.currentTimeMillis()); - } - - public boolean contains(K key) { - clean(); - return cache.get(key) != null; - } - - void clean() { - if (!cache.isEmpty()) { - long currentTimeMillis = System.currentTimeMillis(); - var iter = cache.entrySet().iterator(); - // the order will already be from oldest to newest, clean a fixed number of entries to - // amortize the cost amongst multiple calls - for (int i = 0; i < 10 && iter.hasNext(); i++) { - var entry = iter.next(); - if (currentTimeMillis - entry.getValue() > ttlMs) { - iter.remove(); - } - } - } - } - } - private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class); private final Map cache = new ConcurrentHashMap<>(); @@ -81,12 +40,15 @@ void clean() { private final ManagedInformerEventSource managedInformerEventSource; private final boolean parseResourceVersions; private final ExpirationCache knownResourceVersions; + private final TemporalPrimaryToSecondaryIndex temporalPrimaryToSecondaryIndex; public TemporaryResourceCache( ManagedInformerEventSource managedInformerEventSource, + TemporalPrimaryToSecondaryIndex temporalPrimaryToSecondaryIndex, boolean parseResourceVersions) { this.managedInformerEventSource = managedInformerEventSource; this.parseResourceVersions = parseResourceVersions; + this.temporalPrimaryToSecondaryIndex = temporalPrimaryToSecondaryIndex; if (parseResourceVersions) { // keep up to the 50000 add/updates for up to 5 minutes knownResourceVersions = new ExpirationCache<>(50000, 600000); @@ -105,10 +67,14 @@ public synchronized void onAddOrUpdateEvent(T resource) { } synchronized void onEvent(T resource, boolean unknownState) { - cache.computeIfPresent( - ResourceID.fromResource(resource), - (id, cached) -> - (unknownState || !isLaterResourceVersion(id, cached, resource)) ? null : cached); + var res = + cache.computeIfPresent( + ResourceID.fromResource(resource), + (id, cached) -> + (unknownState || !isLaterResourceVersion(id, cached, resource)) ? null : cached); + if (res == null) { + temporalPrimaryToSecondaryIndex.cleanupForResource(resource); + } } public synchronized void putAddedResource(T newResource) { @@ -154,6 +120,7 @@ public synchronized void putResource(T newResource, String previousResourceVersi newResource.getMetadata().getResourceVersion(), resourceId); cache.put(resourceId, newResource); + temporalPrimaryToSecondaryIndex.explicitAddOrUpdate(newResource); } else if (cache.remove(resourceId) != null) { log.debug("Removed an obsolete resource from cache for id: {}", resourceId); } @@ -189,4 +156,49 @@ private boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T c public synchronized Optional getResourceFromCache(ResourceID resourceID) { return Optional.ofNullable(cache.get(resourceID)); } + + public TemporalPrimaryToSecondaryIndex getTemporalPrimaryToSecondaryIndex() { + return temporalPrimaryToSecondaryIndex; + } + + static class ExpirationCache { + private final LinkedHashMap cache; + private final int ttlMs; + + public ExpirationCache(int maxEntries, int ttlMs) { + this.ttlMs = ttlMs; + this.cache = + new LinkedHashMap<>() { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > maxEntries; + } + }; + } + + public void add(K key) { + clean(); + cache.putIfAbsent(key, System.currentTimeMillis()); + } + + public boolean contains(K key) { + clean(); + return cache.get(key) != null; + } + + void clean() { + if (!cache.isEmpty()) { + long currentTimeMillis = System.currentTimeMillis(); + var iter = cache.entrySet().iterator(); + // the order will already be from oldest to newest, clean a fixed number of entries to + // amortize the cost amongst multiple calls + for (int i = 0; i < 10 && iter.hasNext(); i++) { + var entry = iter.next(); + if (currentTimeMillis - entry.getValue() > ttlMs) { + iter.remove(); + } + } + } + } + } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java index e62888832f..2e0f68308b 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java @@ -31,7 +31,9 @@ class TemporaryPrimaryResourceCacheTest { @BeforeEach void setup() { informerEventSource = mock(InformerEventSource.class); - temporaryResourceCache = new TemporaryResourceCache<>(informerEventSource, false); + temporaryResourceCache = + new TemporaryResourceCache<>( + informerEventSource, NOOPTemporalPrimaryToSecondaryIndex.getInstance(), false); } @Test @@ -94,7 +96,9 @@ void removesResourceFromCache() { @Test void resourceVersionParsing() { - this.temporaryResourceCache = new TemporaryResourceCache<>(informerEventSource, true); + this.temporaryResourceCache = + new TemporaryResourceCache<>( + informerEventSource, NOOPTemporalPrimaryToSecondaryIndex.getInstance(), true); assertThat(temporaryResourceCache.isKnownResourceVersion(testResource())).isFalse(); From f9aafb8a15872700c9ed1ef4b46f959d991ce763 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 24 Jul 2025 15:47:18 +0200 Subject: [PATCH 18/18] Revert "improve:" This reverts commit faf9bdf0dc341fff41dd0931399db9bae6b6ed77. --- .../source/informer/InformerEventSource.java | 25 ++--- .../informer/ManagedInformerEventSource.java | 9 +- .../informer/TemporaryResourceCache.java | 102 ++++++++---------- .../TemporaryPrimaryResourceCacheTest.java | 8 +- 4 files changed, 58 insertions(+), 86 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 8b34437f24..c6ef90d32b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -65,6 +65,7 @@ public class InformerEventSource private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class); // we need direct control for the indexer to propagate the just update resource also to the index private final PrimaryToSecondaryMapper

primaryToSecondaryMapper; + private final TemporalPrimaryToSecondaryIndex temporalPrimaryToSecondaryIndex; private final String id = UUID.randomUUID().toString(); public InformerEventSource( @@ -98,6 +99,8 @@ private InformerEventSource( // If there is a primary to secondary mapper there is no need for primary to secondary index. primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper(); if (useSecondaryToPrimaryIndex()) { + temporalPrimaryToSecondaryIndex = + new DefaultTemporalPrimaryToSecondaryIndex<>(configuration.getSecondaryToPrimaryMapper()); addIndexers( Map.of( PRIMARY_TO_SECONDARY_INDEX_NAME, @@ -105,6 +108,8 @@ private InformerEventSource( configuration.getSecondaryToPrimaryMapper().toPrimaryResourceIDs(r).stream() .map(InformerEventSource::resourceIdToString) .toList())); + } else { + temporalPrimaryToSecondaryIndex = NOOPTemporalPrimaryToSecondaryIndex.getInstance(); } final var informerConfig = configuration.getInformerConfig(); @@ -153,6 +158,7 @@ public void onDelete(R resource, boolean b) { ResourceID.fromResource(resource), resourceType().getSimpleName()); } + temporalPrimaryToSecondaryIndex.cleanupForResource(resource); super.onDelete(resource, b); if (acceptedByDeleteFilters(resource, b)) { propagateEvent(resource); @@ -162,6 +168,7 @@ public void onDelete(R resource, boolean b) { private synchronized void onAddOrUpdate( Operation operation, R newObject, R oldObject, Runnable superOnOp) { var resourceID = ResourceID.fromResource(newObject); + temporalPrimaryToSecondaryIndex.cleanupForResource(newObject); if (canSkipEvent(newObject, oldObject, resourceID)) { log.debug( "Skipping event propagation for {}, since was a result of a reconcile action. Resource" @@ -253,10 +260,7 @@ public Set getSecondaryResources(P primary) { // that we did not receive yet an event in the informer so the index would not // be updated. However, before reading it from temp IDs the event arrives and erases // the temp index. So in case of Add not id would be found. - var temporalIds = - temporaryResourceCache - .getTemporalPrimaryToSecondaryIndex() - .getSecondaryResources(primaryID); + var temporalIds = temporalPrimaryToSecondaryIndex.getSecondaryResources(primaryID); var resources = byIndex(PRIMARY_TO_SECONDARY_INDEX_NAME, resourceIdToString(primaryID)); log.debug( @@ -309,6 +313,7 @@ public synchronized void handleRecentResourceCreate(ResourceID resourceID, R res } private void handleRecentCreateOrUpdate(R newResource, R oldResource) { + temporalPrimaryToSecondaryIndex.explicitAddOrUpdate(newResource); temporaryResourceCache.putResource( newResource, Optional.ofNullable(oldResource) @@ -365,16 +370,4 @@ private enum Operation { private static String resourceIdToString(ResourceID resourceID) { return resourceID.getName() + "#" + resourceID.getNamespace().orElse("$na"); } - - @Override - @SuppressWarnings({"unchecked", "rawtypes"}) - protected TemporaryResourceCache temporaryResourceCache() { - return new TemporaryResourceCache<>( - this, - useSecondaryToPrimaryIndex() - ? new DefaultTemporalPrimaryToSecondaryIndex( - configuration().getSecondaryToPrimaryMapper()) - : NOOPTemporalPrimaryToSecondaryIndex.getInstance(), - parseResourceVersions); - } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 63318cd04e..549d2236cd 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -40,7 +40,7 @@ public abstract class ManagedInformerEventSource< private static final Logger log = LoggerFactory.getLogger(ManagedInformerEventSource.class); private InformerManager cache; - protected final boolean parseResourceVersions; + private final boolean parseResourceVersions; private ControllerConfiguration controllerConfiguration; private final C configuration; private final Map>> indexers = new HashMap<>(); @@ -87,7 +87,7 @@ public synchronized void start() { if (isRunning()) { return; } - temporaryResourceCache = temporaryResourceCache(); + temporaryResourceCache = new TemporaryResourceCache<>(this, parseResourceVersions); this.cache = new InformerManager<>(client, configuration, this); cache.setControllerConfiguration(controllerConfiguration); cache.addIndexers(indexers); @@ -133,11 +133,6 @@ public Optional get(ResourceID resourceID) { } } - protected TemporaryResourceCache temporaryResourceCache() { - return new TemporaryResourceCache<>( - this, NOOPTemporalPrimaryToSecondaryIndex.getInstance(), parseResourceVersions); - } - @SuppressWarnings("unused") public Optional getCachedValue(ResourceID resourceID) { return get(resourceID); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 5abde92814..af75a5abc4 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -31,6 +31,47 @@ */ public class TemporaryResourceCache { + static class ExpirationCache { + private final LinkedHashMap cache; + private final int ttlMs; + + public ExpirationCache(int maxEntries, int ttlMs) { + this.ttlMs = ttlMs; + this.cache = + new LinkedHashMap<>() { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > maxEntries; + } + }; + } + + public void add(K key) { + clean(); + cache.putIfAbsent(key, System.currentTimeMillis()); + } + + public boolean contains(K key) { + clean(); + return cache.get(key) != null; + } + + void clean() { + if (!cache.isEmpty()) { + long currentTimeMillis = System.currentTimeMillis(); + var iter = cache.entrySet().iterator(); + // the order will already be from oldest to newest, clean a fixed number of entries to + // amortize the cost amongst multiple calls + for (int i = 0; i < 10 && iter.hasNext(); i++) { + var entry = iter.next(); + if (currentTimeMillis - entry.getValue() > ttlMs) { + iter.remove(); + } + } + } + } + } + private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class); private final Map cache = new ConcurrentHashMap<>(); @@ -40,15 +81,12 @@ public class TemporaryResourceCache { private final ManagedInformerEventSource managedInformerEventSource; private final boolean parseResourceVersions; private final ExpirationCache knownResourceVersions; - private final TemporalPrimaryToSecondaryIndex temporalPrimaryToSecondaryIndex; public TemporaryResourceCache( ManagedInformerEventSource managedInformerEventSource, - TemporalPrimaryToSecondaryIndex temporalPrimaryToSecondaryIndex, boolean parseResourceVersions) { this.managedInformerEventSource = managedInformerEventSource; this.parseResourceVersions = parseResourceVersions; - this.temporalPrimaryToSecondaryIndex = temporalPrimaryToSecondaryIndex; if (parseResourceVersions) { // keep up to the 50000 add/updates for up to 5 minutes knownResourceVersions = new ExpirationCache<>(50000, 600000); @@ -67,14 +105,10 @@ public synchronized void onAddOrUpdateEvent(T resource) { } synchronized void onEvent(T resource, boolean unknownState) { - var res = - cache.computeIfPresent( - ResourceID.fromResource(resource), - (id, cached) -> - (unknownState || !isLaterResourceVersion(id, cached, resource)) ? null : cached); - if (res == null) { - temporalPrimaryToSecondaryIndex.cleanupForResource(resource); - } + cache.computeIfPresent( + ResourceID.fromResource(resource), + (id, cached) -> + (unknownState || !isLaterResourceVersion(id, cached, resource)) ? null : cached); } public synchronized void putAddedResource(T newResource) { @@ -120,7 +154,6 @@ public synchronized void putResource(T newResource, String previousResourceVersi newResource.getMetadata().getResourceVersion(), resourceId); cache.put(resourceId, newResource); - temporalPrimaryToSecondaryIndex.explicitAddOrUpdate(newResource); } else if (cache.remove(resourceId) != null) { log.debug("Removed an obsolete resource from cache for id: {}", resourceId); } @@ -156,49 +189,4 @@ private boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T c public synchronized Optional getResourceFromCache(ResourceID resourceID) { return Optional.ofNullable(cache.get(resourceID)); } - - public TemporalPrimaryToSecondaryIndex getTemporalPrimaryToSecondaryIndex() { - return temporalPrimaryToSecondaryIndex; - } - - static class ExpirationCache { - private final LinkedHashMap cache; - private final int ttlMs; - - public ExpirationCache(int maxEntries, int ttlMs) { - this.ttlMs = ttlMs; - this.cache = - new LinkedHashMap<>() { - @Override - protected boolean removeEldestEntry(Map.Entry eldest) { - return size() > maxEntries; - } - }; - } - - public void add(K key) { - clean(); - cache.putIfAbsent(key, System.currentTimeMillis()); - } - - public boolean contains(K key) { - clean(); - return cache.get(key) != null; - } - - void clean() { - if (!cache.isEmpty()) { - long currentTimeMillis = System.currentTimeMillis(); - var iter = cache.entrySet().iterator(); - // the order will already be from oldest to newest, clean a fixed number of entries to - // amortize the cost amongst multiple calls - for (int i = 0; i < 10 && iter.hasNext(); i++) { - var entry = iter.next(); - if (currentTimeMillis - entry.getValue() > ttlMs) { - iter.remove(); - } - } - } - } - } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java index 2e0f68308b..e62888832f 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java @@ -31,9 +31,7 @@ class TemporaryPrimaryResourceCacheTest { @BeforeEach void setup() { informerEventSource = mock(InformerEventSource.class); - temporaryResourceCache = - new TemporaryResourceCache<>( - informerEventSource, NOOPTemporalPrimaryToSecondaryIndex.getInstance(), false); + temporaryResourceCache = new TemporaryResourceCache<>(informerEventSource, false); } @Test @@ -96,9 +94,7 @@ void removesResourceFromCache() { @Test void resourceVersionParsing() { - this.temporaryResourceCache = - new TemporaryResourceCache<>( - informerEventSource, NOOPTemporalPrimaryToSecondaryIndex.getInstance(), true); + this.temporaryResourceCache = new TemporaryResourceCache<>(informerEventSource, true); assertThat(temporaryResourceCache.isKnownResourceVersion(testResource())).isFalse();