Skip to content

Commit

Permalink
Mark posting group lazy if it has a lot of keys (#7961)
Browse files Browse the repository at this point in the history
* mark posting group lazy if it has a lot of add keys

Signed-off-by: Ben Ye <[email protected]>

* update docs

Signed-off-by: Ben Ye <[email protected]>

* rename labels

Signed-off-by: Ben Ye <[email protected]>

* changelog

Signed-off-by: Ben Ye <[email protected]>

* change to use max key series ratio

Signed-off-by: Ben Ye <[email protected]>

* update docs

Signed-off-by: Ben Ye <[email protected]>

* mention metrics

Signed-off-by: Ben Ye <[email protected]>

* update docs

Signed-off-by: Ben Ye <[email protected]>

---------

Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 authored Dec 10, 2024
1 parent d0d93db commit 51c7dcd
Show file tree
Hide file tree
Showing 7 changed files with 368 additions and 110 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Added

- [#7907](https://github.com/thanos-io/thanos/pull/7907) Receive: Add `--receive.grpc-service-config` flag to configure gRPC service config for the receivers.
- [#7961](https://github.com/thanos-io/thanos/pull/7961) Store Gateway: Add `--store.posting-group-max-keys` flag to mark posting group as lazy if it exceeds number of keys limit. Added `thanos_bucket_store_lazy_expanded_posting_groups_total` for total number of lazy posting groups and corresponding reasons.

### Changed

Expand Down
73 changes: 39 additions & 34 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,40 +67,41 @@ const (
)

type storeConfig struct {
indexCacheConfigs extflag.PathOrContent
objStoreConfig extflag.PathOrContent
dataDir string
cacheIndexHeader bool
grpcConfig grpcConfig
httpConfig httpConfig
indexCacheSizeBytes units.Base2Bytes
chunkPoolSize units.Base2Bytes
estimatedMaxSeriesSize uint64
estimatedMaxChunkSize uint64
seriesBatchSize int
storeRateLimits store.SeriesSelectLimits
maxDownloadedBytes units.Base2Bytes
maxConcurrency int
component component.StoreAPI
debugLogging bool
syncInterval time.Duration
blockListStrategy string
blockSyncConcurrency int
blockMetaFetchConcurrency int
filterConf *store.FilterConfig
selectorRelabelConf extflag.PathOrContent
advertiseCompatibilityLabel bool
consistencyDelay commonmodel.Duration
ignoreDeletionMarksDelay commonmodel.Duration
disableWeb bool
webConfig webConfig
label string
postingOffsetsInMemSampling int
cachingBucketConfig extflag.PathOrContent
reqLogConfig *extflag.PathOrContent
lazyIndexReaderEnabled bool
lazyIndexReaderIdleTimeout time.Duration
lazyExpandedPostingsEnabled bool
indexCacheConfigs extflag.PathOrContent
objStoreConfig extflag.PathOrContent
dataDir string
cacheIndexHeader bool
grpcConfig grpcConfig
httpConfig httpConfig
indexCacheSizeBytes units.Base2Bytes
chunkPoolSize units.Base2Bytes
estimatedMaxSeriesSize uint64
estimatedMaxChunkSize uint64
seriesBatchSize int
storeRateLimits store.SeriesSelectLimits
maxDownloadedBytes units.Base2Bytes
maxConcurrency int
component component.StoreAPI
debugLogging bool
syncInterval time.Duration
blockListStrategy string
blockSyncConcurrency int
blockMetaFetchConcurrency int
filterConf *store.FilterConfig
selectorRelabelConf extflag.PathOrContent
advertiseCompatibilityLabel bool
consistencyDelay commonmodel.Duration
ignoreDeletionMarksDelay commonmodel.Duration
disableWeb bool
webConfig webConfig
label string
postingOffsetsInMemSampling int
cachingBucketConfig extflag.PathOrContent
reqLogConfig *extflag.PathOrContent
lazyIndexReaderEnabled bool
lazyIndexReaderIdleTimeout time.Duration
lazyExpandedPostingsEnabled bool
postingGroupMaxKeySeriesRatio float64

indexHeaderLazyDownloadStrategy string
}
Expand Down Expand Up @@ -204,6 +205,9 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("store.enable-lazy-expanded-postings", "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings.").
Default("false").BoolVar(&sc.lazyExpandedPostingsEnabled)

cmd.Flag("store.posting-group-max-key-series-ratio", "Mark posting group as lazy if it fetches more keys than R * max series the query should fetch. With R set to 100, a posting group which fetches 100K keys will be marked as lazy if the current query only fetches 1000 series. thanos_bucket_store_lazy_expanded_posting_groups_total shows lazy expanded postings groups with reasons and you can tune this config accordingly. This config is only valid if lazy expanded posting is enabled. 0 disables the limit.").
Default("100").Float64Var(&sc.postingGroupMaxKeySeriesRatio)

cmd.Flag("store.index-header-lazy-download-strategy", "Strategy of how to download index headers lazily. Supported values: eager, lazy. If eager, always download index header during initial load. If lazy, download index header during query time.").
Default(string(indexheader.EagerDownloadStrategy)).
EnumVar(&sc.indexHeaderLazyDownloadStrategy, string(indexheader.EagerDownloadStrategy), string(indexheader.LazyDownloadStrategy))
Expand Down Expand Up @@ -429,6 +433,7 @@ func runStore(
return conf.estimatedMaxChunkSize
}),
store.WithLazyExpandedPostings(conf.lazyExpandedPostingsEnabled),
store.WithPostingGroupMaxKeySeriesRatio(conf.postingGroupMaxKeySeriesRatio),
store.WithIndexHeaderLazyDownloadStrategy(
indexheader.IndexHeaderLazyDownloadStrategy(conf.indexHeaderLazyDownloadStrategy).StrategyToDownloadFunc(),
),
Expand Down
12 changes: 12 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,18 @@ Flags:
The maximum series allowed for a single Series
request. The Series call fails if this limit is
exceeded. 0 means no limit.
--store.posting-group-max-key-series-ratio=100
Mark posting group as lazy if it fetches more
keys than R * max series the query should
fetch. With R set to 100, a posting group which
fetches 100K keys will be marked as lazy if
the current query only fetches 1000 series.
thanos_bucket_store_lazy_expanded_posting_groups_total
shows lazy expanded postings groups with
reasons and you can tune this config
accordingly. This config is only valid if lazy
expanded posting is enabled. 0 disables the
limit.
--sync-block-duration=15m Repeat interval for syncing the blocks between
local and remote view.
--tracing.config=<content>
Expand Down
61 changes: 49 additions & 12 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ type bucketStoreMetrics struct {
emptyPostingCount *prometheus.CounterVec

lazyExpandedPostingsCount prometheus.Counter
lazyExpandedPostingGroupsByReason *prometheus.CounterVec
lazyExpandedPostingSizeBytes prometheus.Counter
lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter

Expand Down Expand Up @@ -345,6 +346,11 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
Help: "Total number of times when lazy expanded posting optimization applies.",
})

m.lazyExpandedPostingGroupsByReason = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_bucket_store_lazy_expanded_posting_groups_total",
Help: "Total number of posting groups that are marked as lazy and corresponding reason",
}, []string{"reason"})

m.lazyExpandedPostingSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_bucket_store_lazy_expanded_posting_size_bytes_total",
Help: "Total number of lazy posting group size in bytes.",
Expand Down Expand Up @@ -419,7 +425,8 @@ type BucketStore struct {

enableChunkHashCalculation bool

enabledLazyExpandedPostings bool
enabledLazyExpandedPostings bool
postingGroupMaxKeySeriesRatio float64

sortingStrategy sortingStrategy

Expand Down Expand Up @@ -552,6 +559,13 @@ func WithLazyExpandedPostings(enabled bool) BucketStoreOption {
}
}

// WithPostingGroupMaxKeySeriesRatio configures a threshold to mark a posting group as lazy if it has more add keys.
func WithPostingGroupMaxKeySeriesRatio(postingGroupMaxKeySeriesRatio float64) BucketStoreOption {
return func(s *BucketStore) {
s.postingGroupMaxKeySeriesRatio = postingGroupMaxKeySeriesRatio
}
}

// WithDontResort disables series resorting in Store Gateway.
func WithDontResort(true bool) BucketStoreOption {
return func(s *BucketStore) {
Expand Down Expand Up @@ -1002,8 +1016,11 @@ type blockSeriesClient struct {
chunksLimiter ChunksLimiter
bytesLimiter BytesLimiter

lazyExpandedPostingEnabled bool
lazyExpandedPostingEnabled bool
// Mark posting group as lazy if it adds too many keys. 0 to disable.
postingGroupMaxKeySeriesRatio float64
lazyExpandedPostingsCount prometheus.Counter
lazyExpandedPostingGroupByReason *prometheus.CounterVec
lazyExpandedPostingSizeBytes prometheus.Counter
lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter

Expand Down Expand Up @@ -1046,7 +1063,9 @@ func newBlockSeriesClient(
chunkFetchDurationSum *prometheus.HistogramVec,
extLsetToRemove map[string]struct{},
lazyExpandedPostingEnabled bool,
postingGroupMaxKeySeriesRatio float64,
lazyExpandedPostingsCount prometheus.Counter,
lazyExpandedPostingByReason *prometheus.CounterVec,
lazyExpandedPostingSizeBytes prometheus.Counter,
lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter,
tenant string,
Expand Down Expand Up @@ -1081,7 +1100,9 @@ func newBlockSeriesClient(
chunkFetchDurationSum: chunkFetchDurationSum,

lazyExpandedPostingEnabled: lazyExpandedPostingEnabled,
postingGroupMaxKeySeriesRatio: postingGroupMaxKeySeriesRatio,
lazyExpandedPostingsCount: lazyExpandedPostingsCount,
lazyExpandedPostingGroupByReason: lazyExpandedPostingByReason,
lazyExpandedPostingSizeBytes: lazyExpandedPostingSizeBytes,
lazyExpandedPostingSeriesOverfetchedSizeBytes: lazyExpandedPostingSeriesOverfetchedSizeBytes,

Expand Down Expand Up @@ -1133,7 +1154,7 @@ func (b *blockSeriesClient) ExpandPostings(
matchers sortedMatchers,
seriesLimiter SeriesLimiter,
) error {
ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.lazyExpandedPostingSizeBytes, b.tenant)
ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.postingGroupMaxKeySeriesRatio, b.lazyExpandedPostingSizeBytes, b.lazyExpandedPostingGroupByReason, b.tenant)
if err != nil {
return errors.Wrap(err, "expanded matching posting")
}
Expand Down Expand Up @@ -1566,7 +1587,9 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
s.metrics.chunkFetchDurationSum,
extLsetToRemove,
s.enabledLazyExpandedPostings,
s.postingGroupMaxKeySeriesRatio,
s.metrics.lazyExpandedPostingsCount,
s.metrics.lazyExpandedPostingGroupsByReason,
s.metrics.lazyExpandedPostingSizeBytes,
s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes,
tenant,
Expand Down Expand Up @@ -1880,7 +1903,9 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
nil,
extLsetToRemove,
s.enabledLazyExpandedPostings,
s.postingGroupMaxKeySeriesRatio,
s.metrics.lazyExpandedPostingsCount,
s.metrics.lazyExpandedPostingGroupsByReason,
s.metrics.lazyExpandedPostingSizeBytes,
s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes,
tenant,
Expand Down Expand Up @@ -2106,7 +2131,9 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
nil,
nil,
s.enabledLazyExpandedPostings,
s.postingGroupMaxKeySeriesRatio,
s.metrics.lazyExpandedPostingsCount,
s.metrics.lazyExpandedPostingGroupsByReason,
s.metrics.lazyExpandedPostingSizeBytes,
s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes,
tenant,
Expand Down Expand Up @@ -2563,7 +2590,16 @@ func (r *bucketIndexReader) reset(size int) {
// Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first
// chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by
// single label name=value.
func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter, lazyExpandedPostingEnabled bool, lazyExpandedPostingSizeBytes prometheus.Counter, tenant string) (*lazyExpandedPostings, error) {
func (r *bucketIndexReader) ExpandedPostings(
ctx context.Context,
ms sortedMatchers,
bytesLimiter BytesLimiter,
lazyExpandedPostingEnabled bool,
postingGroupMaxKeySeriesRatio float64,
lazyExpandedPostingSizeBytes prometheus.Counter,
lazyExpandedPostingGroupsByReason *prometheus.CounterVec,
tenant string,
) (*lazyExpandedPostings, error) {
// Shortcut the case of `len(postingGroups) == 0`. It will only happen when no
// matchers specified, and we don't need to fetch expanded postings from cache.
if len(ms) == 0 {
Expand Down Expand Up @@ -2615,7 +2651,7 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatch
postingGroups = append(postingGroups, newPostingGroup(true, name, []string{value}, nil))
}

ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, lazyExpandedPostingSizeBytes, tenant)
ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, postingGroupMaxKeySeriesRatio, lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason, tenant)
if err != nil {
return nil, errors.Wrap(err, "fetch and expand postings")
}
Expand Down Expand Up @@ -2661,13 +2697,14 @@ func ExpandPostingsWithContext(ctx context.Context, p index.Postings) ([]storage
// If addAll is not set: Merge of postings for "addKeys" labels minus postings for removeKeys labels
// This computation happens in ExpandedPostings.
type postingGroup struct {
addAll bool
name string
matchers []*labels.Matcher
addKeys []string
removeKeys []string
cardinality int64
lazy bool
addAll bool
name string
matchers []*labels.Matcher
addKeys []string
removeKeys []string
cardinality int64
existentKeys int
lazy bool
}

func newPostingGroup(addAll bool, name string, addKeys, removeKeys []string) *postingGroup {
Expand Down
30 changes: 21 additions & 9 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1288,7 +1288,9 @@ func benchmarkExpandedPostings(
{`uniq=~"9|random-shuffled-values|1"`, []*labels.Matcher{iRegexBigValueSet}, bigValueSetSize},
}

dummyCounter := promauto.NewCounter(prometheus.CounterOpts{Name: "test"})
reg := prometheus.NewRegistry()
dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"})
dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"})
for _, c := range cases {
t.Run(c.name, func(t testutil.TB) {
b := &bucketBlock{
Expand All @@ -1304,7 +1306,7 @@ func benchmarkExpandedPostings(

t.ResetTimer()
for i := 0; i < t.N(); i++ {
p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant)
p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
testutil.Ok(t, err)
testutil.Equals(t, c.expectedLen, len(p.postings))
}
Expand Down Expand Up @@ -1340,8 +1342,10 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) {
// Match nothing.
matcher2 := labels.MustNewMatcher(labels.MatchRegexp, "i", "500.*")
ctx := context.Background()
dummyCounter := promauto.With(prometheus.NewRegistry()).NewCounter(prometheus.CounterOpts{Name: "test"})
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant)
reg := prometheus.NewRegistry()
dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"})
dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"})
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
testutil.Ok(t, err)
testutil.Equals(t, ps, (*lazyExpandedPostings)(nil))
// Make sure even if a matcher doesn't match any postings, we still cache empty expanded postings.
Expand Down Expand Up @@ -1378,8 +1382,10 @@ func TestLazyExpandedPostingsEmptyPostings(t *testing.T) {
matcher2 := labels.MustNewMatcher(labels.MatchRegexp, "n", "1_.*")
matcher3 := labels.MustNewMatcher(labels.MatchRegexp, "i", ".+")
ctx := context.Background()
dummyCounter := promauto.With(prometheus.NewRegistry()).NewCounter(prometheus.CounterOpts{Name: "test"})
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2, matcher3}), NewBytesLimiterFactory(0)(nil), true, dummyCounter, tenancy.DefaultTenant)
reg := prometheus.NewRegistry()
dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"})
dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"})
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2, matcher3}), NewBytesLimiterFactory(0)(nil), true, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
testutil.Ok(t, err)
// We expect emptyLazyPostings rather than lazy postings with 0 length but with matchers.
testutil.Equals(t, ps, emptyLazyPostings)
Expand Down Expand Up @@ -2872,7 +2878,9 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
wg := sync.WaitGroup{}
wg.Add(concurrency)

dummyCounter := promauto.NewCounter(prometheus.CounterOpts{Name: "test"})
reg := prometheus.NewRegistry()
dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"})
dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"})
for w := 0; w < concurrency; w++ {
go func() {
defer wg.Done()
Expand Down Expand Up @@ -2917,7 +2925,9 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
dummyHistogram,
nil,
false,
0,
dummyCounter,
dummyCounterVec,
dummyCounter,
dummyCounter,
tenancy.DefaultTenant,
Expand Down Expand Up @@ -3551,7 +3561,9 @@ func TestExpandedPostingsRace(t *testing.T) {

l := sync.Mutex{}
previousRefs := make(map[int][]storage.SeriesRef)
dummyCounter := promauto.With(prometheus.NewRegistry()).NewCounter(prometheus.CounterOpts{Name: "test"})
reg := prometheus.NewRegistry()
dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"})
dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"})

for {
if tm.Err() != nil {
Expand All @@ -3573,7 +3585,7 @@ func TestExpandedPostingsRace(t *testing.T) {
wg.Add(1)

go func(i int, bb *bucketBlock) {
refs, err := bb.indexReader(logger).ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant)
refs, err := bb.indexReader(logger).ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
testutil.Ok(t, err)
defer wg.Done()

Expand Down
Loading

0 comments on commit 51c7dcd

Please sign in to comment.