Skip to content

Commit 2367777

Browse files
authoredJan 15, 2025
query, rule: make endpoint discovery dynamically reloadable (thanos-io#7890)
* Removed previously deprecated and hidden flags to configure endpoints ( --rule, --target, ...) * Added new flags --endpoint.sd-config, --endpoint-sd-config-reload-interval to configure a dynamic SD file * Moved endpoint set construction into cmd/thanos/endpointset.go for a little cleanup * Renamed "thanos_(querier/ruler)_duplicated_store_addresses_total" to "thanos_(querier/ruler)_duplicated_endpoint_addresses_total" The new config makes it possible to also set "strict" and "group" flags on the endpoint instead of only their addresses, making it possible to have file based service discovery for endpoint groups too. Signed-off-by: Michael Hoffmann <mhoffm@posteo.de> Signed-off-by: Michael Hoffmann <mhoffmann@cloudflare.com>
1 parent 300a9ed commit 2367777

16 files changed

+572
-541
lines changed
 

‎CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
2626

2727
### Changed
2828

29+
- [#7890](https://github.com/thanos-io/thanos/pull/7890) Query,Ruler: *breaking :warning:* deprecated `--store.sd-file` and `--store.sd-interval` to be replaced with `--endpoint.sd-config` and `--endpoint-sd-config-reload-interval`; removed legacy flags to pass endpoints `--store`, `--metadata`, `--rule`, `--exemplar`.
30+
2931
### Removed
3032

3133
## [v0.37.2](https://github.com/thanos-io/thanos/tree/release-0.37) - 11.12.2024

‎cmd/thanos/config.go

+38
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,17 @@ import (
1414

1515
"github.com/KimMachineGun/automemlimit/memlimit"
1616
extflag "github.com/efficientgo/tools/extkingpin"
17+
"github.com/go-kit/log"
18+
"github.com/opentracing/opentracing-go"
1719
"github.com/pkg/errors"
20+
"google.golang.org/grpc"
1821

22+
"github.com/prometheus/client_golang/prometheus"
1923
"github.com/prometheus/common/model"
2024
"github.com/prometheus/prometheus/model/labels"
2125

26+
"github.com/thanos-io/thanos/pkg/extgrpc"
27+
"github.com/thanos-io/thanos/pkg/extgrpc/snappy"
2228
"github.com/thanos-io/thanos/pkg/extkingpin"
2329
"github.com/thanos-io/thanos/pkg/shipper"
2430
)
@@ -58,6 +64,38 @@ func (gc *grpcConfig) registerFlag(cmd extkingpin.FlagClause) *grpcConfig {
5864
return gc
5965
}
6066

67+
type grpcClientConfig struct {
68+
secure bool
69+
skipVerify bool
70+
cert, key, caCert string
71+
serverName string
72+
compression string
73+
}
74+
75+
func (gc *grpcClientConfig) registerFlag(cmd extkingpin.FlagClause) *grpcClientConfig {
76+
cmd.Flag("grpc-client-tls-secure", "Use TLS when talking to the gRPC server").Default("false").BoolVar(&gc.secure)
77+
cmd.Flag("grpc-client-tls-skip-verify", "Disable TLS certificate verification i.e self signed, signed by fake CA").Default("false").BoolVar(&gc.skipVerify)
78+
cmd.Flag("grpc-client-tls-cert", "TLS Certificates to use to identify this client to the server").Default("").StringVar(&gc.cert)
79+
cmd.Flag("grpc-client-tls-key", "TLS Key for the client's certificate").Default("").StringVar(&gc.key)
80+
cmd.Flag("grpc-client-tls-ca", "TLS CA Certificates to use to verify gRPC servers").Default("").StringVar(&gc.caCert)
81+
cmd.Flag("grpc-client-server-name", "Server name to verify the hostname on the returned gRPC certificates. See https://tools.ietf.org/html/rfc4366#section-3.1").Default("").StringVar(&gc.serverName)
82+
compressionOptions := strings.Join([]string{snappy.Name, compressionNone}, ", ")
83+
cmd.Flag("grpc-compression", "Compression algorithm to use for gRPC requests to other clients. Must be one of: "+compressionOptions).Default(compressionNone).EnumVar(&gc.compression, snappy.Name, compressionNone)
84+
85+
return gc
86+
}
87+
88+
func (gc *grpcClientConfig) dialOptions(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer) ([]grpc.DialOption, error) {
89+
dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, gc.secure, gc.skipVerify, gc.cert, gc.key, gc.caCert, gc.serverName)
90+
if err != nil {
91+
return nil, errors.Wrapf(err, "building gRPC client")
92+
}
93+
if gc.compression != compressionNone {
94+
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gc.compression)))
95+
}
96+
return dialOpts, nil
97+
}
98+
6199
type httpConfig struct {
62100
bindAddress string
63101
tlsConfig string

‎cmd/thanos/endpointset.go

+350
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,350 @@
1+
// Copyright (c) The Thanos Authors.
2+
// Licensed under the Apache License 2.0.
3+
4+
package main
5+
6+
import (
7+
"context"
8+
"fmt"
9+
"sync"
10+
"time"
11+
12+
"github.com/go-kit/log"
13+
"github.com/go-kit/log/level"
14+
"github.com/oklog/run"
15+
"google.golang.org/grpc"
16+
"gopkg.in/yaml.v3"
17+
18+
"github.com/prometheus/client_golang/prometheus"
19+
"github.com/prometheus/client_golang/prometheus/promauto"
20+
"github.com/prometheus/common/model"
21+
"github.com/prometheus/prometheus/discovery"
22+
"github.com/prometheus/prometheus/discovery/file"
23+
"github.com/prometheus/prometheus/discovery/targetgroup"
24+
25+
"github.com/thanos-io/thanos/pkg/component"
26+
"github.com/thanos-io/thanos/pkg/discovery/cache"
27+
"github.com/thanos-io/thanos/pkg/discovery/dns"
28+
"github.com/thanos-io/thanos/pkg/errors"
29+
"github.com/thanos-io/thanos/pkg/extgrpc"
30+
"github.com/thanos-io/thanos/pkg/extkingpin"
31+
"github.com/thanos-io/thanos/pkg/extprom"
32+
"github.com/thanos-io/thanos/pkg/query"
33+
"github.com/thanos-io/thanos/pkg/runutil"
34+
)
35+
36+
// fileContent is the interface of methods that we need from extkingpin.PathOrContent.
37+
// We need to abstract it for now so we can implement a default if the user does not provide one.
38+
type fileContent interface {
39+
Content() ([]byte, error)
40+
Path() string
41+
}
42+
43+
type endpointSettings struct {
44+
Strict bool `yaml:"strict"`
45+
Group bool `yaml:"group"`
46+
Address string `yaml:"address"`
47+
}
48+
49+
type EndpointConfig struct {
50+
Endpoints []endpointSettings `yaml:"endpoints"`
51+
}
52+
53+
type endpointConfigProvider struct {
54+
mu sync.Mutex
55+
cfg EndpointConfig
56+
57+
// statically defined endpoints from flags for backwards compatibility
58+
endpoints []string
59+
endpointGroups []string
60+
strictEndpoints []string
61+
strictEndpointGroups []string
62+
}
63+
64+
func (er *endpointConfigProvider) config() EndpointConfig {
65+
er.mu.Lock()
66+
defer er.mu.Unlock()
67+
68+
res := EndpointConfig{Endpoints: make([]endpointSettings, len(er.cfg.Endpoints))}
69+
copy(res.Endpoints, er.cfg.Endpoints)
70+
return res
71+
}
72+
73+
func (er *endpointConfigProvider) parse(configFile fileContent) (EndpointConfig, error) {
74+
content, err := configFile.Content()
75+
if err != nil {
76+
return EndpointConfig{}, errors.Wrapf(err, "unable to load config content: %s", configFile.Path())
77+
}
78+
var cfg EndpointConfig
79+
if err := yaml.Unmarshal(content, &cfg); err != nil {
80+
return EndpointConfig{}, errors.Wrapf(err, "unable to unmarshal config content: %s", configFile.Path())
81+
}
82+
return cfg, nil
83+
}
84+
85+
func (er *endpointConfigProvider) addStaticEndpoints(cfg *EndpointConfig) {
86+
for _, e := range er.endpoints {
87+
cfg.Endpoints = append(cfg.Endpoints, endpointSettings{
88+
Address: e,
89+
})
90+
}
91+
for _, e := range er.endpointGroups {
92+
cfg.Endpoints = append(cfg.Endpoints, endpointSettings{
93+
Address: e,
94+
Group: true,
95+
})
96+
}
97+
for _, e := range er.strictEndpoints {
98+
cfg.Endpoints = append(cfg.Endpoints, endpointSettings{
99+
Address: e,
100+
Strict: true,
101+
})
102+
}
103+
for _, e := range er.strictEndpointGroups {
104+
cfg.Endpoints = append(cfg.Endpoints, endpointSettings{
105+
Address: e,
106+
Group: true,
107+
Strict: true,
108+
})
109+
}
110+
}
111+
112+
func validateEndpointConfig(cfg EndpointConfig) error {
113+
for _, ecfg := range cfg.Endpoints {
114+
if dns.IsDynamicNode(ecfg.Address) && ecfg.Strict {
115+
return errors.Newf("%s is a dynamically specified endpoint i.e. it uses SD and that is not permitted under strict mode.", ecfg.Address)
116+
}
117+
}
118+
return nil
119+
}
120+
121+
func newEndpointConfigProvider(
122+
logger log.Logger,
123+
configFile fileContent,
124+
configReloadInterval time.Duration,
125+
staticEndpoints []string,
126+
staticEndpointGroups []string,
127+
staticStrictEndpoints []string,
128+
staticStrictEndpointGroups []string,
129+
) (*endpointConfigProvider, error) {
130+
res := &endpointConfigProvider{
131+
endpoints: staticEndpoints,
132+
endpointGroups: staticEndpointGroups,
133+
strictEndpoints: staticStrictEndpoints,
134+
strictEndpointGroups: staticStrictEndpointGroups,
135+
}
136+
137+
if configFile == nil {
138+
configFile = extkingpin.NewNopConfig()
139+
}
140+
141+
cfg, err := res.parse(configFile)
142+
if err != nil {
143+
return nil, errors.Wrapf(err, "unable to load config file")
144+
}
145+
res.addStaticEndpoints(&cfg)
146+
res.cfg = cfg
147+
if err := validateEndpointConfig(cfg); err != nil {
148+
return nil, errors.Wrapf(err, "unable to validate endpoints")
149+
}
150+
151+
// only static endpoints
152+
if len(configFile.Path()) == 0 {
153+
return res, nil
154+
}
155+
156+
if err := extkingpin.PathContentReloader(context.Background(), configFile, logger, func() {
157+
res.mu.Lock()
158+
defer res.mu.Unlock()
159+
160+
level.Info(logger).Log("msg", "reloading endpoint config")
161+
cfg, err := res.parse(configFile)
162+
if err != nil {
163+
level.Error(logger).Log("msg", "failed to reload endpoint config", "err", err)
164+
return
165+
}
166+
res.addStaticEndpoints(&cfg)
167+
if err := validateEndpointConfig(cfg); err != nil {
168+
level.Error(logger).Log("msg", "failed to validate endpoint config", "err", err)
169+
return
170+
}
171+
res.cfg = cfg
172+
}, configReloadInterval); err != nil {
173+
return nil, errors.Wrapf(err, "unable to create config reloader")
174+
}
175+
return res, nil
176+
}
177+
178+
func setupEndpointSet(
179+
g *run.Group,
180+
comp component.Component,
181+
reg prometheus.Registerer,
182+
logger log.Logger,
183+
configFile fileContent,
184+
configReloadInterval time.Duration,
185+
legacyFileSDFiles []string,
186+
legacyFileSDInterval time.Duration,
187+
legacyEndpoints []string,
188+
legacyEndpointGroups []string,
189+
legacyStrictEndpoints []string,
190+
legacyStrictEndpointGroups []string,
191+
dnsSDResolver string,
192+
dnsSDInterval time.Duration,
193+
unhealthyTimeout time.Duration,
194+
endpointTimeout time.Duration,
195+
dialOpts []grpc.DialOption,
196+
queryConnMetricLabels ...string,
197+
) (*query.EndpointSet, error) {
198+
configProvider, err := newEndpointConfigProvider(
199+
logger,
200+
configFile,
201+
configReloadInterval,
202+
legacyEndpoints,
203+
legacyEndpointGroups,
204+
legacyStrictEndpoints,
205+
legacyStrictEndpointGroups,
206+
)
207+
if err != nil {
208+
return nil, errors.Wrapf(err, "unable to load config initially")
209+
}
210+
// Register resolver for the "thanos:///" scheme for endpoint-groups
211+
dns.RegisterGRPCResolver(
212+
logger,
213+
dns.NewProvider(
214+
logger,
215+
extprom.WrapRegistererWithPrefix(fmt.Sprintf("thanos_%s_endpoint_groups_", comp), reg),
216+
dns.ResolverType(dnsSDResolver),
217+
),
218+
dnsSDInterval,
219+
)
220+
dnsEndpointProvider := dns.NewProvider(
221+
logger,
222+
extprom.WrapRegistererWithPrefix(fmt.Sprintf("thanos_%s_endpoints_", comp), reg),
223+
dns.ResolverType(dnsSDResolver),
224+
)
225+
duplicatedEndpoints := promauto.With(reg).NewCounter(prometheus.CounterOpts{
226+
Name: fmt.Sprintf("thanos_%s_duplicated_endpoint_addresses_total", comp),
227+
Help: "The number of times a duplicated endpoint addresses is detected from the different configs",
228+
})
229+
230+
removeDuplicateEndpointSpecs := func(specs []*query.GRPCEndpointSpec) []*query.GRPCEndpointSpec {
231+
set := make(map[string]*query.GRPCEndpointSpec)
232+
for _, spec := range specs {
233+
addr := spec.Addr()
234+
if _, ok := set[addr]; ok {
235+
level.Warn(logger).Log("msg", "Duplicate endpoint address is provided", "addr", addr)
236+
duplicatedEndpoints.Inc()
237+
}
238+
set[addr] = spec
239+
}
240+
deduplicated := make([]*query.GRPCEndpointSpec, 0, len(set))
241+
for _, value := range set {
242+
deduplicated = append(deduplicated, value)
243+
}
244+
return deduplicated
245+
}
246+
var fileSD *file.Discovery
247+
if len(legacyFileSDFiles) > 0 {
248+
conf := &file.SDConfig{
249+
Files: legacyFileSDFiles,
250+
RefreshInterval: model.Duration(legacyFileSDInterval),
251+
}
252+
var err error
253+
if fileSD, err = file.NewDiscovery(conf, logger, conf.NewDiscovererMetrics(reg, discovery.NewRefreshMetrics(reg))); err != nil {
254+
return nil, fmt.Errorf("unable to create new legacy file sd config: %w", err)
255+
}
256+
}
257+
legacyFileSDCache := cache.New()
258+
259+
ctx, cancel := context.WithCancel(context.Background())
260+
261+
if fileSD != nil {
262+
fileSDUpdates := make(chan []*targetgroup.Group)
263+
264+
g.Add(func() error {
265+
fileSD.Run(ctx, fileSDUpdates)
266+
return nil
267+
268+
}, func(err error) {
269+
cancel()
270+
})
271+
272+
g.Add(func() error {
273+
for {
274+
select {
275+
case update := <-fileSDUpdates:
276+
// Discoverers sometimes send nil updates so need to check for it to avoid panics.
277+
if update == nil {
278+
continue
279+
}
280+
legacyFileSDCache.Update(update)
281+
case <-ctx.Done():
282+
return nil
283+
}
284+
}
285+
}, func(err error) {
286+
cancel()
287+
})
288+
}
289+
290+
{
291+
g.Add(func() error {
292+
return runutil.Repeat(dnsSDInterval, ctx.Done(), func() error {
293+
ctxUpdateIter, cancelUpdateIter := context.WithTimeout(ctx, dnsSDInterval)
294+
defer cancelUpdateIter()
295+
296+
endpointConfig := configProvider.config()
297+
298+
addresses := make([]string, 0, len(endpointConfig.Endpoints))
299+
for _, ecfg := range endpointConfig.Endpoints {
300+
if addr := ecfg.Address; !ecfg.Group && !ecfg.Strict {
301+
// originally only "--endpoint" addresses got resolved
302+
addresses = append(addresses, addr)
303+
}
304+
}
305+
addresses = append(addresses, legacyFileSDCache.Addresses()...)
306+
if err := dnsEndpointProvider.Resolve(ctxUpdateIter, addresses, true); err != nil {
307+
level.Error(logger).Log("msg", "failed to resolve addresses for endpoints", "err", err)
308+
}
309+
return nil
310+
})
311+
}, func(error) {
312+
cancel()
313+
})
314+
}
315+
316+
endpointset := query.NewEndpointSet(time.Now, logger, reg, func() []*query.GRPCEndpointSpec {
317+
endpointConfig := configProvider.config()
318+
319+
specs := make([]*query.GRPCEndpointSpec, 0)
320+
for _, ecfg := range endpointConfig.Endpoints {
321+
strict, group, addr := ecfg.Strict, ecfg.Group, ecfg.Address
322+
if dns.IsDynamicNode(addr) {
323+
continue
324+
}
325+
if group {
326+
specs = append(specs, query.NewGRPCEndpointSpec(fmt.Sprintf("thanos:///%s", addr), strict, append(dialOpts, extgrpc.EndpointGroupGRPCOpts()...)...))
327+
} else {
328+
specs = append(specs, query.NewGRPCEndpointSpec(addr, strict, dialOpts...))
329+
}
330+
}
331+
for _, addr := range dnsEndpointProvider.Addresses() {
332+
specs = append(specs, query.NewGRPCEndpointSpec(addr, false, dialOpts...))
333+
}
334+
return removeDuplicateEndpointSpecs(specs)
335+
}, unhealthyTimeout, endpointTimeout, queryConnMetricLabels...)
336+
337+
g.Add(func() error {
338+
return runutil.Repeat(endpointTimeout, ctx.Done(), func() error {
339+
ctxIter, cancelIter := context.WithTimeout(ctx, endpointTimeout)
340+
defer cancelIter()
341+
342+
endpointset.Update(ctxIter)
343+
return nil
344+
})
345+
}, func(error) {
346+
cancel()
347+
})
348+
349+
return endpointset, nil
350+
}

‎cmd/thanos/query.go

+71-392
Large diffs are not rendered by default.

‎cmd/thanos/rule.go

+12-34
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,6 @@ import (
7979
"github.com/thanos-io/thanos/pkg/ui"
8080
)
8181

82-
const dnsSDResolver = "miekgdns"
83-
8482
type ruleConfig struct {
8583
http httpConfig
8684
grpc grpcConfig
@@ -404,17 +402,6 @@ func runRule(
404402
}
405403

406404
if len(grpcEndpoints) > 0 {
407-
duplicatedGRPCEndpoints := promauto.With(reg).NewCounter(prometheus.CounterOpts{
408-
Name: "thanos_rule_grpc_endpoints_duplicated_total",
409-
Help: "The number of times a duplicated grpc endpoint is detected from the different configs in rule",
410-
})
411-
412-
dnsEndpointProvider := dns.NewProvider(
413-
logger,
414-
extprom.WrapRegistererWithPrefix("thanos_rule_grpc_endpoints_", reg),
415-
dnsSDResolver,
416-
)
417-
418405
dialOpts, err := extgrpc.StoreClientGRPCOpts(
419406
logger,
420407
reg,
@@ -430,36 +417,27 @@ func runRule(
430417
return err
431418
}
432419

433-
grpcEndpointSet = prepareEndpointSet(
420+
grpcEndpointSet, err = setupEndpointSet(
434421
g,
435-
logger,
422+
comp,
436423
reg,
437-
[]*dns.Provider{dnsEndpointProvider},
438-
duplicatedGRPCEndpoints,
424+
logger,
439425
nil,
426+
1*time.Minute,
440427
nil,
428+
1*time.Minute,
429+
grpcEndpoints,
441430
nil,
442431
nil,
443-
dialOpts,
432+
nil,
433+
conf.query.dnsSDResolver,
434+
conf.query.dnsSDInterval,
444435
5*time.Minute,
445436
5*time.Second,
437+
dialOpts,
446438
)
447-
448-
// Periodically update the GRPC addresses from query config by resolving them using DNS SD if necessary.
449-
{
450-
ctx, cancel := context.WithCancel(context.Background())
451-
g.Add(func() error {
452-
return runutil.Repeat(5*time.Second, ctx.Done(), func() error {
453-
resolveCtx, resolveCancel := context.WithTimeout(ctx, 5*time.Second)
454-
defer resolveCancel()
455-
if err := dnsEndpointProvider.Resolve(resolveCtx, grpcEndpoints, true); err != nil {
456-
level.Error(logger).Log("msg", "failed to resolve addresses passed using grpc query config", "err", err)
457-
}
458-
return nil
459-
})
460-
}, func(error) {
461-
cancel()
462-
})
439+
if err != nil {
440+
return err
463441
}
464442
}
465443

‎docs/components/query.md

+34-37
Original file line numberDiff line numberDiff line change
@@ -299,27 +299,37 @@ Flags:
299299
detected maximum container or system memory.
300300
--enable-auto-gomemlimit Enable go runtime to automatically limit memory
301301
consumption.
302-
--endpoint=<endpoint> ... Addresses of statically configured Thanos
303-
API servers (repeatable). The scheme may be
304-
prefixed with 'dns+' or 'dnssrv+' to detect
305-
Thanos API servers through respective DNS
306-
lookups.
302+
--endpoint=<endpoint> ... (Deprecated): Addresses of statically
303+
configured Thanos API servers (repeatable).
304+
The scheme may be prefixed with 'dns+' or
305+
'dnssrv+' to detect Thanos API servers through
306+
respective DNS lookups.
307307
--endpoint-group=<endpoint-group> ...
308-
Experimental: DNS name of statically configured
309-
Thanos API server groups (repeatable). Targets
310-
resolved from the DNS name will be queried in
311-
a round-robin, instead of a fanout manner.
312-
This flag should be used when connecting a
313-
Thanos Query to HA groups of Thanos components.
308+
(Deprecated, Experimental): DNS name of
309+
statically configured Thanos API server groups
310+
(repeatable). Targets resolved from the DNS
311+
name will be queried in a round-robin, instead
312+
of a fanout manner. This flag should be used
313+
when connecting a Thanos Query to HA groups of
314+
Thanos components.
314315
--endpoint-group-strict=<endpoint-group-strict> ...
315-
Experimental: DNS name of statically configured
316-
Thanos API server groups (repeatable) that are
317-
always used, even if the health check fails.
318-
--endpoint-strict=<staticendpoint> ...
319-
Addresses of only statically configured Thanos
320-
API servers that are always used, even if
321-
the health check fails. Useful if you have a
322-
caching layer on top.
316+
(Deprecated, Experimental): DNS name of
317+
statically configured Thanos API server groups
318+
(repeatable) that are always used, even if the
319+
health check fails.
320+
--endpoint-strict=<endpoint-strict> ...
321+
(Deprecated): Addresses of only statically
322+
configured Thanos API servers that are always
323+
used, even if the health check fails. Useful if
324+
you have a caching layer on top.
325+
--endpoint.sd-config=<content>
326+
Alternative to 'endpoint.sd-config-file' flag
327+
(mutually exclusive). Content of Config File
328+
with endpoint definitions
329+
--endpoint.sd-config-file=<file-path>
330+
Path to Config File with endpoint definitions
331+
--endpoint.sd-config-reload-interval=5m
332+
Interval between endpoint config refreshes
323333
--grpc-address="0.0.0.0:10901"
324334
Listen ip:port address for gRPC endpoints
325335
(StoreAPI). Make sure this address is routable
@@ -500,19 +510,6 @@ Flags:
500510
It follows the Thanos sharding relabel-config
501511
syntax. For format details see:
502512
https://thanos.io/tip/thanos/sharding.md/#relabelling
503-
--store=<store> ... Deprecation Warning - This flag is deprecated
504-
and replaced with `endpoint`. Addresses of
505-
statically configured store API servers
506-
(repeatable). The scheme may be prefixed with
507-
'dns+' or 'dnssrv+' to detect store API servers
508-
through respective DNS lookups.
509-
--store-strict=<staticstore> ...
510-
Deprecation Warning - This flag is deprecated
511-
and replaced with `endpoint-strict`. Addresses
512-
of only statically configured store API servers
513-
that are always used, even if the health check
514-
fails. Useful if you have a caching layer on
515-
top.
516513
--store.limits.request-samples=0
517514
The maximum samples allowed for a single
518515
Series request, The Series call fails if
@@ -532,11 +529,11 @@ Flags:
532529
--store.sd-dns-interval=30s
533530
Interval between DNS resolutions.
534531
--store.sd-files=<path> ...
535-
Path to files that contain addresses of store
536-
API servers. The path can be a glob pattern
537-
(repeatable).
538-
--store.sd-interval=5m Refresh interval to re-read file SD files.
539-
It is used as a resync fallback.
532+
(Deprecated) Path to files that contain
533+
addresses of store API servers. The path can be
534+
a glob pattern (repeatable).
535+
--store.sd-interval=5m (Deprecated) Refresh interval to re-read file
536+
SD files. It is used as a resync fallback.
540537
--store.unhealthy-timeout=5m
541538
Timeout before an unhealthy store is cleaned
542539
from the store UI page.

‎pkg/discovery/dns/grpc.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ type builder struct {
2323
logger log.Logger
2424
}
2525

26-
func RegisterGRPCResolver(provider *Provider, interval time.Duration, logger log.Logger) {
26+
func RegisterGRPCResolver(logger log.Logger, provider *Provider, interval time.Duration) {
2727
grpcresolver.Register(&builder{
2828
resolveInterval: interval,
2929
provider: provider,

‎pkg/extkingpin/flags.go

+1-9
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,8 @@ func Addrs(flags *kingpin.FlagClause) (target *addressSlice) {
4747
return
4848
}
4949

50-
// validateAddrs checks an address slice for duplicates and empty or invalid elements.
50+
// validateAddrs checks an address slice for empty or invalid elements.
5151
func validateAddrs(addrs addressSlice) error {
52-
set := map[string]struct{}{}
53-
5452
for _, addr := range addrs {
5553
if addr == "" {
5654
return errors.New("Address is empty.")
@@ -61,12 +59,6 @@ func validateAddrs(addrs addressSlice) error {
6159
if len(qtypeAndName) != 2 && len(hostAndPort) != 2 {
6260
return errors.Errorf("Address %s is not of <host>:<port> format or a valid DNS query.", addr)
6361
}
64-
65-
if _, ok := set[addr]; ok {
66-
return errors.Errorf("Address %s is duplicated.", addr)
67-
}
68-
69-
set[addr] = struct{}{}
7062
}
7163

7264
return nil

‎pkg/extkingpin/path_content_reloader.go

+19
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,25 @@ type fileContent interface {
2121
Path() string
2222
}
2323

24+
type NopConfigContent struct{}
25+
26+
var _ fileContent = (*NopConfigContent)(nil)
27+
28+
// Content returns no content and no error.
29+
func (n NopConfigContent) Content() ([]byte, error) {
30+
return nil, nil
31+
}
32+
33+
// Path returns an empty path.
34+
func (n NopConfigContent) Path() string {
35+
return ""
36+
}
37+
38+
// NewNopConfig creates a no-op config content (no configuration).
39+
func NewNopConfig() NopConfigContent {
40+
return NopConfigContent{}
41+
}
42+
2443
// PathContentReloader runs the reloadFunc when it detects that the contents of fileContent have changed.
2544
func PathContentReloader(ctx context.Context, fileContent fileContent, logger log.Logger, reloadFunc func(), debounceTime time.Duration) error {
2645
filePath, err := filepath.Abs(fileContent.Path())

‎pkg/query/endpointset.go

+10-18
Original file line numberDiff line numberDiff line change
@@ -211,8 +211,7 @@ type EndpointSet struct {
211211

212212
// Endpoint specifications can change dynamically. If some component is missing from the list, we assume it is no longer
213213
// accessible and we close gRPC client for it, unless it is strict.
214-
endpointSpec func() map[string]*GRPCEndpointSpec
215-
dialOpts []grpc.DialOption
214+
endpointSpecs func() map[string]*GRPCEndpointSpec
216215
endpointInfoTimeout time.Duration
217216
unhealthyEndpointTimeout time.Duration
218217

@@ -235,7 +234,6 @@ func NewEndpointSet(
235234
logger log.Logger,
236235
reg prometheus.Registerer,
237236
endpointSpecs func() []*GRPCEndpointSpec,
238-
dialOpts []grpc.DialOption,
239237
unhealthyEndpointTimeout time.Duration,
240238
endpointInfoTimeout time.Duration,
241239
endpointMetricLabels ...string,
@@ -254,19 +252,17 @@ func NewEndpointSet(
254252
}
255253

256254
return &EndpointSet{
257-
now: now,
258-
logger: log.With(logger, "component", "endpointset"),
259-
endpointsMetric: endpointsMetric,
260-
261-
dialOpts: dialOpts,
255+
now: now,
256+
logger: log.With(logger, "component", "endpointset"),
257+
endpointsMetric: endpointsMetric,
262258
endpointInfoTimeout: endpointInfoTimeout,
263259
unhealthyEndpointTimeout: unhealthyEndpointTimeout,
264-
endpointSpec: func() map[string]*GRPCEndpointSpec {
265-
specs := make(map[string]*GRPCEndpointSpec)
260+
endpointSpecs: func() map[string]*GRPCEndpointSpec {
261+
res := make(map[string]*GRPCEndpointSpec)
266262
for _, s := range endpointSpecs() {
267-
specs[s.addr] = s
263+
res[s.addr] = s
268264
}
269-
return specs
265+
return res
270266
},
271267
endpoints: make(map[string]*endpointRef),
272268
}
@@ -288,7 +284,7 @@ func (e *EndpointSet) Update(ctx context.Context) {
288284
mu sync.Mutex
289285
)
290286

291-
for _, spec := range e.endpointSpec() {
287+
for _, spec := range e.endpointSpecs() {
292288
spec := spec
293289

294290
if er, existingRef := e.endpoints[spec.Addr()]; existingRef {
@@ -571,11 +567,7 @@ type endpointRef struct {
571567
// newEndpointRef creates a new endpointRef with a gRPC channel to the given the IP address.
572568
// The call to newEndpointRef will return an error if establishing the channel fails.
573569
func (e *EndpointSet) newEndpointRef(spec *GRPCEndpointSpec) (*endpointRef, error) {
574-
var dialOpts []grpc.DialOption
575-
576-
dialOpts = append(dialOpts, e.dialOpts...)
577-
dialOpts = append(dialOpts, spec.dialOpts...)
578-
conn, err := grpc.NewClient(spec.Addr(), dialOpts...)
570+
conn, err := grpc.NewClient(spec.Addr(), spec.dialOpts...)
579571
if err != nil {
580572
return nil, errors.Wrap(err, "dialing connection")
581573
}

‎pkg/query/endpointset_test.go

+15-15
Original file line numberDiff line numberDiff line change
@@ -675,11 +675,11 @@ func TestEndpointSetUpdate_AvailabilityScenarios(t *testing.T) {
675675
endpointSet := NewEndpointSet(nowFunc, nil, nil,
676676
func() (specs []*GRPCEndpointSpec) {
677677
for _, addr := range discoveredEndpointAddr {
678-
specs = append(specs, NewGRPCEndpointSpec(addr, false))
678+
specs = append(specs, NewGRPCEndpointSpec(addr, false, testGRPCOpts...))
679679
}
680680
return specs
681681
},
682-
testGRPCOpts, time.Minute, 2*time.Second)
682+
time.Minute, 2*time.Second)
683683
defer endpointSet.Close()
684684

685685
// Initial update.
@@ -1052,7 +1052,7 @@ func TestEndpointSet_Update_NoneAvailable(t *testing.T) {
10521052
}
10531053
return specs
10541054
},
1055-
testGRPCOpts, time.Minute, 2*time.Second)
1055+
time.Minute, 2*time.Second)
10561056
defer endpointSet.Close()
10571057

10581058
// Should not matter how many of these we run.
@@ -1159,11 +1159,11 @@ func TestEndpoint_Update_QuerierStrict(t *testing.T) {
11591159
slowStaticEndpointAddr := discoveredEndpointAddr[2]
11601160
endpointSet := NewEndpointSet(time.Now, nil, nil, func() (specs []*GRPCEndpointSpec) {
11611161
return []*GRPCEndpointSpec{
1162-
NewGRPCEndpointSpec(discoveredEndpointAddr[0], true),
1163-
NewGRPCEndpointSpec(discoveredEndpointAddr[1], false),
1164-
NewGRPCEndpointSpec(discoveredEndpointAddr[2], true),
1162+
NewGRPCEndpointSpec(discoveredEndpointAddr[0], true, testGRPCOpts...),
1163+
NewGRPCEndpointSpec(discoveredEndpointAddr[1], false, testGRPCOpts...),
1164+
NewGRPCEndpointSpec(discoveredEndpointAddr[2], true, testGRPCOpts...),
11651165
}
1166-
}, testGRPCOpts, time.Minute, 1*time.Second)
1166+
}, time.Minute, 1*time.Second)
11671167
defer endpointSet.Close()
11681168

11691169
// Initial update.
@@ -1273,7 +1273,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) {
12731273
endpointSpec: func() []*GRPCEndpointSpec {
12741274
endpointSpec := make([]*GRPCEndpointSpec, 0, len(endpoints.orderAddrs))
12751275
for _, addr := range endpoints.orderAddrs {
1276-
endpointSpec = append(endpointSpec, NewGRPCEndpointSpec(addr, false))
1276+
endpointSpec = append(endpointSpec, NewGRPCEndpointSpec(addr, false, testGRPCOpts...))
12771277
}
12781278
return endpointSpec
12791279
},
@@ -1297,7 +1297,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) {
12971297
name: "Sidecar discovered, no Ruler discovered",
12981298
endpointSpec: func() []*GRPCEndpointSpec {
12991299
return []*GRPCEndpointSpec{
1300-
NewGRPCEndpointSpec(endpoints.orderAddrs[0], false),
1300+
NewGRPCEndpointSpec(endpoints.orderAddrs[0], false, testGRPCOpts...),
13011301
}
13021302
},
13031303
expectedStores: 1, // sidecar
@@ -1310,8 +1310,8 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) {
13101310
name: "Ruler discovered",
13111311
endpointSpec: func() []*GRPCEndpointSpec {
13121312
return []*GRPCEndpointSpec{
1313-
NewGRPCEndpointSpec(endpoints.orderAddrs[0], false),
1314-
NewGRPCEndpointSpec(endpoints.orderAddrs[1], false),
1313+
NewGRPCEndpointSpec(endpoints.orderAddrs[0], false, testGRPCOpts...),
1314+
NewGRPCEndpointSpec(endpoints.orderAddrs[1], false, testGRPCOpts...),
13151315
}
13161316
},
13171317
expectedStores: 2, // sidecar + ruler
@@ -1324,7 +1324,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) {
13241324
name: "Sidecar removed",
13251325
endpointSpec: func() []*GRPCEndpointSpec {
13261326
return []*GRPCEndpointSpec{
1327-
NewGRPCEndpointSpec(endpoints.orderAddrs[1], false),
1327+
NewGRPCEndpointSpec(endpoints.orderAddrs[1], false, testGRPCOpts...),
13281328
}
13291329
},
13301330
expectedStores: 1, // ruler
@@ -1344,7 +1344,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) {
13441344

13451345
return tc.states[currentState].endpointSpec()
13461346
},
1347-
testGRPCOpts, time.Minute, 2*time.Second)
1347+
time.Minute, 2*time.Second)
13481348

13491349
defer endpointSet.Close()
13501350

@@ -1532,11 +1532,11 @@ func makeEndpointSet(discoveredEndpointAddr []string, strict bool, now nowFunc,
15321532
endpointSet := NewEndpointSet(now, nil, nil,
15331533
func() (specs []*GRPCEndpointSpec) {
15341534
for _, addr := range discoveredEndpointAddr {
1535-
specs = append(specs, NewGRPCEndpointSpec(addr, strict))
1535+
specs = append(specs, NewGRPCEndpointSpec(addr, strict, testGRPCOpts...))
15361536
}
15371537
return specs
15381538
},
1539-
testGRPCOpts, time.Minute, time.Second, metricLabels...)
1539+
time.Minute, time.Second, metricLabels...)
15401540
return endpointSet
15411541
}
15421542

‎pkg/receive/handler_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ func newTestHandlerHashring(
232232

233233
ag = addrGen{}
234234
logger = logging.NewLogger("debug", "logfmt", "receive_test")
235-
limiter, _ = NewLimiter(NewNopConfig(), nil, RouterIngestor, log.NewNopLogger(), 1*time.Second)
235+
limiter, _ = NewLimiter(extkingpin.NewNopConfig(), nil, RouterIngestor, log.NewNopLogger(), 1*time.Second)
236236
)
237237
for i := range appendables {
238238
h := NewHandler(logger, &Options{

‎pkg/receive/limiter.go

-19
Original file line numberDiff line numberDiff line change
@@ -210,22 +210,3 @@ func ParseLimitConfigContent(limitsConfig fileContent) (*RootLimitsConfig, error
210210
}
211211
return parsedConfig, nil
212212
}
213-
214-
type nopConfigContent struct{}
215-
216-
var _ fileContent = (*nopConfigContent)(nil)
217-
218-
// Content returns no content and no error.
219-
func (n nopConfigContent) Content() ([]byte, error) {
220-
return nil, nil
221-
}
222-
223-
// Path returns an empty path.
224-
func (n nopConfigContent) Path() string {
225-
return ""
226-
}
227-
228-
// NewNopConfig creates a no-op config content (no configuration).
229-
func NewNopConfig() nopConfigContent {
230-
return nopConfigContent{}
231-
}

‎test/e2e/e2ethanos/services.go

+16-13
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@ import (
1717
e2edb "github.com/efficientgo/e2e/db"
1818
e2eobs "github.com/efficientgo/e2e/observable"
1919
"github.com/pkg/errors"
20-
"github.com/prometheus/common/model"
2120
"github.com/prometheus/prometheus/config"
22-
"github.com/prometheus/prometheus/discovery/targetgroup"
2321
"github.com/prometheus/prometheus/model/relabel"
2422
"gopkg.in/yaml.v2"
2523

@@ -463,26 +461,25 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) {
463461
"--store.sd-dns-interval": "5s",
464462
"--log.level": infoLogLevel,
465463
"--query.max-concurrent": "1",
466-
"--store.sd-interval": "5s",
467464
})
468465

469466
for _, repl := range q.replicaLabels {
470467
args = append(args, "--query.replica-label="+repl)
471468
}
472469
for _, addr := range q.storeAddresses {
473-
args = append(args, "--store="+addr)
470+
args = append(args, "--endpoint="+addr)
474471
}
475472
for _, addr := range q.ruleAddresses {
476-
args = append(args, "--rule="+addr)
473+
args = append(args, "--endpoint="+addr)
477474
}
478475
for _, addr := range q.targetAddresses {
479-
args = append(args, "--target="+addr)
476+
args = append(args, "--endpoint="+addr)
480477
}
481478
for _, addr := range q.metadataAddresses {
482-
args = append(args, "--metadata="+addr)
479+
args = append(args, "--endpoint="+addr)
483480
}
484481
for _, addr := range q.exemplarAddresses {
485-
args = append(args, "--exemplar="+addr)
482+
args = append(args, "--endpoint="+addr)
486483
}
487484
for _, feature := range q.enableFeatures {
488485
args = append(args, "--enable-feature="+feature)
@@ -504,21 +501,27 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) {
504501
return nil, errors.Wrap(err, "create query dir failed")
505502
}
506503

507-
fileSD := []*targetgroup.Group{{}}
504+
type EndpointSpec struct{ Address string }
505+
506+
endpoints := make([]EndpointSpec, 0)
508507
for _, a := range q.fileSDStoreAddresses {
509-
fileSD[0].Targets = append(fileSD[0].Targets, model.LabelSet{model.AddressLabel: model.LabelValue(a)})
508+
endpoints = append(endpoints, EndpointSpec{Address: a})
510509
}
511510

512-
b, err := yaml.Marshal(fileSD)
511+
endpointSDConfig := struct {
512+
Endpoints []EndpointSpec `yaml:"endpoints"`
513+
}{Endpoints: endpoints}
514+
b, err := yaml.Marshal(endpointSDConfig)
513515
if err != nil {
514516
return nil, err
515517
}
516518

517-
if err := os.WriteFile(q.Dir()+"/filesd.yaml", b, 0600); err != nil {
519+
if err := os.WriteFile(q.Dir()+"/endpoint-sd-config.yaml", b, 0600); err != nil {
518520
return nil, errors.Wrap(err, "creating query SD config failed")
519521
}
520522

521-
args = append(args, "--store.sd-files="+filepath.Join(q.InternalDir(), "filesd.yaml"))
523+
args = append(args, "--endpoint.sd-config-file="+filepath.Join(q.InternalDir(), "endpoint-sd-config.yaml"))
524+
args = append(args, "--endpoint.sd-config-reload-interval=5s")
522525
}
523526
if q.routePrefix != "" {
524527
args = append(args, "--web.route-prefix="+q.routePrefix)

‎test/e2e/exemplars_api_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ config:
8282
t.Cleanup(cancel)
8383

8484
testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(2), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics()))
85-
testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(2), []string{"thanos_query_exemplar_apis_dns_provider_results"}, e2emon.WaitMissingMetrics()))
85+
testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(2), []string{"thanos_query_endpoints_dns_provider_results"}, e2emon.WaitMissingMetrics()))
8686

8787
now := time.Now()
8888
start := timestamp.FromTime(now.Add(-time.Hour))

‎test/e2e/metadata_api_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func TestMetadataAPI_Fanout(t *testing.T) {
5656
t.Cleanup(cancel)
5757

5858
testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(2), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics()))
59-
testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(2), []string{"thanos_query_metadata_apis_dns_provider_results"}, e2emon.WaitMissingMetrics()))
59+
testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(2), []string{"thanos_query_endpoints_dns_provider_results"}, e2emon.WaitMissingMetrics()))
6060

6161
var promMeta map[string][]metadatapb.Meta
6262
// Wait metadata response to be ready as Prometheus gets metadata after scrape.

0 commit comments

Comments
 (0)
Please sign in to comment.