From e493ae81a789b04e32e00661e1d7dcf888a6bb5f Mon Sep 17 00:00:00 2001
From: alanprot
Date: Wed, 20 Nov 2024 21:43:19 -0800
Subject: [PATCH 1/2] Changing Postings Cache from Fifo to LRU

Signed-off-by: alanprot
---
 pkg/storage/tsdb/expanded_postings_cache.go  | 43 ++++++----
 .../tsdb/expanded_postings_cache_test.go     | 79 +++++++++++++++++--
 2 files changed, 100 insertions(+), 22 deletions(-)

diff --git a/pkg/storage/tsdb/expanded_postings_cache.go b/pkg/storage/tsdb/expanded_postings_cache.go
index 59af6d879f9..4ca2ec6787c 100644
--- a/pkg/storage/tsdb/expanded_postings_cache.go
+++ b/pkg/storage/tsdb/expanded_postings_cache.go
@@ -97,8 +97,8 @@ type ExpandedPostingsCache interface {
 type BlocksPostingsForMatchersCache struct {
     strippedLock []sync.RWMutex
 
-    headCache   *fifoCache[[]storage.SeriesRef]
-    blocksCache *fifoCache[[]storage.SeriesRef]
+    headCache   *lruCache[[]storage.SeriesRef]
+    blocksCache *lruCache[[]storage.SeriesRef]
 
     headSeedByMetricName    []int
     postingsForMatchersFunc func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error)
@@ -117,8 +117,8 @@ func NewBlocksPostingsForMatchersCache(cfg TSDBPostingsCacheConfig, metrics *Exp
     }
 
     return &BlocksPostingsForMatchersCache{
-        headCache:   newFifoCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow),
-        blocksCache: newFifoCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow),
+        headCache:   newLruCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow),
+        blocksCache: newLruCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow),
 
         headSeedByMetricName:    make([]int, seedArraySize),
         strippedLock:            make([]sync.RWMutex, numOfSeedsStripes),
         postingsForMatchersFunc: cfg.PostingsForMatchers,
@@ -272,7 +272,7 @@ func metricNameFromMatcher(ms []*labels.Matcher) (string, bool) {
     return "", false
 }
 
-type fifoCache[V any] struct {
+type lruCache[V any] struct {
     cfg          PostingsCacheConfig
     cachedValues *sync.Map
     timeNow      func() time.Time
@@ -285,8 +285,8 @@ type fifoCache[V any] struct {
     cachedBytes int64
 }
 
-func newFifoCache[V any](cfg PostingsCacheConfig, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) *fifoCache[V] {
-    return &fifoCache[V]{
+func newLruCache[V any](cfg PostingsCacheConfig, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) *lruCache[V] {
+    return &lruCache[V]{
         cachedValues: new(sync.Map),
         cached:       list.New(),
         cfg:          cfg,
@@ -296,7 +296,7 @@ func newFifoCache[V any](cfg PostingsCacheConfig, name string, metrics *Expanded
     }
 }
 
-func (c *fifoCache[V]) expire() {
+func (c *lruCache[V]) expire() {
     if c.cfg.Ttl <= 0 {
         return
     }
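Reviewer note (not part of the patch): the FIFO-to-LRU switch stays cheap because every list operation the cache needs is O(1) in Go's container/list. The standalone snippet below, with made-up keys, shows the three primitives the hunks around here combine: PushBack when an entry is created, MoveToBack on a cache hit, and Front/Remove when evicting.

package main

import (
    "container/list"
    "fmt"
)

func main() {
    l := list.New()
    a := l.PushBack("key-a") // new entries are appended to the back
    l.PushBack("key-b")

    l.MoveToBack(a) // a hit on "key-a" makes it the most recently used again

    fmt.Println(l.Front().Value) // "key-b" is now the oldest entry, i.e. the eviction candidate
}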
@@ -314,7 +314,7 @@ func (c *fifoCache[V]) expire() {
     }
 }
 
-func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) {
+func (c *lruCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) {
     r := &cacheEntryPromise[V]{
         done: make(chan struct{}),
     }
@@ -331,13 +331,14 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)
         r.v, r.sizeBytes, r.err = fetch()
         r.sizeBytes += int64(len(k))
         r.ts = c.timeNow()
-        c.created(k, r.sizeBytes)
+        r.lElement = c.created(k, r.sizeBytes)
         c.expire()
     }
 
     if ok {
         // If the promise is already in the cache, lets wait it to fetch the data.
         <-loaded.(*cacheEntryPromise[V]).done
+        c.moveBack(loaded.(*cacheEntryPromise[V]).lElement)
 
         // If is cached but is expired, lets try to replace the cache value.
         if loaded.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()) && c.cachedValues.CompareAndSwap(k, loaded, r) {
@@ -354,12 +355,12 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)
     return loaded.(*cacheEntryPromise[V]), ok
 }
 
-func (c *fifoCache[V]) contains(k string) bool {
+func (c *lruCache[V]) contains(k string) bool {
     _, ok := c.cachedValues.Load(k)
     return ok
 }
 
-func (c *fifoCache[V]) shouldEvictHead() (string, bool) {
+func (c *lruCache[V]) shouldEvictHead() (string, bool) {
     h := c.cached.Front()
     if h == nil {
         return "", false
@@ -380,7 +381,7 @@ func (c *fifoCache[V]) shouldEvictHead() (string, bool) {
     return "", false
 }
 
-func (c *fifoCache[V]) evictHead() {
+func (c *lruCache[V]) evictHead() {
     front := c.cached.Front()
     c.cached.Remove(front)
     oldestKey := front.Value.(string)
@@ -389,18 +390,24 @@ func (c *fifoCache[V]) evictHead() {
     }
 }
 
-func (c *fifoCache[V]) created(key string, sizeBytes int64) {
+func (c *lruCache[V]) created(key string, sizeBytes int64) *list.Element {
     if c.cfg.Ttl <= 0 {
         c.cachedValues.Delete(key)
-        return
+        return nil
     }
     c.cachedMtx.Lock()
     defer c.cachedMtx.Unlock()
-    c.cached.PushBack(key)
     c.cachedBytes += sizeBytes
+    return c.cached.PushBack(key)
+}
+
+func (c *lruCache[V]) moveBack(ele *list.Element) {
+    c.cachedMtx.Lock()
+    defer c.cachedMtx.Unlock()
+    c.cached.MoveToBack(ele)
 }
 
-func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) {
+func (c *lruCache[V]) updateSize(oldSize, newSizeBytes int64) {
     if oldSize == newSizeBytes {
         return
     }
@@ -417,6 +424,8 @@ type cacheEntryPromise[V any] struct {
     done chan struct{}
     v    V
     err  error
+
+    lElement *list.Element
 }
 
 func (ce *cacheEntryPromise[V]) isExpired(ttl time.Duration, now time.Time) bool {
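Before the test changes, a compact illustration of the policy the cache now follows. This is a simplified, single-threaded sketch and not code from the patch; the names lru, entry, newLRU and add are made up, and the real cache layers TTL expiry, promises, metrics and a mutex on top of the same two structures: a map for lookups plus a container/list kept in least-recently-used order (front = oldest, back = newest).

package main

import (
    "container/list"
    "fmt"
)

type entry struct {
    key  string
    size int64
}

type lru struct {
    maxBytes  int64
    usedBytes int64
    order     *list.List               // front = least recently used, back = most recently used
    items     map[string]*list.Element // key -> list element whose Value is *entry
}

func newLRU(maxBytes int64) *lru {
    return &lru{maxBytes: maxBytes, order: list.New(), items: map[string]*list.Element{}}
}

// add inserts a key or, if it is already cached, just refreshes its recency.
func (c *lru) add(key string, size int64) {
    if e, ok := c.items[key]; ok {
        c.order.MoveToBack(e) // a hit only moves the element; nothing is copied or re-inserted
        return
    }
    c.items[key] = c.order.PushBack(&entry{key: key, size: size})
    c.usedBytes += size
    // Evict from the front (the least recently used end) until back under the byte budget.
    for c.usedBytes > c.maxBytes && c.order.Len() > 0 {
        front := c.order.Front()
        c.order.Remove(front)
        ev := front.Value.(*entry)
        delete(c.items, ev.key)
        c.usedBytes -= ev.size
    }
}

func main() {
    c := newLRU(3)
    c.add("a", 1)
    c.add("b", 1)
    c.add("a", 1) // hit: "a" becomes the most recently used entry
    c.add("c", 1)
    c.add("d", 1)                                   // over budget: "b" (not "a") is evicted first
    fmt.Println(c.order.Front().Value.(*entry).key) // prints "a"
}

With a plain FIFO, "a" would have been evicted before "b" regardless of the later hit; the only behavioural difference the patch introduces is the move-to-back on hit.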
diff --git a/pkg/storage/tsdb/expanded_postings_cache_test.go b/pkg/storage/tsdb/expanded_postings_cache_test.go
index db821736a32..0480b1e3c2c 100644
--- a/pkg/storage/tsdb/expanded_postings_cache_test.go
+++ b/pkg/storage/tsdb/expanded_postings_cache_test.go
@@ -3,6 +3,7 @@ package tsdb
 import (
     "bytes"
     "fmt"
+    "math/rand"
     "strings"
     "sync"
     "testing"
@@ -21,7 +22,7 @@ func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) {
         MaxBytes: 10 << 20,
     }
     m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry())
-    cache := newFifoCache[int](cfg, "test", m, time.Now)
+    cache := newLruCache[int](cfg, "test", m, time.Now)
     calls := atomic.Int64{}
     concurrency := 100
     wg := sync.WaitGroup{}
@@ -45,12 +46,12 @@ func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) {
 
 }
 
-func TestFifoCacheDisabled(t *testing.T) {
+func TestCacheDisabled(t *testing.T) {
     cfg := PostingsCacheConfig{}
     cfg.Enabled = false
     m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry())
     timeNow := time.Now
-    cache := newFifoCache[int](cfg, "test", m, timeNow)
+    cache := newLruCache[int](cfg, "test", m, timeNow)
     old, loaded := cache.getPromiseForKey("key1", func() (int, int64, error) { return 1, 0, nil })
@@ -59,8 +60,66 @@ func TestFifoCacheDisabled(t *testing.T) {
     require.False(t, cache.contains("key1"))
 }
 
-func TestFifoCacheExpire(t *testing.T) {
+func TestLru(t *testing.T) {
+    maxNumberOfCachedItems := 5
+    keySize := 20
+
+    cfg := PostingsCacheConfig{
+        Enabled:  true,
+        MaxBytes: int64(maxNumberOfCachedItems + keySize*maxNumberOfCachedItems), // for this test each element has a size of 1, so the cache holds 'maxNumberOfCachedItems' elements
+        Ttl:      time.Hour,
+    }
+    r := prometheus.NewPedanticRegistry()
+    m := NewPostingCacheMetrics(r)
+    cache := newLruCache[int](cfg, "test", m, time.Now)
+
+    for i := 0; i < maxNumberOfCachedItems; i++ {
+        key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize)
+        _, hit := cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 1, nil })
+        require.False(t, hit)
+        require.Equal(t, key, cache.cached.Back().Value)
+        assertCacheItemsCount(t, cache, i+1)
+    }
+
+    for i := 0; i < maxNumberOfCachedItems; i++ {
+        key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize)
+        _, hit := cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 1, nil })
+        require.True(t, hit)
+        require.Equal(t, key, cache.cached.Back().Value)
+        assertCacheItemsCount(t, cache, maxNumberOfCachedItems)
+    }
+
+    // Let's now hit 2 random keys and make sure they will be the last to be evicted
+    recentUsedKeys := make(map[string]struct{})
+    for i := 0; i < 2; i++ {
+        key := RepeatStringIfNeeded(fmt.Sprintf("key%d", rand.Int()%maxNumberOfCachedItems), keySize)
+        _, hit := cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 1, nil })
+        require.True(t, hit)
+        assertCacheItemsCount(t, cache, maxNumberOfCachedItems)
+        recentUsedKeys[key] = struct{}{}
+    }
+
+    // Create new keys and make sure the recentUsedKeys are still in the cache
+    for i := 0; i < maxNumberOfCachedItems-2; i++ {
+        key := RepeatStringIfNeeded(fmt.Sprintf("key_new%d", i), keySize)
+        _, hit := cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 1, nil })
+        require.False(t, hit)
+        require.Equal(t, maxNumberOfCachedItems, cache.cached.Len())
+    }
+
+    for i := 0; i < maxNumberOfCachedItems; i++ {
+        key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize)
+        if _, ok := recentUsedKeys[key]; ok {
+            _, hit := cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 1, nil })
+            require.True(t, hit)
+        } else {
+            require.False(t, cache.contains(key))
+        }
+    }
+}
+
+func TestCacheExpire(t *testing.T) {
     keySize := 20
     numberOfKeys := 100
@@ -93,7 +152,7 @@ func TestFifoCacheExpire(t *testing.T) {
     r := prometheus.NewPedanticRegistry()
     m := NewPostingCacheMetrics(r)
     timeNow := time.Now
-    cache := newFifoCache[int](c.cfg, "test", m, timeNow)
+    cache := newLruCache[int](c.cfg, "test", m, timeNow)
 
     for i := 0; i < numberOfKeys; i++ {
         key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize)
@@ -183,3 +242,13 @@ func RepeatStringIfNeeded(seed string, length int) string {
 
     return strings.Repeat(seed, 1+length/len(seed))[:max(length, len(seed))]
 }
+
+func assertCacheItemsCount[T any](t *testing.T, cache *lruCache[T], size int) {
+    require.Equal(t, size, cache.cached.Len())
+    count := 0
+    cache.cachedValues.Range(func(k, v any) bool {
+        count++
+        return true
+    })
+    require.Equal(t, size, count)
+}
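Test_ShouldFetchPromiseOnlyOnce above exercises a de-duplication pattern that is easy to miss when reading the diff: every caller builds a promise, but only the goroutine that wins the sync.Map.LoadOrStore race runs the fetch, and every other caller waits on the winner's done channel. The standalone sketch below shows just that mechanism; the names promise and getOrFetch are made up, and the TTL check, size accounting and eviction done by the real getPromiseForKey are deliberately left out.

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

type promise struct {
    done chan struct{}
    v    int
}

func getOrFetch(m *sync.Map, key string, fetch func() int) int {
    p := &promise{done: make(chan struct{})}
    actual, loaded := m.LoadOrStore(key, p)
    if !loaded {
        // This goroutine won the race: run the fetch exactly once and publish the result.
        p.v = fetch()
        close(p.done)
        return p.v
    }
    // Someone else is (or was) fetching this key: wait for their result instead of refetching.
    winner := actual.(*promise)
    <-winner.done
    return winner.v
}

func main() {
    var calls atomic.Int64
    m := &sync.Map{}
    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            getOrFetch(m, "key1", func() int { calls.Add(1); return 42 })
        }()
    }
    wg.Wait()
    fmt.Println(calls.Load()) // 1: a hundred concurrent callers, a single fetch
}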
From 602f72e4278de07e30bfffb7b73224f216767a75 Mon Sep 17 00:00:00 2001
From: alanprot
Date: Wed, 20 Nov 2024 21:48:38 -0800
Subject: [PATCH 2/2] rename lru list

Signed-off-by: alanprot
---
 pkg/storage/tsdb/expanded_postings_cache.go  | 20 +++++++++++--------
 .../tsdb/expanded_postings_cache_test.go     |  8 ++++----
 2 files changed, 16 insertions(+), 12 deletions(-)

diff --git a/pkg/storage/tsdb/expanded_postings_cache.go b/pkg/storage/tsdb/expanded_postings_cache.go
index 4ca2ec6787c..9de72ee5d7a 100644
--- a/pkg/storage/tsdb/expanded_postings_cache.go
+++ b/pkg/storage/tsdb/expanded_postings_cache.go
@@ -280,15 +280,17 @@ type lruCache[V any] struct {
     metrics ExpandedPostingsCacheMetrics
 
     // Fields from here should be locked
-    cachedMtx   sync.RWMutex
-    cached      *list.List
+    cachedMtx sync.RWMutex
+    // Keeps track of cached keys in least-recently-used order.
+    // The most recently used key is moved to the back of the list, while items are evicted from the front of the list.
+    lruList     *list.List
     cachedBytes int64
 }
 
 func newLruCache[V any](cfg PostingsCacheConfig, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) *lruCache[V] {
     return &lruCache[V]{
         cachedValues: new(sync.Map),
-        cached:       list.New(),
+        lruList:      list.New(),
         cfg:          cfg,
         timeNow:      timeNow,
         name:         name,
@@ -361,7 +363,7 @@ func (c *lruCache[V]) contains(k string) bool {
 }
 
 func (c *lruCache[V]) shouldEvictHead() (string, bool) {
-    h := c.cached.Front()
+    h := c.lruList.Front()
     if h == nil {
         return "", false
     }
@@ -382,8 +384,8 @@ func (c *lruCache[V]) shouldEvictHead() (string, bool) {
 }
 
 func (c *lruCache[V]) evictHead() {
-    front := c.cached.Front()
-    c.cached.Remove(front)
+    front := c.lruList.Front()
+    c.lruList.Remove(front)
     oldestKey := front.Value.(string)
     if oldest, loaded := c.cachedValues.LoadAndDelete(oldestKey); loaded {
         c.cachedBytes -= oldest.(*cacheEntryPromise[V]).sizeBytes
@@ -398,13 +400,13 @@ func (c *lruCache[V]) created(key string, sizeBytes int64) *list.Element {
     c.cachedMtx.Lock()
     defer c.cachedMtx.Unlock()
     c.cachedBytes += sizeBytes
-    return c.cached.PushBack(key)
+    return c.lruList.PushBack(key)
 }
 
 func (c *lruCache[V]) moveBack(ele *list.Element) {
     c.cachedMtx.Lock()
     defer c.cachedMtx.Unlock()
-    c.cached.MoveToBack(ele)
+    c.lruList.MoveToBack(ele)
 }
 
 func (c *lruCache[V]) updateSize(oldSize, newSizeBytes int64) {
@@ -425,6 +427,8 @@ type cacheEntryPromise[V any] struct {
     v   V
     err error
 
+    // Reference to this entry's element in the LRU list.
+    // It is used to move the entry to the back of the list as a result of a cache hit.
     lElement *list.Element
 }
 
diff --git a/pkg/storage/tsdb/expanded_postings_cache_test.go b/pkg/storage/tsdb/expanded_postings_cache_test.go
index 0480b1e3c2c..a614bafcaf9 100644
--- a/pkg/storage/tsdb/expanded_postings_cache_test.go
+++ b/pkg/storage/tsdb/expanded_postings_cache_test.go
@@ -77,7 +77,7 @@ func TestLru(t *testing.T) {
         key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize)
         _, hit := cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 1, nil })
         require.False(t, hit)
-        require.Equal(t, key, cache.cached.Back().Value)
+        require.Equal(t, key, cache.lruList.Back().Value)
         assertCacheItemsCount(t, cache, i+1)
     }
 
@@ -85,7 +85,7 @@ func TestLru(t *testing.T) {
         key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize)
         _, hit := cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 1, nil })
         require.True(t, hit)
-        require.Equal(t, key, cache.cached.Back().Value)
+        require.Equal(t, key, cache.lruList.Back().Value)
         assertCacheItemsCount(t, cache, maxNumberOfCachedItems)
     }
 
@@ -104,7 +104,7 @@ func TestLru(t *testing.T) {
         key := RepeatStringIfNeeded(fmt.Sprintf("key_new%d", i), keySize)
         _, hit := cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 1, nil })
         require.False(t, hit)
-        require.Equal(t, maxNumberOfCachedItems, cache.cached.Len())
+        require.Equal(t, maxNumberOfCachedItems, cache.lruList.Len())
     }
 
     for i := 0; i < maxNumberOfCachedItems; i++ {
@@ -244,7 +244,7 @@ func RepeatStringIfNeeded(seed string, length int) string {
 }
 
 func assertCacheItemsCount[T any](t *testing.T, cache *lruCache[T], size int) {
-    require.Equal(t, size, cache.cached.Len())
+    require.Equal(t, size, cache.lruList.Len())
     count := 0
     cache.cachedValues.Range(func(k, v any) bool {
         count++