diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go
index 38f1e6016..ff742e450 100644
--- a/pkg/cache/cluster.go
+++ b/pkg/cache/cluster.go
@@ -127,6 +127,8 @@ type ClusterCache interface {
 	GetGVKParser() *managedfields.GvkParser
 	// Invalidate cache and executes callback that optionally might update cache settings
 	Invalidate(opts ...UpdateSettingsFunc)
+	// InvalidateResources invalidates specific resources in the cache
+	InvalidateResources(keys []kube.ResourceKey)
 	// FindResources returns resources that matches given list of predicates from specified namespace or everywhere if specified namespace is empty
 	FindResources(namespace string, predicates ...func(r *Resource) bool) map[kube.ResourceKey]*Resource
 	// IterateHierarchyV2 iterates resource tree starting from the specified top level resources and executes callback for each resource in the tree.
@@ -490,6 +492,29 @@ func (c *clusterCache) Invalidate(opts ...UpdateSettingsFunc) {
 	c.log.Info("Invalidated cluster")
 }
 
+// InvalidateResources invalidates specific resources in the cache by removing them.
+// This forces the resources to be refreshed from the cluster on the next access.
+func (c *clusterCache) InvalidateResources(keys []kube.ResourceKey) {
+	if len(keys) == 0 {
+		return
+	}
+
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	for _, key := range keys {
+		if _, exists := c.resources[key]; exists {
+			// Remove the resource from cache - this will force it to be refreshed on next access
+			c.onNodeRemoved(key)
+			c.log.Info("Invalidated resource from cache", "key", key.String())
+		} else {
+			c.log.Info("Resource not found in cache for invalidation", "key", key.String())
+		}
+	}
+
+	c.log.Info("Invalidated specific resources from cache", "count", len(keys))
+}
+
 // clusterCacheSync's lock should be held before calling this method
 func (syncStatus *clusterCacheSync) synced(clusterRetryTimeout time.Duration) bool {
 	syncTime := syncStatus.syncTime
diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go
index 01e5561b1..d3e5f0575 100644
--- a/pkg/engine/engine.go
+++ b/pkg/engine/engine.go
@@ -74,6 +74,13 @@ func (e *gitOpsEngine) Sync(ctx context.Context,
 	namespace string,
 	opts ...sync.SyncOpt,
 ) ([]common.ResourceSyncResult, error) {
+	// Ensure cache is synced before getting managed live objects.
+	// This forces a refresh if the cache was invalidated.
+	err := e.cache.EnsureSynced()
+	if err != nil {
+		return nil, fmt.Errorf("error during sync: failed to ensure cache is synced: %w", err)
+	}
+
 	managedResources, err := e.cache.GetManagedLiveObjs(resources, isManaged)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get managed live objects: %w", err)
@@ -84,6 +91,15 @@ func (e *gitOpsEngine) Sync(ctx context.Context,
 		return nil, fmt.Errorf("failed to diff objects: %w", err)
 	}
 	opts = append(opts, sync.WithSkipHooks(!diffRes.Modified))
+
+	// Add cache invalidation callback to invalidate cache for modified resources after sync
+	opts = append(opts, sync.WithCacheInvalidationCallback(func(modifiedResources []kube.ResourceKey) {
+		// Only invalidate the specific resources that were modified
+		if len(modifiedResources) > 0 {
+			e.cache.InvalidateResources(modifiedResources)
+		}
+	}))
+
 	syncCtx, cleanup, err := sync.NewSyncContext(revision, result, e.config, e.config, e.kubectl, namespace, e.cache.GetOpenAPISchema(), opts...)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create sync context: %w", err)
diff --git a/pkg/sync/sync_context.go b/pkg/sync/sync_context.go
index 8f4d51e4f..cf66c47fb 100644
--- a/pkg/sync/sync_context.go
+++ b/pkg/sync/sync_context.go
@@ -218,6 +218,14 @@ func WithClientSideApplyMigration(enabled bool, manager string) SyncOpt {
 	}
 }
 
+// WithCacheInvalidationCallback sets a callback that is invoked after a sync operation completes
+// to invalidate the cache for the resources that were synced
+func WithCacheInvalidationCallback(callback func([]kubeutil.ResourceKey)) SyncOpt {
+	return func(ctx *syncContext) {
+		ctx.cacheInvalidationCallback = callback
+	}
+}
+
 // NewSyncContext creates new instance of a SyncContext
 func NewSyncContext(
 	revision string,
@@ -389,6 +397,10 @@ type syncContext struct {
 	applyOutOfSyncOnly bool
 	// stores whether the resource is modified or not
 	modificationResult map[kubeutil.ResourceKey]bool
+
+	// cacheInvalidationCallback is a callback that is invoked after a sync operation completes
+	// to invalidate the cache for the resources that were synced
+	cacheInvalidationCallback func([]kubeutil.ResourceKey)
 }
 
 func (sc *syncContext) setRunningPhase(tasks []*syncTask, isPendingDeletion bool) {
@@ -557,6 +569,15 @@ func (sc *syncContext) Sync() {
 		// delete all completed hooks which have appropriate delete policy
 		sc.deleteHooks(hooksPendingDeletionSuccessful)
 		sc.setOperationPhase(common.OperationSucceeded, "successfully synced (no more tasks)")
+
+		// Invalidate cache after successful sync
+		if sc.cacheInvalidationCallback != nil {
+			modifiedResources := make([]kubeutil.ResourceKey, 0, len(sc.syncRes))
+			for _, result := range sc.syncRes {
+				modifiedResources = append(modifiedResources, result.ResourceKey)
+			}
+			sc.cacheInvalidationCallback(modifiedResources)
+		}
 		return
 	}
 
@@ -593,11 +614,34 @@ func (sc *syncContext) Sync() {
 		syncFailedTasks, _ := tasks.Split(func(t *syncTask) bool { return t.syncStatus == common.ResultCodeSyncFailed })
 		sc.deleteHooks(hooksPendingDeletionFailed)
 		sc.setOperationFailed(syncFailTasks, syncFailedTasks, "one or more objects failed to apply")
+
+		// Invalidate cache for successfully synced resources even if overall operation failed
+		if sc.cacheInvalidationCallback != nil {
+			var modifiedResources []kubeutil.ResourceKey
+			for _, result := range sc.syncRes {
+				// Only invalidate resources that were successfully synced
+				if result.Status == common.ResultCodeSynced {
+					modifiedResources = append(modifiedResources, result.ResourceKey)
+				}
+			}
+			if len(modifiedResources) > 0 {
+				sc.cacheInvalidationCallback(modifiedResources)
+			}
+		}
 	case successful:
 		if remainingTasks.Len() == 0 {
 			// delete all completed hooks which have appropriate delete policy
 			sc.deleteHooks(hooksPendingDeletionSuccessful)
 			sc.setOperationPhase(common.OperationSucceeded, "successfully synced (all tasks run)")
+
+			// Invalidate cache after successful sync
+			if sc.cacheInvalidationCallback != nil {
+				modifiedResources := make([]kubeutil.ResourceKey, 0, len(sc.syncRes))
+				for _, result := range sc.syncRes {
+					modifiedResources = append(modifiedResources, result.ResourceKey)
+				}
+				sc.cacheInvalidationCallback(modifiedResources)
+			}
 		} else {
 			sc.setRunningPhase(remainingTasks, false)
 		}
diff --git a/pkg/sync/sync_context_test.go b/pkg/sync/sync_context_test.go
index 0e8d01ebb..8a0ac7e5e 100644
--- a/pkg/sync/sync_context_test.go
+++ b/pkg/sync/sync_context_test.go
@@ -12,22 +12,22 @@ import (
 	"testing"
 	"time"
 
-	apierrors "k8s.io/apimachinery/pkg/api/errors"
-	"k8s.io/apimachinery/pkg/runtime/schema"
-
 	"github.com/go-logr/logr"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/discovery" fakedisco "k8s.io/client-go/discovery/fake" "k8s.io/client-go/dynamic/fake" "k8s.io/client-go/rest" testcore "k8s.io/client-go/testing" "k8s.io/klog/v2/textlogger" + cmdutil "k8s.io/kubectl/pkg/cmd/util" "github.com/argoproj/gitops-engine/pkg/diff" "github.com/argoproj/gitops-engine/pkg/health" @@ -362,7 +362,7 @@ func TestSyncCreateFailure(t *testing.T) { Commands: map[string]kubetest.KubectlOutput{ testSvc.GetName(): { Output: "", - Err: errors.New("foo"), + Err: errors.New("invalid object failing dry-run"), }, }, } @@ -371,7 +371,7 @@ func TestSyncCreateFailure(t *testing.T) { Commands: map[string]kubetest.KubectlOutput{ testSvc.GetName(): { Output: "", - Err: errors.New("foo"), + Err: errors.New("invalid object failing dry-run"), }, }, } @@ -387,7 +387,7 @@ func TestSyncCreateFailure(t *testing.T) { assert.Len(t, resources, 1) result := resources[0] assert.Equal(t, synccommon.ResultCodeSyncFailed, result.Status) - assert.Equal(t, "foo", result.Message) + assert.Equal(t, "invalid object failing dry-run", result.Message) } func TestSync_ApplyOutOfSyncOnly(t *testing.T) { @@ -1268,11 +1268,11 @@ func TestSyncFailureHookWithFailedSync(t *testing.T) { }) syncCtx.hooks = []*unstructured.Unstructured{newHook(synccommon.HookTypeSyncFail)} mockKubectl := &kubetest.MockKubectlCmd{ - Commands: map[string]kubetest.KubectlOutput{pod.GetName(): {Err: errors.New("")}}, + Commands: map[string]kubetest.KubectlOutput{pod.GetName(): {Err: apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace)}}, } syncCtx.kubectl = mockKubectl mockResourceOps := kubetest.MockResourceOps{ - Commands: map[string]kubetest.KubectlOutput{pod.GetName(): {Err: errors.New("")}}, + Commands: map[string]kubetest.KubectlOutput{pod.GetName(): {Err: apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace)}}, } syncCtx.resourceOps = &mockResourceOps @@ -1321,18 +1321,18 @@ func TestRunSyncFailHooksFailed(t *testing.T) { mockKubectl := &kubetest.MockKubectlCmd{ Commands: map[string]kubetest.KubectlOutput{ // Fail operation - pod.GetName(): {Err: errors.New("")}, + pod.GetName(): {Err: apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace)}, // Fail a single SyncFail hook - failedSyncFailHook.GetName(): {Err: errors.New("")}, + failedSyncFailHook.GetName(): {Err: apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace)}, }, } syncCtx.kubectl = mockKubectl mockResourceOps := kubetest.MockResourceOps{ Commands: map[string]kubetest.KubectlOutput{ // Fail operation - pod.GetName(): {Err: errors.New("")}, + pod.GetName(): {Err: apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace)}, // Fail a single SyncFail hook - failedSyncFailHook.GetName(): {Err: errors.New("")}, + failedSyncFailHook.GetName(): {Err: apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace)}, }, } syncCtx.resourceOps = &mockResourceOps @@ -2262,3 +2262,200 @@ func TestNeedsClientSideApplyMigration(t *testing.T) { }) } } + +func TestSyncContext_CacheInvalidationCallback(t *testing.T) { + // Track which resources are passed to the callback + var callbackInvoked bool + var 
invalidatedResources []kube.ResourceKey + + callback := func(resources []kube.ResourceKey) { + callbackInvoked = true + invalidatedResources = append(invalidatedResources, resources...) + } + + syncCtx := newTestSyncCtx(nil, + WithOperationSettings(false, false, false, false), + WithCacheInvalidationCallback(callback), + ) + + // Create test resources + pod := testingutils.NewPod() + pod.SetName("test-pod") + pod.SetNamespace(testingutils.FakeArgoCDNamespace) + service := testingutils.NewService() + service.SetName("test-service") + service.SetNamespace(testingutils.FakeArgoCDNamespace) + + syncCtx.resources = groupResources(ReconciliationResult{ + Live: []*unstructured.Unstructured{nil, nil}, + Target: []*unstructured.Unstructured{pod, service}, + }) + + // Run sync + syncCtx.Sync() + phase, _, resources := syncCtx.GetState() + + // Verify sync completed successfully + assert.Equal(t, synccommon.OperationSucceeded, phase) + assert.Len(t, resources, 2) + + // Verify callback was invoked with the modified resources + assert.True(t, callbackInvoked, "Cache invalidation callback should have been invoked") + assert.Len(t, invalidatedResources, 2, "Should have invalidated 2 resources") + + // Verify the correct resources were passed to the callback + expectedKeys := []kube.ResourceKey{ + kube.GetResourceKey(pod), + kube.GetResourceKey(service), + } + + for _, expectedKey := range expectedKeys { + assert.Contains(t, invalidatedResources, expectedKey, "Expected resource key should be in invalidated resources") + } +} + +func TestSyncContext_CacheInvalidationCallback_NoResources(t *testing.T) { + // Track whether callback is invoked + var callbackInvoked bool + var invalidatedResources []kube.ResourceKey + + callback := func(resources []kube.ResourceKey) { + callbackInvoked = true + invalidatedResources = append(invalidatedResources, resources...) + } + + syncCtx := newTestSyncCtx(nil, + WithOperationSettings(false, false, false, false), + WithCacheInvalidationCallback(callback), + ) + + // No resources to sync + syncCtx.resources = groupResources(ReconciliationResult{ + Live: []*unstructured.Unstructured{}, + Target: []*unstructured.Unstructured{}, + }) + + // Run sync + syncCtx.Sync() + phase, _, resources := syncCtx.GetState() + + // Verify sync completed successfully + assert.Equal(t, synccommon.OperationSucceeded, phase) + assert.Empty(t, resources) + + // Verify callback was invoked with empty resource list + assert.True(t, callbackInvoked, "Cache invalidation callback should have been invoked") + assert.Empty(t, invalidatedResources, "Should have invalidated 0 resources") +} + +func TestSyncContext_CacheInvalidationCallback_PartialFailure(t *testing.T) { + // Track which resources are passed to the callback + var callbackInvoked bool + var invalidatedResources []kube.ResourceKey + + callback := func(resources []kube.ResourceKey) { + callbackInvoked = true + invalidatedResources = append(invalidatedResources, resources...) 
+	}
+
+	syncCtx := newTestSyncCtx(nil,
+		WithOperationSettings(false, false, false, false),
+		WithCacheInvalidationCallback(callback),
+	)
+
+	// Create test resources - one will succeed, one will fail
+	pod := testingutils.NewPod()
+	pod.SetName("test-pod")
+	pod.SetNamespace(testingutils.FakeArgoCDNamespace)
+	service := testingutils.NewService()
+	service.SetName("test-service")
+	service.SetNamespace(testingutils.FakeArgoCDNamespace)
+
+	// Create a custom mock that can distinguish between dry-run and wet-run
+	customMockResourceOps := &customMockResourceOps{
+		MockResourceOps: &kubetest.MockResourceOps{
+			Commands: map[string]kubetest.KubectlOutput{
+				// No commands in the default map - will succeed
+			},
+		},
+	}
+
+	syncCtx.resourceOps = customMockResourceOps
+
+	syncCtx.resources = groupResources(ReconciliationResult{
+		Live:   []*unstructured.Unstructured{nil, nil},
+		Target: []*unstructured.Unstructured{pod, service},
+	})
+
+	// Run sync
+	syncCtx.Sync()
+	phase, _, resources := syncCtx.GetState()
+
+	// Verify sync completed with failure
+	assert.Equal(t, synccommon.OperationFailed, phase)
+	assert.Len(t, resources, 2)
+
+	// Verify callback was invoked (should be called even on partial failure)
+	assert.True(t, callbackInvoked, "Cache invalidation callback should have been invoked")
+
+	// Should only invalidate the successfully synced resource
+	assert.Len(t, invalidatedResources, 1, "Should have invalidated 1 resource")
+
+	// Verify the successful resource was invalidated
+	serviceKey := kube.GetResourceKey(service)
+	assert.Contains(t, invalidatedResources, serviceKey, "Should have invalidated the successful resource")
+}
+
+// customMockResourceOps is a custom mock that can distinguish between dry-run and wet-run
+type customMockResourceOps struct {
+	*kubetest.MockResourceOps
+}
+
+func (c *customMockResourceOps) ApplyResource(ctx context.Context, obj *unstructured.Unstructured, dryRunStrategy cmdutil.DryRunStrategy, force, validate, serverSideApply bool, manager string) (string, error) {
+	// Let dry-run succeed for both resources
+	if dryRunStrategy == cmdutil.DryRunClient {
+		return "dry-run successful", nil
+	}
+
+	// During wet-run, fail the pod but succeed the service
+	if obj.GetKind() == "Pod" && obj.GetName() == "test-pod" {
+		return "", apierrors.NewNotFound(schema.GroupResource{}, testingutils.FakeArgoCDNamespace)
+	}
+
+	// For the service, succeed
+	if obj.GetKind() == "Service" && obj.GetName() == "test-service" {
+		return "service sync successful", nil
+	}
+
+	// Default behavior for other resources
+	_, err := c.MockResourceOps.ApplyResource(ctx, obj, dryRunStrategy, force, validate, serverSideApply, manager)
+	if err != nil {
+		return "", fmt.Errorf("apply resource failed: %w", err)
+	}
+	return "default success", nil
+}
+
+func TestSyncContext_CacheInvalidationCallback_NilCallback(t *testing.T) {
+	// Test with nil callback (should not panic)
+	syncCtx := newTestSyncCtx(nil,
+		WithOperationSettings(false, false, false, false),
+		WithCacheInvalidationCallback(nil),
+	)
+
+	pod := testingutils.NewPod()
+	pod.SetName("test-pod")
+	pod.SetNamespace(testingutils.FakeArgoCDNamespace)
+
+	syncCtx.resources = groupResources(ReconciliationResult{
+		Live:   []*unstructured.Unstructured{nil},
+		Target: []*unstructured.Unstructured{pod},
+	})
+
+	// Run sync - should not panic
+	syncCtx.Sync()
+	phase, _, resources := syncCtx.GetState()
+
+	// Verify sync completed successfully
+	assert.Equal(t, synccommon.OperationSucceeded, phase)
+	assert.Len(t, resources, 1)
+}