Skip to content

Commit ebf631d

Browse files
authored
Apply low selectivity matchers lazily in ingester (#7063)
* apply low selectivity matchers lazily in ingester Signed-off-by: yeya24 <[email protected]> * changelog Signed-off-by: Ben Ye <[email protected]> * lint Signed-off-by: yeya24 <[email protected]> * add benchmark Signed-off-by: yeya24 <[email protected]> * lint Signed-off-by: yeya24 <[email protected]> * update changelog Signed-off-by: yeya24 <[email protected]> --------- Signed-off-by: yeya24 <[email protected]> Signed-off-by: Ben Ye <[email protected]>
1 parent c505924 commit ebf631d

File tree

7 files changed

+533
-3
lines changed

7 files changed

+533
-3
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
## master / unreleased
44

5+
* [ENHANCEMENT] Ingester: Add `enable_matcher_optimization` config to apply low-selectivity matchers lazily. #7063
56
* [BUGFIX] Compactor: Avoid race condition which allow a grouper to not compact all partitions. #7082
67

7-
88
## 1.20.0 in progress
99

1010
* [CHANGE] StoreGateway/Alertmanager: Add default 5s connection timeout on client. #6603

docs/configuration/config-file-reference.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3738,6 +3738,12 @@ instance_limits:
37383738
# CLI flag: -ingester.skip-metadata-limits
37393739
[skip_metadata_limits: <boolean> | default = true]
37403740
3741+
# Enable optimization of label matchers when querying chunks. When enabled,
3742+
# matchers with low selectivity such as =~.+ are applied lazily during series
3743+
# scanning instead of being used for postings matching.
3744+
# CLI flag: -ingester.enable-matcher-optimization
3745+
[enable_matcher_optimization: <boolean> | default = false]
3746+
37413747
query_protection:
37423748
rejection:
37433749
threshold:

pkg/ingester/ingester.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,10 @@ type Config struct {
162162
// If enabled, the metadata API returns all metadata regardless of the limits.
163163
SkipMetadataLimits bool `yaml:"skip_metadata_limits"`
164164

165+
// When enabled, matchers with low selectivity are applied lazily during series scanning
166+
// instead of being used for postings selection.
167+
EnableMatcherOptimization bool `yaml:"enable_matcher_optimization"`
168+
165169
QueryProtection configs.QueryProtection `yaml:"query_protection"`
166170
}
167171

@@ -185,6 +189,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
185189
f.BoolVar(&cfg.DisableChunkTrimming, "ingester.disable-chunk-trimming", false, "Disable trimming of matching series chunks based on query Start and End time. When disabled, the result may contain samples outside the queried time range but select performances may be improved. Note that certain query results might change by changing this option.")
186190
f.IntVar(&cfg.MatchersCacheMaxItems, "ingester.matchers-cache-max-items", 0, "Maximum number of entries in the regex matchers cache. 0 to disable.")
187191
f.BoolVar(&cfg.SkipMetadataLimits, "ingester.skip-metadata-limits", true, "If enabled, the metadata API returns all metadata regardless of the limits.")
192+
f.BoolVar(&cfg.EnableMatcherOptimization, "ingester.enable-matcher-optimization", false, "Enable optimization of label matchers when query chunks. When enabled, matchers with low selectivity such as =~.+ are applied lazily during series scanning instead of being used for postings matching.")
188193

189194
cfg.DefaultLimits.RegisterFlagsWithPrefix(f, "ingester.")
190195
cfg.QueryProtection.RegisterFlagsWithPrefix(f, "ingester.")
@@ -2295,6 +2300,10 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
22952300
End: through,
22962301
DisableTrimming: i.cfg.DisableChunkTrimming,
22972302
}
2303+
var lazyMatchers []*labels.Matcher
2304+
if i.cfg.EnableMatcherOptimization {
2305+
matchers, lazyMatchers = optimizeMatchers(matchers)
2306+
}
22982307
// It's not required to return sorted series because series are sorted by the Cortex querier.
22992308
ss := q.Select(ctx, false, hints, matchers...)
23002309
c()
@@ -2308,14 +2317,19 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
23082317
var it chunks.Iterator
23092318
for ss.Next() {
23102319
series := ss.At()
2320+
lbls := series.Labels()
2321+
2322+
if !labelsMatches(lbls, lazyMatchers) {
2323+
continue
2324+
}
23112325

2312-
if sm.IsSharded() && !sm.MatchesLabels(series.Labels()) {
2326+
if sm.IsSharded() && !sm.MatchesLabels(lbls) {
23132327
continue
23142328
}
23152329

23162330
// convert labels to LabelAdapter
23172331
ts := client.TimeSeriesChunk{
2318-
Labels: cortexpb.FromLabelsToLabelAdapters(series.Labels()),
2332+
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
23192333
}
23202334

23212335
it := series.Iterator(it)

pkg/ingester/ingester_test.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
"github.com/thanos-io/thanos/pkg/runutil"
4343
"github.com/thanos-io/thanos/pkg/shipper"
4444
storecache "github.com/thanos-io/thanos/pkg/store/cache"
45+
"github.com/thanos-io/thanos/pkg/store/storepb"
4546
"github.com/weaveworks/common/httpgrpc"
4647
"github.com/weaveworks/common/middleware"
4748
"github.com/weaveworks/common/user"
@@ -3941,6 +3942,134 @@ func BenchmarkIngester_QueryStream_Chunks(b *testing.B) {
39413942
}
39423943
}
39433944

3945+
func BenchmarkIngester_QueryStreamChunks_MatcherOptimization(b *testing.B) {
3946+
tests := map[string]struct {
3947+
matchers []*labels.Matcher
3948+
description string
3949+
}{
3950+
"metric name with regex matchers": {
3951+
matchers: []*labels.Matcher{
3952+
labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_metric"),
3953+
labels.MustNewMatcher(labels.MatchRegexp, "region", ".+"),
3954+
labels.MustNewMatcher(labels.MatchRegexp, "job", ".+"),
3955+
},
3956+
description: "Metric name with .+ regex matchers",
3957+
},
3958+
"metric name with not equal empty": {
3959+
matchers: []*labels.Matcher{
3960+
labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_metric"),
3961+
labels.MustNewMatcher(labels.MatchNotEqual, "env", ""),
3962+
labels.MustNewMatcher(labels.MatchNotEqual, "pod", ""),
3963+
},
3964+
description: "Metric name with != \"\" matchers",
3965+
},
3966+
"metric name with sparse label": {
3967+
matchers: []*labels.Matcher{
3968+
labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_metric"),
3969+
labels.MustNewMatcher(labels.MatchRegexp, "sparse_label", ".+"),
3970+
},
3971+
description: "Metric name with sparse label matcher",
3972+
},
3973+
"complex matchers": {
3974+
matchers: []*labels.Matcher{
3975+
labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_metric"),
3976+
labels.MustNewMatcher(labels.MatchRegexp, "region", ".+"),
3977+
labels.MustNewMatcher(labels.MatchRegexp, "job", ".+"),
3978+
labels.MustNewMatcher(labels.MatchRegexp, "env", ".+"),
3979+
labels.MustNewMatcher(labels.MatchRegexp, "pod", ".+"),
3980+
},
3981+
description: "Complex matchers with .+ regex",
3982+
},
3983+
}
3984+
3985+
for testName, testData := range tests {
3986+
b.Run(testName+"_optimization_disabled", func(b *testing.B) {
3987+
benchmarkQueryStreamChunksWithMatcherOptimization(b, false, testData.matchers, testData.description+" without optimization")
3988+
})
3989+
b.Run(testName+"_optimization_enabled", func(b *testing.B) {
3990+
benchmarkQueryStreamChunksWithMatcherOptimization(b, true, testData.matchers, testData.description+" with optimization")
3991+
})
3992+
}
3993+
}
3994+
3995+
// benchmarkQueryStreamChunksWithMatcherOptimization pushes 1000 synthetic
// series into a fresh ingester, then repeatedly calls queryStreamChunks with
// the given matchers. enableMatcherOptimization toggles the ingester's lazy
// matcher optimization (Config.EnableMatcherOptimization).
//
// NOTE(review): the description parameter is never used in the body — confirm
// whether it should be logged/reported or removed from the signature.
func benchmarkQueryStreamChunksWithMatcherOptimization(b *testing.B, enableMatcherOptimization bool, matchers []*labels.Matcher, description string) {
	cfg := defaultIngesterTestConfig(b)
	cfg.EnableMatcherOptimization = enableMatcherOptimization

	i, err := prepareIngesterWithBlocksStorage(b, cfg, prometheus.NewRegistry())
	require.NoError(b, err)
	require.NoError(b, services.StartAndAwaitRunning(context.Background(), i))
	defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

	// Wait until it's ACTIVE
	test.Poll(b, 1*time.Second, ring.ACTIVE, func() any {
		return i.lifecycler.GetState()
	})

	ctx := user.InjectOrgID(context.Background(), userID)

	// Seed 1000 series. Label cardinalities are chosen so the matchers above
	// select different fractions of the series set.
	for s := range 1000 {
		// Create base labels
		labelPairs := []string{
			labels.MetricName, "test_metric",
			"region", fmt.Sprintf("region-%d", s%10),
			"job", fmt.Sprintf("job-%d", s%20),
			"env", fmt.Sprintf("env-%d", s%5),
			"pod", fmt.Sprintf("pod-%d", s%1000),
		}

		// Add sparse label only for half of the series
		if s%2 == 0 {
			labelPairs = append(labelPairs, "sparse_label", fmt.Sprintf("sparse-%d", s%50))
		}

		lbls := labels.FromStrings(labelPairs...)

		// Five samples per series with non-overlapping timestamps (s*5+t).
		samples := make([]cortexpb.Sample, 0, 5)
		for t := range 5 {
			samples = append(samples, cortexpb.Sample{
				Value:       float64(s + t),
				TimestampMs: int64(s*5 + t),
			})
		}

		// Create labels slice with same length as samples
		labelsSlice := make([]labels.Labels, len(samples))
		for j := range labelsSlice {
			labelsSlice[j] = lbls
		}

		req := cortexpb.ToWriteRequest(labelsSlice, samples, nil, nil, cortexpb.API)
		_, err = i.Push(ctx, req)
		require.NoError(b, err)
	}

	db, err := i.getTSDB(userID)
	require.NoError(b, err)
	require.NotNil(b, db)

	mockStream := &mockQueryStreamServer{ctx: ctx}
	// TotalShards: 0 produces a shard matcher that does not shard, so every
	// series passes the sm.IsSharded()/MatchesLabels check in queryStreamChunks.
	sm := (&storepb.ShardInfo{
		TotalShards: 0,
	}).Matcher(nil)

	b.ReportAllocs()
	b.ResetTimer()

	for b.Loop() {
		numSeries, numSamples, _, numChunks, err := i.queryStreamChunks(
			ctx, db, 0, 5000, matchers, sm, mockStream)

		require.NoError(b, err)
		require.Greater(b, numSeries, 0)
		require.Greater(b, numSamples, 0)
		require.Greater(b, numChunks, 0)

		// Reset the mock stream for next iteration
		mockStream.series = mockStream.series[:0]
	}
}
4072+
39444073
func benchmarkQueryStream(b *testing.B, samplesCount, seriesCount int) {
39454074
cfg := defaultIngesterTestConfig(b)
39464075

pkg/ingester/matchers.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package ingester
2+
3+
import (
4+
"slices"
5+
6+
"github.com/prometheus/prometheus/model/labels"
7+
)
8+
9+
// optimizeMatchers categorizes input matchers to matchers used in select and matchers applied lazily
10+
// when scanning series.
11+
func optimizeMatchers(matchers []*labels.Matcher) ([]*labels.Matcher, []*labels.Matcher) {
12+
// If there is only 1 matcher, use it for select.
13+
// If there is no matcher to optimize, also return early.
14+
if len(matchers) < 2 || !canOptimizeMatchers(matchers) {
15+
return matchers, nil
16+
}
17+
selectMatchers := make([]*labels.Matcher, 0, len(matchers))
18+
lazyMatchers := make([]*labels.Matcher, 0)
19+
for _, m := range matchers {
20+
// =~.* is a noop as it matches everything.
21+
if m.Type == labels.MatchRegexp && m.Value == ".*" {
22+
continue
23+
}
24+
if lazyMatcher(m) {
25+
lazyMatchers = append(lazyMatchers, m)
26+
continue
27+
}
28+
selectMatchers = append(selectMatchers, m)
29+
}
30+
31+
// We need at least 1 select matcher.
32+
if len(selectMatchers) == 0 {
33+
selectMatchers = lazyMatchers[:1]
34+
lazyMatchers = lazyMatchers[1:]
35+
}
36+
37+
return selectMatchers, lazyMatchers
38+
}
39+
40+
func canOptimizeMatchers(matchers []*labels.Matcher) bool {
41+
return slices.ContainsFunc(matchers, lazyMatcher)
42+
}
43+
44+
func labelsMatches(lbls labels.Labels, matchers []*labels.Matcher) bool {
45+
for _, m := range matchers {
46+
if !m.Matches(lbls.Get(m.Name)) {
47+
return false
48+
}
49+
}
50+
return true
51+
}
52+
53+
// lazyMatcher checks if the label matcher should be applied lazily when scanning series instead of fetching postings
54+
// for matcher. The matchers to apply lazily are matchers that are known to have low selectivity.
55+
func lazyMatcher(matcher *labels.Matcher) bool {
56+
if matcher.Value == ".+" && matcher.Type == labels.MatchRegexp {
57+
return true
58+
}
59+
if matcher.Value == "" && (matcher.Type == labels.MatchNotEqual || matcher.Type == labels.MatchNotRegexp) {
60+
return true
61+
}
62+
return false
63+
}

0 commit comments

Comments
 (0)