Skip to content

Commit 618f491

Browse files
committed
fix: resolve cache race with singleflight
Before this change, cache Get and Add were separate operations, creating a TOCTOU race: concurrent requests for the same resource could all miss the cache and each trigger a remote resolution. Merge Get/Add/GetFromCacheOrResolve into a single GetCachedOrResolveFromRemote method on resolverCache that wraps the resolve callback with golang.org/x/sync/singleflight. Only one in-flight resolution per cache key proceeds; all concurrent callers share its result. Other changes in this commit: - Use strings.Builder in generateCacheKey instead of string concatenation - Remove unused resolverType param from ShouldUse - Rework e2e tests to verify caching by counting actual registry GET requests (via logs and metrics) instead of checking resolver log messages - Add multi-replica resolver test (4 replicas, 200 TaskRuns) - Allow -run flag to bypass category filtering in TestMain Issue #9364 Signed-off-by: Stanislav Jakuschevskij <stas@two-giants.com>
1 parent abfa29d commit 618f491

13 files changed

Lines changed: 634 additions & 1060 deletions

File tree

pkg/remoteresolution/resolver/bundle/resolver.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,8 @@ func (r *Resolver) Resolve(ctx context.Context, req *v1beta1.ResolutionRequestSp
109109
return nil, errors.New("no params")
110110
}
111111

112-
if cache.ShouldUse(ctx, r, req.Params, LabelValueBundleResolverType) {
113-
return cache.GetFromCacheOrResolve(
114-
ctx,
112+
if cache.ShouldUse(ctx, r, req.Params) {
113+
return cache.Get(ctx).GetCachedOrResolveFromRemote(
115114
req.Params,
116115
LabelValueBundleResolverType,
117116
func() (resolutionframework.ResolvedResource, error) {

pkg/remoteresolution/resolver/cluster/resolver.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,8 @@ func (r *Resolver) IsImmutable([]v1.Param) bool {
8888

8989
// Resolve uses the given params to resolve the requested file or resource.
9090
func (r *Resolver) Resolve(ctx context.Context, req *v1beta1.ResolutionRequestSpec) (resolutionframework.ResolvedResource, error) {
91-
if cache.ShouldUse(ctx, r, req.Params, LabelValueClusterResolverType) {
92-
return cache.GetFromCacheOrResolve(
93-
ctx,
91+
if cache.ShouldUse(ctx, r, req.Params) {
92+
return cache.Get(ctx).GetCachedOrResolveFromRemote(
9493
req.Params,
9594
LabelValueClusterResolverType,
9695
func() (resolutionframework.ResolvedResource, error) {

pkg/remoteresolution/resolver/cluster/resolver_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -632,8 +632,11 @@ func TestResolveWithCacheHit(t *testing.T) {
632632
{Name: "cache", Value: *pipelinev1.NewStructuredValues("always")},
633633
}
634634

635-
// add the mock resource to the cache
636-
cache.Get(ctx).Add(cluster.LabelValueClusterResolverType, params, mockResource)
635+
// prepopulate the cache using GetCachedOrResolveFromRemote
636+
resolveFn := func() (framework.ResolvedResource, error) {
637+
return mockResource, nil
638+
}
639+
cache.Get(ctx).GetCachedOrResolveFromRemote(params, cluster.LabelValueClusterResolverType, resolveFn)
637640

638641
// create request with same parameters
639642
req := &v1beta1.ResolutionRequestSpec{Params: params}

pkg/remoteresolution/resolver/framework/cache/cache.go

Lines changed: 71 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -20,25 +20,31 @@ import (
2020
"context"
2121
"crypto/sha256"
2222
"encoding/hex"
23+
"errors"
2324
"sort"
25+
"strings"
2426
"time"
2527

2628
pipelinev1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
2729
resolutionframework "github.com/tektoncd/pipeline/pkg/resolution/resolver/framework"
2830
"go.uber.org/zap"
31+
"golang.org/x/sync/singleflight"
2932
utilcache "k8s.io/apimachinery/pkg/util/cache"
3033
)
3134

35+
type resolveFn = func() (resolutionframework.ResolvedResource, error)
36+
3237
var _ resolutionframework.ConfigWatcher = (*resolverCache)(nil)
3338

3439
// resolverCache is a wrapper around utilcache.LRUExpireCache that provides
3540
// type-safe methods for caching resolver results.
3641
type resolverCache struct {
37-
cache *utilcache.LRUExpireCache
38-
logger *zap.SugaredLogger
39-
ttl time.Duration
40-
maxSize int
41-
clock utilcache.Clock
42+
cache *utilcache.LRUExpireCache
43+
logger *zap.SugaredLogger
44+
ttl time.Duration
45+
maxSize int
46+
clock utilcache.Clock
47+
flightGroup *singleflight.Group
4248
}
4349

4450
func newResolverCache(maxSize int, ttl time.Duration) *resolverCache {
@@ -47,10 +53,11 @@ func newResolverCache(maxSize int, ttl time.Duration) *resolverCache {
4753

4854
func newResolverCacheWithClock(maxSize int, ttl time.Duration, clock utilcache.Clock) *resolverCache {
4955
return &resolverCache{
50-
cache: utilcache.NewLRUExpireCacheWithClock(maxSize, clock),
51-
ttl: ttl,
52-
maxSize: maxSize,
53-
clock: clock,
56+
cache: utilcache.NewLRUExpireCacheWithClock(maxSize, clock),
57+
ttl: ttl,
58+
maxSize: maxSize,
59+
clock: clock,
60+
flightGroup: &singleflight.Group{},
5461
}
5562
}
5663

@@ -62,7 +69,7 @@ func (c *resolverCache) GetConfigName(_ context.Context) string {
6269
// withLogger returns a new ResolverCache instance with the provided logger.
6370
// This prevents state leak by not storing logger in the global singleton.
6471
func (c *resolverCache) withLogger(logger *zap.SugaredLogger) *resolverCache {
65-
return &resolverCache{logger: logger, cache: c.cache, ttl: c.ttl, maxSize: c.maxSize, clock: c.clock}
72+
return &resolverCache{logger: logger, cache: c.cache, ttl: c.ttl, maxSize: c.maxSize, clock: c.clock, flightGroup: c.flightGroup}
6673
}
6774

6875
// TTL returns the time-to-live duration for cache entries.
@@ -75,57 +82,57 @@ func (c *resolverCache) MaxSize() int {
7582
return c.maxSize
7683
}
7784

78-
// Get retrieves a cached resource by resolver type and parameters, returning
79-
// the resource and whether it was found.
80-
func (c *resolverCache) Get(resolverType string, params []pipelinev1.Param) (resolutionframework.ResolvedResource, bool) {
85+
func (c *resolverCache) GetCachedOrResolveFromRemote(
86+
params []pipelinev1.Param,
87+
resolverType string,
88+
resolveFromRemote resolveFn,
89+
) (resolutionframework.ResolvedResource, error) {
8190
key := generateCacheKey(resolverType, params)
82-
value, found := c.cache.Get(key)
83-
if !found {
84-
c.infow("Cache miss", "key", key)
85-
return nil, found
86-
}
8791

88-
resource, ok := value.(resolutionframework.ResolvedResource)
89-
if !ok {
90-
c.infow("Failed casting cached resource", "key", key)
91-
return nil, false
92-
}
92+
if untyped, found := c.cache.Get(key); found {
93+
cached, ok := untyped.(resolutionframework.ResolvedResource)
94+
if !ok {
95+
c.infow("Failed casting cached resource", "key", key)
96+
return nil, errors.New("failed casting cached resource")
97+
}
9398

94-
c.infow("Cache hit", "key", key)
95-
timestamp := c.clock.Now().Format(time.RFC3339)
96-
return newAnnotatedResource(resource, resolverType, cacheOperationRetrieve, timestamp), true
97-
}
99+
c.infow("Cache hit", "key", key)
98100

99-
func (c *resolverCache) infow(msg string, keysAndValues ...any) {
100-
if c.logger != nil {
101-
c.logger.Infow(msg, keysAndValues...)
101+
return c.annotate(cached, resolverType, cacheOperationRetrieve), nil
102102
}
103-
}
104103

105-
// Add stores a resource in the cache with the configured TTL and returns an
106-
// annotated version of the resource.
107-
func (c *resolverCache) Add(
108-
resolverType string,
109-
params []pipelinev1.Param,
110-
resource resolutionframework.ResolvedResource,
111-
) resolutionframework.ResolvedResource {
112-
key := generateCacheKey(resolverType, params)
113-
c.infow("Adding to cache", "key", key, "expiration", c.ttl)
104+
// If cache miss, resolve from remote using singleflight
105+
untyped, err, _ := c.flightGroup.Do(key, func() (any, error) {
106+
resolved, err := resolveFromRemote()
107+
if err != nil {
108+
return nil, err
109+
}
114110

115-
timestamp := c.clock.Now().Format(time.RFC3339)
116-
annotatedResource := newAnnotatedResource(resource, resolverType, cacheOperationStore, timestamp)
111+
annotated := c.annotate(resolved, resolverType, cacheOperationStore)
117112

118-
c.cache.Add(key, annotatedResource, c.ttl)
113+
// Store annotated resource with store operation and return annotated resource
114+
// to indicate it was stored in cache
115+
c.infow("Adding to cache", "key", key, "expiration", c.ttl)
116+
c.cache.Add(key, annotated, c.ttl)
117+
return annotated, nil
118+
})
119+
if err != nil {
120+
return nil, err
121+
}
119122

120-
return annotatedResource
123+
return untyped.(resolutionframework.ResolvedResource), nil
121124
}
122125

123-
// Remove deletes a cached resource identified by resolver type and parameters.
124-
func (c *resolverCache) Remove(resolverType string, params []pipelinev1.Param) {
125-
key := generateCacheKey(resolverType, params)
126-
c.infow("Removing from cache", "key", key)
126+
func (c *resolverCache) annotate(resolvedResource resolutionframework.ResolvedResource, resolverType, operation string) *annotatedResource {
127+
timestamp := c.clock.Now().Format(time.RFC3339)
128+
result := newAnnotatedResource(resolvedResource, resolverType, operation, timestamp)
129+
return result
130+
}
127131

128-
c.cache.Remove(key)
132+
func (c *resolverCache) infow(msg string, keysAndValues ...any) {
133+
if c.logger != nil {
134+
c.logger.Infow(msg, keysAndValues...)
135+
}
129136
}
130137

131138
// Clear removes all entries from the cache.
@@ -137,7 +144,9 @@ func (c *resolverCache) Clear() {
137144

138145
func generateCacheKey(resolverType string, params []pipelinev1.Param) string {
139146
// Create a deterministic string representation of the parameters
140-
paramStr := resolverType + ":"
147+
var sb strings.Builder
148+
sb.WriteString(resolverType)
149+
sb.WriteByte(':')
141150

142151
// Filter out the 'cache' parameter and sort remaining params by name for determinism
143152
filteredParams := make([]pipelinev1.Param, 0, len(params))
@@ -153,21 +162,22 @@ func generateCacheKey(resolverType string, params []pipelinev1.Param) string {
153162
})
154163

155164
for _, p := range filteredParams {
156-
paramStr += p.Name + "="
165+
sb.WriteString(p.Name)
166+
sb.WriteByte('=')
157167

158168
switch p.Value.Type {
159169
case pipelinev1.ParamTypeString:
160-
paramStr += p.Value.StringVal
170+
sb.WriteString(p.Value.StringVal)
161171
case pipelinev1.ParamTypeArray:
162172
// Sort array values for determinism
163173
arrayVals := make([]string, len(p.Value.ArrayVal))
164174
copy(arrayVals, p.Value.ArrayVal)
165175
sort.Strings(arrayVals)
166176
for i, val := range arrayVals {
167177
if i > 0 {
168-
paramStr += ","
178+
sb.WriteByte(',')
169179
}
170-
paramStr += val
180+
sb.WriteString(val)
171181
}
172182
case pipelinev1.ParamTypeObject:
173183
// Sort object keys for determinism
@@ -178,18 +188,20 @@ func generateCacheKey(resolverType string, params []pipelinev1.Param) string {
178188
sort.Strings(keys)
179189
for i, key := range keys {
180190
if i > 0 {
181-
paramStr += ","
191+
sb.WriteByte(',')
182192
}
183-
paramStr += key + ":" + p.Value.ObjectVal[key]
193+
sb.WriteString(key)
194+
sb.WriteByte(':')
195+
sb.WriteString(p.Value.ObjectVal[key])
184196
}
185197
default:
186198
// For unknown types, use StringVal as fallback
187-
paramStr += p.Value.StringVal
199+
sb.WriteString(p.Value.StringVal)
188200
}
189-
paramStr += ";"
201+
sb.WriteByte(';')
190202
}
191203

192204
// Generate a SHA-256 hash of the parameter string
193-
hash := sha256.Sum256([]byte(paramStr))
205+
hash := sha256.Sum256([]byte(sb.String()))
194206
return hex.EncodeToString(hash[:])
195207
}

0 commit comments

Comments
 (0)