
fix: invalidate cluster cache after sync operations #745


Closed
25 changes: 25 additions & 0 deletions pkg/cache/cluster.go
@@ -127,6 +127,8 @@ type ClusterCache interface {
GetGVKParser() *managedfields.GvkParser
// Invalidate cache and executes callback that optionally might update cache settings
Invalidate(opts ...UpdateSettingsFunc)
// InvalidateResources invalidates specific resources in the cache
InvalidateResources(keys []kube.ResourceKey)
// FindResources returns resources that matches given list of predicates from specified namespace or everywhere if specified namespace is empty
FindResources(namespace string, predicates ...func(r *Resource) bool) map[kube.ResourceKey]*Resource
// IterateHierarchyV2 iterates resource tree starting from the specified top level resources and executes callback for each resource in the tree.
@@ -490,6 +492,29 @@ func (c *clusterCache) Invalidate(opts ...UpdateSettingsFunc) {
c.log.Info("Invalidated cluster")
}

// InvalidateResources invalidates specific resources in the cache by removing them.
// This forces the resources to be refreshed from the cluster on the next access.
func (c *clusterCache) InvalidateResources(keys []kube.ResourceKey) {
if len(keys) == 0 {
return
}

c.lock.Lock()
defer c.lock.Unlock()

for _, key := range keys {
if _, exists := c.resources[key]; exists {
// Remove the resource from cache - this will force it to be refreshed on next access
c.onNodeRemoved(key)
c.log.Info("Invalidated resource from cache", "key", key.String())
} else {
c.log.Info("Resource not found in cache for invalidation", "key", key.String())
}
}

c.log.Info("Invalidated specific resources from cache", "count", len(keys))
}

// clusterCacheSync's lock should be held before calling this method
func (syncStatus *clusterCacheSync) synced(clusterRetryTimeout time.Duration) bool {
syncTime := syncStatus.syncTime
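For illustration, a hedged usage sketch of the new InvalidateResources method added above; the import paths are gitops-engine's, while the helper name and resource keys are made-up examples, and the ClusterCache is assumed to be constructed elsewhere (e.g. via cache.NewClusterCache):

package example

import (
	"github.com/argoproj/gitops-engine/pkg/cache"
	"github.com/argoproj/gitops-engine/pkg/utils/kube"
)

// invalidateAppResources shows the intended call pattern for the new method:
// hand the cache the exact keys that changed rather than invalidating the
// whole cluster cache.
func invalidateAppResources(clusterCache cache.ClusterCache) {
	keys := []kube.ResourceKey{
		{Group: "apps", Kind: "Deployment", Namespace: "default", Name: "my-app"},
		{Group: "", Kind: "ConfigMap", Namespace: "default", Name: "my-app-config"},
	}
	clusterCache.InvalidateResources(keys)
}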
16 changes: 16 additions & 0 deletions pkg/engine/engine.go
@@ -74,6 +74,13 @@ func (e *gitOpsEngine) Sync(ctx context.Context,
namespace string,
opts ...sync.SyncOpt,
) ([]common.ResourceSyncResult, error) {
// Ensure cache is synced before getting managed live objects
// This forces a refresh if the cache was invalidated
err := e.cache.EnsureSynced()
Contributor:
I believe that this function was designed to be called just when the gitops engine is being initialized. This will call the clusterCacheSync.synced function, which only validates the sync time. The ClusterCache should always be synced because it updates based on resource watches.
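For context, a minimal sketch of the kind of purely time-based check being described; only syncTime is visible in the diff above, so the syncError field and the retry behavior are assumptions, and the type name is a placeholder rather than the actual gitops-engine source:

package sketch

import "time"

// clusterCacheSyncSketch models a "synced" check that is decided from when the
// last full sync ran (and whether it failed), not from the freshness of any
// individual cached resource.
type clusterCacheSyncSketch struct {
	syncTime  *time.Time // visible in the diff context above
	syncError error      // assumed field for illustration
}

func (s *clusterCacheSyncSketch) synced(clusterRetryTimeout time.Duration) bool {
	if s.syncTime == nil {
		// never synced yet
		return false
	}
	if s.syncError != nil {
		// after a failed sync, stay "synced" only until the retry timeout
		// elapses, so a full re-sync is eventually attempted
		return time.Now().Before(s.syncTime.Add(clusterRetryTimeout))
	}
	// a successful sync never expires here; per-resource freshness is expected
	// to come from watches, not from this check
	return true
}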

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after looking at argo-cd code more it seems like GetManagedLiveObjs() actually calls c.getSyncedCluster(destCluster) which eventually calls clusterCache.EnsureSynced() anyway.

So this part is definitely not needed
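A hedged sketch of that call chain, with placeholder types and simplified signatures rather than the real argo-cd/gitops-engine code, to show why the extra EnsureSynced() call is redundant:

package sketch

import "fmt"

// managedLiveObjsCache is a minimal stand-in for the two ClusterCache methods
// relevant to this thread.
type managedLiveObjsCache interface {
	EnsureSynced() error
	GetManagedLiveObjs() (map[string]any, error)
}

// getManagedLiveObjsViaSyncedCluster mirrors the described path:
// GetManagedLiveObjs -> getSyncedCluster(destCluster) -> clusterCache.EnsureSynced().
// Because syncing already happens inside this path, calling EnsureSynced()
// again in gitOpsEngine.Sync adds nothing.
func getManagedLiveObjsViaSyncedCluster(c managedLiveObjsCache) (map[string]any, error) {
	if err := c.EnsureSynced(); err != nil {
		return nil, fmt.Errorf("failed to ensure cluster cache is synced: %w", err)
	}
	return c.GetManagedLiveObjs()
}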

if err != nil {
return nil, fmt.Errorf("error during sync: failed to ensure cache is synced: %w", err)
}

managedResources, err := e.cache.GetManagedLiveObjs(resources, isManaged)
if err != nil {
return nil, fmt.Errorf("failed to get managed live objects: %w", err)
@@ -84,6 +91,15 @@ func (e *gitOpsEngine) Sync(ctx context.Context,
return nil, fmt.Errorf("failed to diff objects: %w", err)
}
opts = append(opts, sync.WithSkipHooks(!diffRes.Modified))

// Add cache invalidation callback to invalidate cache for modified resources after sync
opts = append(opts, sync.WithCacheInvalidationCallback(func(modifiedResources []kube.ResourceKey) {
// Only invalidate the specific resources that were modified
if len(modifiedResources) > 0 {
e.cache.InvalidateResources(modifiedResources)
}
}))
Comment on lines +96 to +101
Contributor:
Why would we need to invalidate the cache on every sync if the cluster cache is based on resource watches? I feel that the root problem is elsewhere. WDYT?

Contributor Author:
Yeah, it seems like this solution reactively removes cached resources and is more of a workaround.

The problem is more likely in watchEvents not properly updating resources, e.g. on CRD schema changes.
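To illustrate the watch-driven model being discussed, a minimal sketch of how a cache that consumes watch events keeps itself current; the types here are simplified placeholders, not the actual watchEvents implementation, and a mishandled event type (such as a CRD schema change) is exactly where such a cache would go stale:

package sketch

import (
	"sync"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/watch"
)

// resourceKeySketch is a simplified stand-in for kube.ResourceKey.
type resourceKeySketch struct {
	Group, Kind, Namespace, Name string
}

type watchDrivenCache struct {
	mu        sync.Mutex
	resources map[resourceKeySketch]*unstructured.Unstructured
}

func newWatchDrivenCache() *watchDrivenCache {
	return &watchDrivenCache{resources: map[resourceKeySketch]*unstructured.Unstructured{}}
}

// processEvent applies a single watch event to the in-memory state; a cache
// updated this way converges on the cluster without per-sync invalidation,
// as long as every relevant event type is actually handled.
func (c *watchDrivenCache) processEvent(ev watch.Event) {
	obj, ok := ev.Object.(*unstructured.Unstructured)
	if !ok {
		return
	}
	key := resourceKeySketch{
		Group:     obj.GroupVersionKind().Group,
		Kind:      obj.GetKind(),
		Namespace: obj.GetNamespace(),
		Name:      obj.GetName(),
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	switch ev.Type {
	case watch.Added, watch.Modified:
		c.resources[key] = obj
	case watch.Deleted:
		delete(c.resources, key)
	}
}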


syncCtx, cleanup, err := sync.NewSyncContext(revision, result, e.config, e.config, e.kubectl, namespace, e.cache.GetOpenAPISchema(), opts...)
if err != nil {
return nil, fmt.Errorf("failed to create sync context: %w", err)
44 changes: 44 additions & 0 deletions pkg/sync/sync_context.go
@@ -218,6 +218,14 @@ func WithClientSideApplyMigration(enabled bool, manager string) SyncOpt {
}
}

// WithCacheInvalidationCallback sets a callback that will be invoked after successful sync operations
// to invalidate the cache
func WithCacheInvalidationCallback(callback func([]kubeutil.ResourceKey)) SyncOpt {
return func(ctx *syncContext) {
ctx.cacheInvalidationCallback = callback
}
}

// NewSyncContext creates new instance of a SyncContext
func NewSyncContext(
revision string,
@@ -389,6 +397,10 @@ type syncContext struct {
applyOutOfSyncOnly bool
// stores whether the resource is modified or not
modificationResult map[kubeutil.ResourceKey]bool

// cacheInvalidationCallback is a callback that will be invoked after successful sync operations
// to invalidate the cache
cacheInvalidationCallback func([]kubeutil.ResourceKey)
}

func (sc *syncContext) setRunningPhase(tasks []*syncTask, isPendingDeletion bool) {
@@ -557,6 +569,15 @@ func (sc *syncContext) Sync() {
// delete all completed hooks which have appropriate delete policy
sc.deleteHooks(hooksPendingDeletionSuccessful)
sc.setOperationPhase(common.OperationSucceeded, "successfully synced (no more tasks)")

// Invalidate cache after successful sync
if sc.cacheInvalidationCallback != nil {
modifiedResources := make([]kubeutil.ResourceKey, 0, len(sc.syncRes))
for _, result := range sc.syncRes {
modifiedResources = append(modifiedResources, result.ResourceKey)
}
sc.cacheInvalidationCallback(modifiedResources)
}
Comment on lines +573 to +580
Contributor:
Same comment applies:

Why would we need to invalidate the cache on every sync if the cluster cache is based on resource watches? I feel that the root problem is elsewhere. WDYT?

return
}

@@ -593,11 +614,34 @@ func (sc *syncContext) Sync() {
syncFailedTasks, _ := tasks.Split(func(t *syncTask) bool { return t.syncStatus == common.ResultCodeSyncFailed })
sc.deleteHooks(hooksPendingDeletionFailed)
sc.setOperationFailed(syncFailTasks, syncFailedTasks, "one or more objects failed to apply")

// Invalidate cache for successfully synced resources even if overall operation failed
if sc.cacheInvalidationCallback != nil {
var modifiedResources []kubeutil.ResourceKey
for _, result := range sc.syncRes {
// Only invalidate resources that were successfully synced
if result.Status == common.ResultCodeSynced {
modifiedResources = append(modifiedResources, result.ResourceKey)
}
}
if len(modifiedResources) > 0 {
sc.cacheInvalidationCallback(modifiedResources)
}
}
case successful:
if remainingTasks.Len() == 0 {
// delete all completed hooks which have appropriate delete policy
sc.deleteHooks(hooksPendingDeletionSuccessful)
sc.setOperationPhase(common.OperationSucceeded, "successfully synced (all tasks run)")

// Invalidate cache after successful sync
if sc.cacheInvalidationCallback != nil {
modifiedResources := make([]kubeutil.ResourceKey, 0, len(sc.syncRes))
for _, result := range sc.syncRes {
modifiedResources = append(modifiedResources, result.ResourceKey)
}
sc.cacheInvalidationCallback(modifiedResources)
}
} else {
sc.setRunningPhase(remainingTasks, false)
}