Skip to content

Commit d944582

Browse files
authored
Purge expired postings cache items due inactivity (#6502)
* Purge expired postings cache items due inactivity Signed-off-by: alanprot <[email protected]> * Fix comments Signed-off-by: alanprot <[email protected]> --------- Signed-off-by: alanprot <[email protected]>
1 parent f2361b2 commit d944582

File tree

4 files changed

+76
-3
lines changed

4 files changed

+76
-3
lines changed

pkg/ingester/ingester.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -878,6 +878,14 @@ func (i *Ingester) starting(ctx context.Context) error {
878878
servs = append(servs, closeIdleService)
879879
}
880880

881+
if i.expandedPostingsCacheFactory != nil {
882+
interval := i.cfg.BlocksStorageConfig.TSDB.ExpandedCachingExpireInterval
883+
if interval == 0 {
884+
interval = cortex_tsdb.ExpandedCachingExpireInterval
885+
}
886+
servs = append(servs, services.NewTimerService(interval, nil, i.expirePostingsCache, nil))
887+
}
888+
881889
var err error
882890
i.TSDBState.subservices, err = services.NewManager(servs...)
883891
if err == nil {
@@ -2794,6 +2802,18 @@ func (i *Ingester) closeAndDeleteIdleUserTSDBs(ctx context.Context) error {
27942802
return nil
27952803
}
27962804

2805+
func (i *Ingester) expirePostingsCache(ctx context.Context) error {
2806+
for _, userID := range i.getTSDBUsers() {
2807+
if ctx.Err() != nil {
2808+
return nil
2809+
}
2810+
userDB := i.getTSDB(userID)
2811+
userDB.postingCache.PurgeExpiredItems()
2812+
}
2813+
2814+
return nil
2815+
}
2816+
27972817
func (i *Ingester) closeAndDeleteUserTSDBIfIdle(userID string) tsdbCloseCheckResult {
27982818
userDB := i.getTSDB(userID)
27992819
if userDB == nil || userDB.shipper == nil {

pkg/ingester/ingester_test.go

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5525,6 +5525,7 @@ func TestExpendedPostingsCacheIsolation(t *testing.T) {
55255525

55265526
func TestExpendedPostingsCache(t *testing.T) {
55275527
cfg := defaultIngesterTestConfig(t)
5528+
cfg.BlocksStorageConfig.TSDB.ExpandedCachingExpireInterval = time.Second
55285529
cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour}
55295530

55305531
runQuery := func(t *testing.T, ctx context.Context, i *Ingester, matchers []*client.LabelMatcher) []client.TimeSeriesChunk {
@@ -5540,9 +5541,10 @@ func TestExpendedPostingsCache(t *testing.T) {
55405541
}
55415542

55425543
tc := map[string]struct {
5543-
cacheConfig cortex_tsdb.TSDBPostingsCacheConfig
5544-
expectedBlockPostingCall int
5545-
expectedHeadPostingCall int
5544+
cacheConfig cortex_tsdb.TSDBPostingsCacheConfig
5545+
expectedBlockPostingCall int
5546+
expectedHeadPostingCall int
5547+
shouldExpireDueInactivity bool
55465548
}{
55475549
"cacheDisabled": {
55485550
expectedBlockPostingCall: 0,
@@ -5594,6 +5596,23 @@ func TestExpendedPostingsCache(t *testing.T) {
55945596
},
55955597
},
55965598
},
5599+
"expire due inactivity": {
5600+
expectedBlockPostingCall: 1,
5601+
expectedHeadPostingCall: 1,
5602+
shouldExpireDueInactivity: true,
5603+
cacheConfig: cortex_tsdb.TSDBPostingsCacheConfig{
5604+
Blocks: cortex_tsdb.PostingsCacheConfig{
5605+
Ttl: time.Second,
5606+
MaxBytes: 1024 * 1024 * 1024,
5607+
Enabled: true,
5608+
},
5609+
Head: cortex_tsdb.PostingsCacheConfig{
5610+
Ttl: time.Second,
5611+
MaxBytes: 1024 * 1024 * 1024,
5612+
Enabled: true,
5613+
},
5614+
},
5615+
},
55975616
}
55985617

55995618
for name, c := range tc {
@@ -5790,6 +5809,17 @@ func TestExpendedPostingsCache(t *testing.T) {
57905809
require.Len(t, runQuery(t, ctx, i, []*client.LabelMatcher{{Type: client.EQUAL, Name: "extra", Value: "1"}}), 1)
57915810
// Return cached value from block and bypass head
57925811
require.Equal(t, int64(0), postingsForMatchersCalls.Load())
5812+
5813+
if c.shouldExpireDueInactivity {
5814+
test.Poll(t, c.cacheConfig.Blocks.Ttl+c.cacheConfig.Head.Ttl+cfg.BlocksStorageConfig.TSDB.ExpandedCachingExpireInterval, 0, func() interface{} {
5815+
size := 0
5816+
for _, userID := range i.getTSDBUsers() {
5817+
userDB := i.getTSDB(userID)
5818+
size += userDB.postingCache.Size()
5819+
}
5820+
return size
5821+
})
5822+
}
57935823
})
57945824
}
57955825
}

pkg/storage/tsdb/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ const (
3030
// How often are open TSDBs checked for being idle and closed.
3131
DefaultCloseIdleTSDBInterval = 5 * time.Minute
3232

33+
// How often expired items are cleaned from the PostingsCache
34+
ExpandedCachingExpireInterval = 5 * time.Minute
35+
3336
// How often to check for tenant deletion mark.
3437
DeletionMarkCheckInterval = 1 * time.Hour
3538

@@ -156,6 +159,9 @@ type TSDBConfig struct {
156159
// How often to check for idle TSDBs for closing. DefaultCloseIdleTSDBInterval is not suitable for testing, so tests can override.
157160
CloseIdleTSDBInterval time.Duration `yaml:"-"`
158161

162+
// How often expired items are cleaned from the PostingsCache. ExpandedCachingExpireInterval is not suitable for testing, so tests can override.
163+
ExpandedCachingExpireInterval time.Duration `yaml:"-"`
164+
159165
// Positive value enables experimental support for exemplars. 0 or less to disable.
160166
MaxExemplars int `yaml:"max_exemplars"`
161167

pkg/storage/tsdb/expanded_postings_cache.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ func (f *ExpandedPostingsCacheFactory) NewExpandedPostingsCache(userId string, m
124124
type ExpandedPostingsCache interface {
125125
PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error)
126126
ExpireSeries(metric labels.Labels)
127+
PurgeExpiredItems()
128+
Size() int
127129
}
128130

129131
type blocksPostingsForMatchersCache struct {
@@ -166,6 +168,15 @@ func (c *blocksPostingsForMatchersCache) ExpireSeries(metric labels.Labels) {
166168
c.seedByHash.incrementSeed(c.userId, metricName)
167169
}
168170

171+
func (c *blocksPostingsForMatchersCache) PurgeExpiredItems() {
172+
c.headCache.expire()
173+
c.blocksCache.expire()
174+
}
175+
176+
func (c *blocksPostingsForMatchersCache) Size() int {
177+
return c.headCache.size() + c.blocksCache.size()
178+
}
179+
169180
func (c *blocksPostingsForMatchersCache) PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) {
170181
return c.fetchPostings(blockID, ix, ms...)(ctx)
171182
}
@@ -365,6 +376,12 @@ func (c *fifoCache[V]) expire() {
365376
}
366377
}
367378

379+
func (c *fifoCache[V]) size() int {
380+
c.cachedMtx.RLock()
381+
defer c.cachedMtx.RUnlock()
382+
return c.cached.Len()
383+
}
384+
368385
func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) {
369386
r := &cacheEntryPromise[V]{
370387
done: make(chan struct{}),

0 commit comments

Comments
 (0)