Skip to content

Commit 8668b1c

Browse files
authored
Implement metadata api query params (#6681)
Signed-off-by: SungJin1212 <[email protected]>
1 parent 15a9eae commit 8668b1c

19 files changed

+842
-137
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
* [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526
77
* [FEATURE] Update prometheus alertmanager version to v0.28.0 and add new integration msteamsv2, jira, and rocketchat. #6590
88
* [FEATURE] Ingester: Support out-of-order native histogram ingestion. It automatically enabled when `-ingester.out-of-order-time-window > 0` and `-blocks-storage.tsdb.enable-native-histograms=true`. #6626 #6663
9+
* [ENHANCEMENT] Querier: Support query parameters to metadata api (/api/v1/metadata) to allow user to limit metadata to return. #6681
910
* [ENHANCEMENT] Query Frontend: Add new limit `-frontend.max-query-response-size` for total query response size after decompression in query frontend. #6607
1011
* [ENHANCEMENT] Alertmanager: Add nflog and silences maintenance metrics. #6659
1112
* [ENHANCEMENT] Querier: limit label APIs to query only ingesters if `start` param is not been specified. #6618

integration/e2ecortex/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,9 @@ func NewPromQueryClient(address string) (*Client, error) {
115115
}
116116

117117
// Push the input timeseries to the remote endpoint
118-
func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) {
118+
func (c *Client) Push(timeseries []prompb.TimeSeries, metadata ...prompb.MetricMetadata) (*http.Response, error) {
119119
// Create write request
120-
data, err := proto.Marshal(&prompb.WriteRequest{Timeseries: timeseries})
120+
data, err := proto.Marshal(&prompb.WriteRequest{Timeseries: timeseries, Metadata: metadata})
121121
if err != nil {
122122
return nil, err
123123
}

integration/ingester_metadata_test.go

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
//go:build requires_docker
2+
// +build requires_docker
3+
4+
package integration
5+
6+
import (
7+
"fmt"
8+
"strings"
9+
"testing"
10+
11+
"github.com/prometheus/prometheus/model/labels"
12+
"github.com/prometheus/prometheus/prompb"
13+
"github.com/stretchr/testify/require"
14+
15+
"github.com/cortexproject/cortex/integration/e2e"
16+
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
17+
"github.com/cortexproject/cortex/integration/e2ecortex"
18+
)
19+
20+
func TestIngesterMetadata(t *testing.T) {
21+
s, err := e2e.NewScenario(networkName)
22+
require.NoError(t, err)
23+
defer s.Close()
24+
25+
// Start dependencies.
26+
consul := e2edb.NewConsul()
27+
require.NoError(t, s.StartAndWaitReady(consul))
28+
29+
baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
30+
31+
minio := e2edb.NewMinio(9000, baseFlags["-blocks-storage.s3.bucket-name"])
32+
require.NoError(t, s.StartAndWaitReady(minio))
33+
34+
flags := mergeFlags(baseFlags, map[string]string{
35+
// alert manager
36+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
37+
// consul
38+
"-ring.store": "consul",
39+
"-consul.hostname": consul.NetworkHTTPEndpoint(),
40+
})
41+
42+
// Start Cortex components
43+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
44+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
45+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
46+
require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier))
47+
48+
// Wait until distributor has updated the ring.
49+
require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
50+
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
51+
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
52+
53+
// Wait until querier has updated the ring.
54+
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
55+
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
56+
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
57+
58+
client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID)
59+
require.NoError(t, err)
60+
61+
metadataMetricNum := 5
62+
metadataPerMetrics := 2
63+
metadata := make([]prompb.MetricMetadata, 0, metadataMetricNum)
64+
for i := 0; i < metadataMetricNum; i++ {
65+
for j := 0; j < metadataPerMetrics; j++ {
66+
metadata = append(metadata, prompb.MetricMetadata{
67+
MetricFamilyName: fmt.Sprintf("metadata_name_%d", i),
68+
Help: fmt.Sprintf("metadata_help_%d_%d", i, j),
69+
Unit: fmt.Sprintf("metadata_unit_%d_%d", i, j),
70+
})
71+
}
72+
}
73+
res, err := client.Push(nil, metadata...)
74+
require.NoError(t, err)
75+
require.Equal(t, 200, res.StatusCode)
76+
77+
testMetadataQueryParams(t, client, metadataMetricNum, metadataPerMetrics)
78+
}
79+
80+
func TestIngesterMetadataWithTenantFederation(t *testing.T) {
81+
s, err := e2e.NewScenario(networkName)
82+
require.NoError(t, err)
83+
defer s.Close()
84+
85+
// Start dependencies.
86+
consul := e2edb.NewConsul()
87+
require.NoError(t, s.StartAndWaitReady(consul))
88+
89+
baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
90+
91+
minio := e2edb.NewMinio(9000, baseFlags["-blocks-storage.s3.bucket-name"])
92+
require.NoError(t, s.StartAndWaitReady(minio))
93+
94+
flags := mergeFlags(baseFlags, map[string]string{
95+
// tenant federation
96+
"-tenant-federation.enabled": "true",
97+
// alert manager
98+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
99+
// consul
100+
"-ring.store": "consul",
101+
"-consul.hostname": consul.NetworkHTTPEndpoint(),
102+
})
103+
104+
// Start Cortex components
105+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
106+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
107+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
108+
require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier))
109+
110+
// Wait until distributor has updated the ring.
111+
require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
112+
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
113+
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
114+
115+
// Wait until querier has updated the ring.
116+
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
117+
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
118+
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
119+
120+
metadataMetricNum := 5
121+
metadataPerMetrics := 2
122+
metadata := make([]prompb.MetricMetadata, 0, metadataMetricNum)
123+
for i := 0; i < metadataMetricNum; i++ {
124+
for j := 0; j < metadataPerMetrics; j++ {
125+
metadata = append(metadata, prompb.MetricMetadata{
126+
MetricFamilyName: fmt.Sprintf("metadata_name_%d", i),
127+
Help: fmt.Sprintf("metadata_help_%d_%d", i, j),
128+
Unit: fmt.Sprintf("metadata_unit_%d_%d", i, j),
129+
})
130+
}
131+
}
132+
133+
numUsers := 2
134+
tenantIDs := make([]string, numUsers)
135+
for u := 0; u < numUsers; u++ {
136+
tenantIDs[u] = fmt.Sprintf("user-%d", u)
137+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", tenantIDs[u])
138+
require.NoError(t, err)
139+
140+
res, err := c.Push(nil, metadata...)
141+
require.NoError(t, err)
142+
require.Equal(t, 200, res.StatusCode)
143+
}
144+
145+
client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", strings.Join(tenantIDs, "|"))
146+
require.NoError(t, err)
147+
148+
testMetadataQueryParams(t, client, metadataMetricNum, metadataPerMetrics)
149+
}
150+
151+
func testMetadataQueryParams(t *testing.T, client *e2ecortex.Client, metadataMetricNum, metadataPerMetrics int) {
152+
t.Run("test no parameter", func(t *testing.T) {
153+
result, err := client.Metadata("", "")
154+
require.NoError(t, err)
155+
require.Equal(t, metadataMetricNum, len(result))
156+
157+
for _, v := range result {
158+
require.Equal(t, metadataPerMetrics, len(v))
159+
}
160+
})
161+
162+
t.Run("test name parameter", func(t *testing.T) {
163+
t.Run("existing name", func(t *testing.T) {
164+
name := "metadata_name_0"
165+
result, err := client.Metadata(name, "")
166+
require.NoError(t, err)
167+
m, ok := result[name]
168+
require.True(t, ok)
169+
require.Equal(t, metadataPerMetrics, len(m))
170+
})
171+
t.Run("existing name with limit 0", func(t *testing.T) {
172+
name := "metadata_name_0"
173+
result, err := client.Metadata(name, "0")
174+
require.NoError(t, err)
175+
require.Equal(t, 0, len(result))
176+
})
177+
t.Run("non-existing name", func(t *testing.T) {
178+
result, err := client.Metadata("dummy", "")
179+
require.NoError(t, err)
180+
require.Equal(t, 0, len(result))
181+
})
182+
})
183+
184+
t.Run("test limit parameter", func(t *testing.T) {
185+
t.Run("less than length of metadata", func(t *testing.T) {
186+
result, err := client.Metadata("", "3")
187+
require.NoError(t, err)
188+
require.Equal(t, 3, len(result))
189+
})
190+
t.Run("limit: 0", func(t *testing.T) {
191+
result, err := client.Metadata("", "0")
192+
require.NoError(t, err)
193+
require.Equal(t, 0, len(result))
194+
})
195+
t.Run("invalid limit", func(t *testing.T) {
196+
_, err := client.Metadata("", "dummy")
197+
require.Error(t, err)
198+
})
199+
})
200+
}

integration/otlp_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func TestOTLP(t *testing.T) {
8888
require.NoError(t, err)
8989
require.Equal(t, []string{"__name__", "foo"}, labelNames)
9090

91-
metadataResult, err := c.Metadata("series_1", "")
91+
metadataResult, err := c.Metadata("series_1_total", "")
9292
require.NoError(t, err)
9393
require.Equal(t, 1, len(metadataResult))
9494

pkg/distributor/distributor.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1466,13 +1466,12 @@ func (d *Distributor) metricsForLabelMatchersCommon(ctx context.Context, from, t
14661466
}
14671467

14681468
// MetricsMetadata returns all metric metadata of a user.
1469-
func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) {
1469+
func (d *Distributor) MetricsMetadata(ctx context.Context, req *ingester_client.MetricsMetadataRequest) ([]scrape.MetricMetadata, error) {
14701470
replicationSet, err := d.GetIngestersForMetadata(ctx)
14711471
if err != nil {
14721472
return nil, err
14731473
}
14741474

1475-
req := &ingester_client.MetricsMetadataRequest{}
14761475
// TODO(gotjosh): We only need to look in all the ingesters if shardByAllLabels is enabled.
14771476
resps, err := d.ForReplicationSet(ctx, replicationSet, d.cfg.ZoneResultsQuorumMetadata, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
14781477
return client.MetricsMetadata(ctx, req)

pkg/distributor/distributor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2774,7 +2774,7 @@ func TestDistributor_MetricsMetadata(t *testing.T) {
27742774
require.NoError(t, err)
27752775

27762776
// Assert on metric metadata
2777-
metadata, err := ds[0].MetricsMetadata(ctx)
2777+
metadata, err := ds[0].MetricsMetadata(ctx, &client.MetricsMetadataRequest{Limit: -1, LimitPerMetric: -1, Metric: ""})
27782778
require.NoError(t, err)
27792779
assert.Equal(t, 10, len(metadata))
27802780

0 commit comments

Comments
 (0)