Skip to content

Commit

Permalink
Simplify store interface
Browse files Browse the repository at this point in the history
Use a sinf `Set` method instead of `Add` and `Update`.

Signed-off-by: Soule BA <[email protected]>
  • Loading branch information
souleb committed Jun 10, 2024
1 parent 82564ce commit 6b7b355
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 107 deletions.
39 changes: 2 additions & 37 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,44 +129,9 @@ func (c *Cache[T]) Close() error {
return nil
}

// Add an item to the cache, existing index will not be overwritten.
// To overwrite existing index, use Update.
// Set an item in the cache, existing index will be overwritten.
// If the cache is full, Add will return an error.
func (c *Cache[T]) Add(object T) error {
key, err := c.keyFunc(object)
if err != nil {
recordRequest(c.metrics, StatusFailure)
return KeyError{object, err}
}

c.mu.Lock()
if c.closed {
c.mu.Unlock()
recordRequest(c.metrics, StatusFailure)
return KeyError{object, ErrClosed}
}
_, found := c.index[key]
if found {
c.mu.Unlock()
recordRequest(c.metrics, StatusFailure)
return KeyError{object, ErrAlreadyExists}
}

if c.capacity > 0 && len(c.index) < c.capacity {
c.set(key, object)
c.mu.Unlock()
recordRequest(c.metrics, StatusSuccess)
recordItemIncrement(c.metrics)
return nil
}
c.mu.Unlock()
recordRequest(c.metrics, StatusFailure)
return KeyError{object, ErrFull}
}

// Update adds an item to the cache, replacing any existing item.
// If the cache is full, it will return an error.
func (c *Cache[T]) Update(object T) error {
func (c *Cache[T]) Set(object T) error {
key, err := c.keyFunc(object)
if err != nil {
recordRequest(c.metrics, StatusFailure)
Expand Down
38 changes: 18 additions & 20 deletions cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestCache(t *testing.T) {
g.Expect(found).To(BeFalse())

// Add an item to the cache
err = cache.Add(obj)
err = cache.Set(obj)
g.Expect(err).ToNot(HaveOccurred())

// Get the item from the cache
Expand All @@ -77,7 +77,7 @@ func TestCache(t *testing.T) {
},
}
// Add another item to the cache
err = cache.Add(obj2)
err = cache.Set(obj2)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(cache.ListKeys()).To(ConsistOf("test-ns/test", "test-ns/test2"))

Expand All @@ -98,12 +98,12 @@ func TestCache(t *testing.T) {
Name: "test3",
},
}
err = cache.Add(obj3)
err = cache.Set(obj3)
g.Expect(err).ToNot(HaveOccurred())

// Replace an item in the cache
obj3.Labels = map[string]string{"pp.kubernetes.io/created-by: ": "flux"}
err = cache.Update(obj3)
err = cache.Set(obj3)
g.Expect(err).ToNot(HaveOccurred())

// Get the item from the cache
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestCache(t *testing.T) {
},
}

err = cache.Add(obj)
err = cache.Set(obj)
g.Expect(err).ToNot(HaveOccurred())

// set expiration time to 2 seconds
Expand Down Expand Up @@ -190,16 +190,15 @@ func Test_Cache_Add(t *testing.T) {
},
Object: "test-token",
}
err = cache.Add(obj)
err = cache.Set(obj)
g.Expect(err).ToNot(HaveOccurred())
err = cache.SetExpiration(obj, 10*time.Millisecond)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(cache.ListKeys()).To(ConsistOf("test-ns_test_test-group_TestObject"))

// try adding the same object again
err = cache.Add(obj)
g.Expect(err).To(HaveOccurred())
g.Expect(err.Error()).To(ContainSubstring("already exists"))
// try adding the same object again, it should overwrite the existing one
err = cache.Set(obj)
g.Expect(err).ToNot(HaveOccurred())

// wait for the item to expire
time.Sleep(20 * time.Millisecond)
Expand All @@ -209,7 +208,7 @@ func Test_Cache_Add(t *testing.T) {

// add another object
obj.Name = "test2"
err = cache.Add(obj)
err = cache.Set(obj)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(cache.ListKeys()).To(ConsistOf("test-ns_test2_test-group_TestObject"))

Expand All @@ -220,8 +219,7 @@ func Test_Cache_Add(t *testing.T) {
gotk_cache_evictions_total 1
# HELP gotk_cache_requests_total Total number of cache requests partioned by success or failure.
# TYPE gotk_cache_requests_total counter
gotk_cache_requests_total{status="failure"} 1
gotk_cache_requests_total{status="success"} 6
gotk_cache_requests_total{status="success"} 7
# HELP gotk_cached_items Total number of items in the cache.
# TYPE gotk_cached_items gauge
gotk_cached_items 1
Expand All @@ -247,12 +245,12 @@ func Test_Cache_Update(t *testing.T) {
},
Object: "test-token",
}
err = cache.Add(obj)
err = cache.Set(obj)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(cache.ListKeys()).To(ConsistOf("test-ns_test_test-group_TestObject"))

obj.Object = "test-token2"
err = cache.Update(obj)
err = cache.Set(obj)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(cache.ListKeys()).To(ConsistOf("test-ns_test_test-group_TestObject"))
g.Expect(cache.index["test-ns_test_test-group_TestObject"].object.Object).To(Equal("test-token2"))
Expand Down Expand Up @@ -296,7 +294,7 @@ func Test_Cache_Get(t *testing.T) {
g.Expect(err).ToNot(HaveOccurred())
g.Expect(found).To(BeFalse())

err = cache.Add(obj)
err = cache.Set(obj)
g.Expect(err).ToNot(HaveOccurred())

item, found, err := cache.Get(obj)
Expand Down Expand Up @@ -342,7 +340,7 @@ func Test_Cache_Delete(t *testing.T) {
Object: "test-token",
}

err = cache.Add(obj)
err = cache.Set(obj)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(cache.ListKeys()).To(ConsistOf("test-ns_test_test-group_TestObject"))

Expand Down Expand Up @@ -459,7 +457,7 @@ func Test_Cache_deleteExpired(t *testing.T) {
g.Expect(err).ToNot(HaveOccurred())

for _, item := range tt.items {
err := cache.Add(item.object)
err := cache.Set(item.object)
g.Expect(err).ToNot(HaveOccurred())
if item.expire {
err = cache.SetExpiration(item.object, item.expiration)
Expand Down Expand Up @@ -495,7 +493,7 @@ func Test_Cache_Resize(t *testing.T) {
},
Object: "test-token",
}
err = cache.Add(obj)
err = cache.Set(obj)
g.Expect(err).ToNot(HaveOccurred())
err = cache.SetExpiration(obj, 10*time.Minute)
g.Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -529,7 +527,7 @@ func TestCache_Concurrent(t *testing.T) {
wg.Add(2)
go func() {
defer wg.Done()
_ = cache.Add(objmap[key])
_ = cache.Set(objmap[key])
}()
go func() {
defer wg.Done()
Expand Down
40 changes: 9 additions & 31 deletions cache/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,25 +85,25 @@ func NewLRU[T any](capacity int, keyFunc KeyFunc[T], opts ...Options[T]) (*LRU[T
return lru, nil
}

// Add adds a node to the end of the list
// if the node is already in the cache, an error is returned
func (c *LRU[T]) Add(object T) error {
// Set an item in the cache, existing index will be overwritten.
func (c *LRU[T]) Set(object T) error {
key, err := c.keyFunc(object)
if err != nil {
recordRequest(c.metrics, StatusFailure)
return KeyError{object, err}
}

// if node is already in cache, return error
c.mu.RLock()
_, ok := c.cache[key]
c.mu.RUnlock()
c.mu.Lock()
node, ok := c.cache[key]
if ok {
recordRequest(c.metrics, StatusFailure)
return KeyError{object, ErrAlreadyExists}
c.delete(node)
_ = c.add(&Node[T]{key: key, object: object})
c.mu.Unlock()
recordRequest(c.metrics, StatusSuccess)
return nil
}

c.mu.Lock()
evicted := c.add(&Node[T]{key: key, object: object})
c.mu.Unlock()
recordRequest(c.metrics, StatusSuccess)
Expand Down Expand Up @@ -228,28 +228,6 @@ func (c *LRU[T]) get(key string) (item T, exists bool, err error) {
return node.object, true, nil
}

// Update updates an object in the cache.
// If the object is not in the cache, it is added.
func (c *LRU[T]) Update(object T) error {
key, err := c.keyFunc(object)
if err != nil {
recordRequest(c.metrics, StatusFailure)
return KeyError{object, err}
}

c.mu.Lock()
node, ok := c.cache[key]
if ok {
c.delete(node)
_ = c.add(&Node[T]{key: key, object: object})
c.mu.Unlock()
recordRequest(c.metrics, StatusSuccess)
return nil
}
c.mu.Unlock()
return c.Add(object)
}

// ListKeys returns a list of keys in the cache.
func (c *LRU[T]) ListKeys() []string {
keys := make([]string, 0, len(c.cache))
Expand Down
28 changes: 13 additions & 15 deletions cache/lru_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func Test_LRU(t *testing.T) {
WithMetricsRegisterer[any](prometheus.NewPedanticRegistry()))
g.Expect(err).ToNot(HaveOccurred())
for _, input := range v.inputs {
err := cache.Add(input)
err := cache.Set(input)
g.Expect(err).ToNot(HaveOccurred())
}

Expand Down Expand Up @@ -256,18 +256,17 @@ func Test_LRU_Add(t *testing.T) {
},
Object: "test-token",
}
err = cache.Add(obj)
err = cache.Set(obj)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(cache.ListKeys()).To(ConsistOf("test-ns_test_test-group_TestObject"))

// try adding the same object again
err = cache.Add(obj)
g.Expect(err).To(HaveOccurred())
g.Expect(err.Error()).To(ContainSubstring("already exists"))
// try adding the same object again, it should overwrite the existing one
err = cache.Set(obj)
g.Expect(err).ToNot(HaveOccurred())

// add another object
obj.Name = "test2"
err = cache.Add(obj)
err = cache.Set(obj)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(cache.ListKeys()).To(ConsistOf("test-ns_test2_test-group_TestObject"))

Expand All @@ -278,8 +277,7 @@ func Test_LRU_Add(t *testing.T) {
gotk_cache_evictions_total 1
# HELP gotk_cache_requests_total Total number of cache requests partioned by success or failure.
# TYPE gotk_cache_requests_total counter
gotk_cache_requests_total{status="failure"}1
gotk_cache_requests_total{status="success"} 4
gotk_cache_requests_total{status="success"} 5
# HELP gotk_cached_items Total number of items in the cache.
# TYPE gotk_cached_items gauge
gotk_cached_items 1
Expand All @@ -305,12 +303,12 @@ func Test_LRU_Update(t *testing.T) {
},
Object: "test-token",
}
err = cache.Add(obj)
err = cache.Set(obj)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(cache.ListKeys()).To(ConsistOf("test-ns_test_test-group_TestObject"))

obj.Object = "test-token2"
err = cache.Update(obj)
err = cache.Set(obj)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(cache.ListKeys()).To(ConsistOf("test-ns_test_test-group_TestObject"))
g.Expect(cache.cache["test-ns_test_test-group_TestObject"].object.Object).To(Equal("test-token2"))
Expand Down Expand Up @@ -354,7 +352,7 @@ func Test_LRU_Get(t *testing.T) {
g.Expect(err).ToNot(HaveOccurred())
g.Expect(found).To(BeFalse())

err = cache.Add(obj)
err = cache.Set(obj)
g.Expect(err).ToNot(HaveOccurred())

item, found, err := cache.Get(obj)
Expand Down Expand Up @@ -399,7 +397,7 @@ func Test_LRU_Delete(t *testing.T) {
Object: "test-token",
}

err = cache.Add(obj)
err = cache.Set(obj)
g.Expect(err).ToNot(HaveOccurred())

err = cache.Delete(obj)
Expand Down Expand Up @@ -439,7 +437,7 @@ func Test_LRU_Resize(t *testing.T) {
},
Object: "test-token",
}
err = cache.Add(obj)
err = cache.Set(obj)
g.Expect(err).ToNot(HaveOccurred())
}

Expand Down Expand Up @@ -470,7 +468,7 @@ func TestLRU_Concurrent(t *testing.T) {
wg.Add(2)
go func() {
defer wg.Done()
_ = cache.Add(objmap[key])
_ = cache.Set(objmap[key])
}()
go func() {
defer wg.Done()
Expand Down
7 changes: 3 additions & 4 deletions cache/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@ import (
// It is a generic version of the Kubernetes client-go cache.Store interface.
// See https://pkg.go.dev/k8s.io/client-go/tools/cache#Store
type Store[T any] interface {
// Add adds an object to the store.
Add(object T) error
// Update updates an object in the store.
Update(object T) error
// Set adds an object to the store.
// It will overwrite the item if it already exists.
Set(object T) error
// Delete deletes an object from the store.
Delete(object T) error
// ListKeys returns a list of keys in the store.
Expand Down

0 comments on commit 6b7b355

Please sign in to comment.