Skip to content

Commit 444c8e8

Browse files
authored
Add ingester.skip-metadata-limits flag (#6744)
Signed-off-by: SungJin1212 <[email protected]>
1 parent 5f060b7 commit 444c8e8

File tree

6 files changed

+151
-47
lines changed

6 files changed

+151
-47
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* [FEATURE] Support Parquet format: Implement parquet converter service to convert a TSDB block into Parquet. #6716
1414
* [FEATURE] Distributor/Ingester: Implemented experimental feature to use gRPC stream connection for push requests. This can be enabled by setting `-distributor.use-stream-push=true`. #6580
1515
* [ENHANCEMENT] Query Frontend: Change to return 400 when the tenant resolving fail. #6715
16-
* [ENHANCEMENT] Querier: Support query parameters to metadata api (/api/v1/metadata) to allow user to limit metadata to return. #6681
16+
* [ENHANCEMENT] Querier: Support query parameters to metadata api (/api/v1/metadata) to allow user to limit metadata to return. Add a `-ingester.skip-metadata-limits` flag (default `true`) so that ingesters keep returning all metadata while a deployment is in progress and queriers may still be running an older version. Set this flag to `false` once the deployment is complete to use the metadata API with the limits. #6681 #6744
1717
* [ENHANCEMENT] Ingester: Add a `cortex_ingester_active_native_histogram_series` metric to track # of active NH series. #6695
1818
* [ENHANCEMENT] Query Frontend: Add new limit `-frontend.max-query-response-size` for total query response size after decompression in query frontend. #6607
1919
* [ENHANCEMENT] Alertmanager: Add nflog and silences maintenance metrics. #6659

docs/configuration/config-file-reference.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3272,6 +3272,10 @@ instance_limits:
32723272
# Maximum number of entries in the regex matchers cache. 0 to disable.
32733273
# CLI flag: -ingester.matchers-cache-max-items
32743274
[matchers_cache_max_items: <int> | default = 0]
3275+
3276+
# If enabled, the metadata API returns all metadata regardless of the limits.
3277+
# CLI flag: -ingester.skip-metadata-limits
3278+
[skip_metadata_limits: <boolean> | default = true]
32753279
```
32763280

32773281
### `ingester_client_config`

integration/backward_compatibility_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,83 @@ func TestNewDistributorsCanPushToOldIngestersWithReplication(t *testing.T) {
123123
}
124124
}
125125

126+
// Test for #6744. When the querier is running on an older version, while the ingester is running on a newer
127+
// version, the ingester should return all metadata.
128+
func TestMetadataAPIWhenDeployment(t *testing.T) {
129+
oldImage := "quay.io/cortexproject/cortex:v1.19.0"
130+
s, err := e2e.NewScenario(networkName)
131+
require.NoError(t, err)
132+
defer s.Close()
133+
134+
// Start dependencies.
135+
consul := e2edb.NewConsulWithName("consul")
136+
require.NoError(t, s.StartAndWaitReady(consul))
137+
138+
baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
139+
140+
minio := e2edb.NewMinio(9000, baseFlags["-blocks-storage.s3.bucket-name"])
141+
require.NoError(t, s.StartAndWaitReady(minio))
142+
143+
oldFlags := mergeFlags(baseFlags, map[string]string{
144+
// alert manager
145+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
146+
// consul
147+
"-ring.store": "consul",
148+
"-consul.hostname": consul.NetworkHTTPEndpoint(),
149+
})
150+
151+
newFlags := mergeFlags(oldFlags, map[string]string{
152+
// ingester
153+
"-ingester.skip-metadata-limits": "true",
154+
})
155+
156+
// Start Cortex components
157+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), newFlags, "")
158+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), newFlags, "")
159+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), oldFlags, oldImage)
160+
require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier))
161+
162+
// Wait until distributor has updated the ring.
163+
require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
164+
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
165+
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
166+
167+
// Wait until querier has updated the ring.
168+
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
169+
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
170+
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
171+
172+
client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1")
173+
require.NoError(t, err)
174+
175+
metadataMetricNum := 5
176+
metadataPerMetrics := 2
177+
metadata := make([]prompb.MetricMetadata, 0, metadataMetricNum)
178+
for i := 0; i < metadataMetricNum; i++ {
179+
for j := 0; j < metadataPerMetrics; j++ {
180+
metadata = append(metadata, prompb.MetricMetadata{
181+
MetricFamilyName: fmt.Sprintf("metadata_name_%d", i),
182+
Help: fmt.Sprintf("metadata_help_%d_%d", i, j),
183+
Unit: fmt.Sprintf("metadata_unit_%d_%d", i, j),
184+
})
185+
}
186+
}
187+
res, err := client.Push(nil, metadata...)
188+
require.NoError(t, err)
189+
require.Equal(t, 200, res.StatusCode)
190+
191+
// should return all metadata regardless of the limit
192+
maxLimit := metadataMetricNum * metadataPerMetrics
193+
for i := -1; i <= maxLimit; i++ {
194+
result, err := client.Metadata("", strconv.Itoa(i))
195+
require.NoError(t, err)
196+
require.Equal(t, metadataMetricNum, len(result))
197+
for _, metadata := range result {
198+
require.Equal(t, metadataPerMetrics, len(metadata))
199+
}
200+
}
201+
}
202+
126203
// Test cortex which uses Prometheus v3.x can support holt_winters function
127204
func TestCanSupportHoltWintersFunc(t *testing.T) {
128205
s, err := e2e.NewScenario(networkName)

pkg/ingester/ingester.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,9 @@ type Config struct {
156156

157157
// Maximum number of entries in the matchers cache. 0 to disable.
158158
MatchersCacheMaxItems int `yaml:"matchers_cache_max_items"`
159+
160+
// If enabled, the metadata API returns all metadata regardless of the limits.
161+
SkipMetadataLimits bool `yaml:"skip_metadata_limits"`
159162
}
160163

161164
// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -177,6 +180,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
177180

178181
f.BoolVar(&cfg.DisableChunkTrimming, "ingester.disable-chunk-trimming", false, "Disable trimming of matching series chunks based on query Start and End time. When disabled, the result may contain samples outside the queried time range but select performances may be improved. Note that certain query results might change by changing this option.")
179182
f.IntVar(&cfg.MatchersCacheMaxItems, "ingester.matchers-cache-max-items", 0, "Maximum number of entries in the regex matchers cache. 0 to disable.")
183+
f.BoolVar(&cfg.SkipMetadataLimits, "ingester.skip-metadata-limits", true, "If enabled, the metadata API returns all metadata regardless of the limits.")
180184

181185
cfg.DefaultLimits.RegisterFlagsWithPrefix(f, "ingester.")
182186
}
@@ -3096,7 +3100,7 @@ func (i *Ingester) getOrCreateUserMetadata(userID string) *userMetricsMetadata {
30963100
// Ensure it was not created between switching locks.
30973101
userMetadata, ok := i.usersMetadata[userID]
30983102
if !ok {
3099-
userMetadata = newMetadataMap(i.limiter, i.metrics, i.validateMetrics, userID)
3103+
userMetadata = newMetadataMap(i.limiter, i.metrics, i.validateMetrics, userID, i.cfg.SkipMetadataLimits)
31003104
i.usersMetadata[userID] = userMetadata
31013105
}
31023106
return userMetadata

pkg/ingester/user_metrics_metadata.go

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,26 +11,33 @@ import (
1111
"github.com/cortexproject/cortex/pkg/util/validation"
1212
)
1313

14+
const (
15+
defaultLimit = -1
16+
defaultLimitPerMetric = -1
17+
)
18+
1419
// userMetricsMetadata allows metric metadata of a tenant to be held by the ingester.
1520
// Metadata is kept as a set as it can come from multiple targets that Prometheus scrapes
1621
// with the same metric name.
1722
type userMetricsMetadata struct {
18-
limiter *Limiter
19-
metrics *ingesterMetrics
20-
validateMetrics *validation.ValidateMetrics
21-
userID string
23+
limiter *Limiter
24+
metrics *ingesterMetrics
25+
validateMetrics *validation.ValidateMetrics
26+
userID string
27+
skipMetadataLimits bool
2228

2329
mtx sync.RWMutex
2430
metricToMetadata map[string]metricMetadataSet
2531
}
2632

27-
func newMetadataMap(l *Limiter, m *ingesterMetrics, v *validation.ValidateMetrics, userID string) *userMetricsMetadata {
33+
func newMetadataMap(l *Limiter, m *ingesterMetrics, v *validation.ValidateMetrics, userID string, skipMetadataLimits bool) *userMetricsMetadata {
2834
return &userMetricsMetadata{
29-
metricToMetadata: map[string]metricMetadataSet{},
30-
limiter: l,
31-
metrics: m,
32-
validateMetrics: v,
33-
userID: userID,
35+
metricToMetadata: map[string]metricMetadataSet{},
36+
limiter: l,
37+
metrics: m,
38+
validateMetrics: v,
39+
userID: userID,
40+
skipMetadataLimits: skipMetadataLimits,
3441
}
3542
}
3643

@@ -88,8 +95,17 @@ func (mm *userMetricsMetadata) purge(deadline time.Time) {
8895
func (mm *userMetricsMetadata) toClientMetadata(req *client.MetricsMetadataRequest) []*cortexpb.MetricMetadata {
8996
mm.mtx.RLock()
9097
defer mm.mtx.RUnlock()
98+
9199
r := make([]*cortexpb.MetricMetadata, 0, len(mm.metricToMetadata))
92-
if req.Limit == 0 {
100+
limit := req.Limit
101+
limitPerMetric := req.LimitPerMetric
102+
103+
if mm.skipMetadataLimits {
104+
// set limit and limitPerMetric to default
105+
limit = defaultLimit
106+
limitPerMetric = defaultLimitPerMetric
107+
}
108+
if limit == 0 {
93109
return r
94110
}
95111

@@ -99,16 +115,16 @@ func (mm *userMetricsMetadata) toClientMetadata(req *client.MetricsMetadataReque
99115
return r
100116
}
101117

102-
metadataSet.add(req.LimitPerMetric, &r)
118+
metadataSet.add(limitPerMetric, &r)
103119
return r
104120
}
105121

106122
var metrics int64
107123
for _, set := range mm.metricToMetadata {
108-
if req.Limit > 0 && metrics >= req.Limit {
124+
if limit > 0 && metrics >= limit {
109125
break
110126
}
111-
set.add(req.LimitPerMetric, &r)
127+
set.add(limitPerMetric, &r)
112128
metrics++
113129
}
114130
return r

pkg/ingester/user_metrics_metadata_test.go

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,6 @@ import (
1414
"github.com/cortexproject/cortex/pkg/util/validation"
1515
)
1616

17-
const (
18-
defaultLimit = -1
19-
defaultLimitPerMetric = -1
20-
)
21-
2217
func Test_UserMetricsMetadata(t *testing.T) {
2318
userId := "user-1"
2419

@@ -43,33 +38,13 @@ func Test_UserMetricsMetadata(t *testing.T) {
4338
require.NoError(t, err)
4439
limiter := NewLimiter(overrides, nil, util.ShardingStrategyDefault, true, 1, false, "")
4540

46-
userMetricsMetadata := newMetadataMap(limiter, m, validation.NewValidateMetrics(reg), userId)
47-
48-
addMetricMetadata := func(name string, i int) {
49-
metadata := &cortexpb.MetricMetadata{
50-
MetricFamilyName: fmt.Sprintf("%s_%d", name, i),
51-
Type: cortexpb.GAUGE,
52-
Help: fmt.Sprintf("a help for %s", name),
53-
Unit: fmt.Sprintf("a unit for %s", name),
54-
}
55-
56-
err := userMetricsMetadata.add(name, metadata)
57-
require.NoError(t, err)
58-
}
59-
60-
metadataNumPerMetric := 3
61-
for _, m := range []string{"metric1", "metric2"} {
62-
for i := range metadataNumPerMetric {
63-
addMetricMetadata(m, i)
64-
}
65-
}
66-
6741
tests := []struct {
68-
description string
69-
limit int64
70-
limitPerMetric int64
71-
metric string
72-
expectedLength int
42+
description string
43+
limit int64
44+
limitPerMetric int64
45+
metric string
46+
expectedLength int
47+
skipMetadataLimits bool
7348
}{
7449
{
7550
description: "limit: 1",
@@ -122,10 +97,38 @@ func Test_UserMetricsMetadata(t *testing.T) {
12297
metric: "dummy",
12398
expectedLength: 0,
12499
},
100+
{
101+
description: "enable skipMetadataLimits",
102+
limit: 1,
103+
limitPerMetric: 2,
104+
expectedLength: 2 * 3, // # of metric * metadataNumPerMetric
105+
skipMetadataLimits: true,
106+
},
125107
}
126108

127109
for _, test := range tests {
128110
t.Run(test.description, func(t *testing.T) {
111+
userMetricsMetadata := newMetadataMap(limiter, m, validation.NewValidateMetrics(reg), userId, test.skipMetadataLimits)
112+
113+
addMetricMetadata := func(name string, i int) {
114+
metadata := &cortexpb.MetricMetadata{
115+
MetricFamilyName: fmt.Sprintf("%s_%d", name, i),
116+
Type: cortexpb.GAUGE,
117+
Help: fmt.Sprintf("a help for %s", name),
118+
Unit: fmt.Sprintf("a unit for %s", name),
119+
}
120+
121+
err := userMetricsMetadata.add(name, metadata)
122+
require.NoError(t, err)
123+
}
124+
125+
metadataNumPerMetric := 3
126+
for _, m := range []string{"metric1", "metric2"} {
127+
for i := range metadataNumPerMetric {
128+
addMetricMetadata(m, i)
129+
}
130+
}
131+
129132
req := &client.MetricsMetadataRequest{Limit: test.limit, LimitPerMetric: test.limitPerMetric, Metric: test.metric}
130133

131134
r := userMetricsMetadata.toClientMetadata(req)

0 commit comments

Comments
 (0)