Skip to content

Commit 85fd6e7

Browse files
authored
Refactor on the Querier/QF tripperware (#4856)
* Tripperware Refactor Signed-off-by: Alan Protasio <[email protected]> * lint Signed-off-by: Alan Protasio <[email protected]> * Moving query_range to tripperware package Signed-off-by: Alan Protasio <[email protected]> * lint Signed-off-by: Alan Protasio <[email protected]> * fix makefile protos Signed-off-by: Alan Protasio <[email protected]> * running check protos Signed-off-by: Alan Protasio <[email protected]> * Moving some protos to the tripperware package Signed-off-by: Alan Protasio <[email protected]> * update makefile Signed-off-by: Alan Protasio <[email protected]> * Moving limits interface to the tripperware package Signed-off-by: Alan Protasio <[email protected]> Signed-off-by: Alan Protasio <[email protected]>
1 parent 48bc900 commit 85fd6e7

36 files changed

+2942
-2483
lines changed

Makefile

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ pkg/ingester/wal.pb.go: pkg/ingester/wal.proto
9494
pkg/ring/ring.pb.go: pkg/ring/ring.proto
9595
pkg/frontend/v1/frontendv1pb/frontend.pb.go: pkg/frontend/v1/frontendv1pb/frontend.proto
9696
pkg/frontend/v2/frontendv2pb/frontend.pb.go: pkg/frontend/v2/frontendv2pb/frontend.proto
97-
pkg/querier/queryrange/queryrange.pb.go: pkg/querier/queryrange/queryrange.proto
97+
pkg/querier/tripperware/queryrange/queryrange.pb.go: pkg/querier/tripperware/queryrange/queryrange.proto
98+
pkg/querier/tripperware/query.pb.go: pkg/querier/tripperware/query.proto
9899
pkg/querier/stats/stats.pb.go: pkg/querier/stats/stats.proto
99100
pkg/distributor/ha_tracker.pb.go: pkg/distributor/ha_tracker.proto
100101
pkg/ruler/rulespb/rules.pb.go: pkg/ruler/rulespb/rules.proto
@@ -200,7 +201,8 @@ lint:
200201
./pkg/scheduler/... \
201202
./pkg/frontend/... \
202203
./pkg/querier/tenantfederation/... \
203-
./pkg/querier/queryrange/...
204+
./pkg/querier/tripperware/... \
205+
./pkg/querier/tripperware/queryrange/...
204206

205207
# Ensure packages that no longer use a global logger don't reintroduce it
206208
faillint -paths "github.com/cortexproject/cortex/pkg/util/log.{Logger}" \

pkg/api/middlewares.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"github.com/weaveworks/common/middleware"
88

99
"github.com/cortexproject/cortex/pkg/chunk/purger"
10-
"github.com/cortexproject/cortex/pkg/querier/queryrange"
10+
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
1111
"github.com/cortexproject/cortex/pkg/tenant"
1212
util_log "github.com/cortexproject/cortex/pkg/util/log"
1313
)

pkg/cortex/cortex.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,9 @@ import (
3838
"github.com/cortexproject/cortex/pkg/ingester"
3939
"github.com/cortexproject/cortex/pkg/ingester/client"
4040
"github.com/cortexproject/cortex/pkg/querier"
41-
"github.com/cortexproject/cortex/pkg/querier/queryrange"
4241
"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
42+
"github.com/cortexproject/cortex/pkg/querier/tripperware"
43+
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
4344
querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
4445
"github.com/cortexproject/cortex/pkg/ring"
4546
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
@@ -303,7 +304,7 @@ type Cortex struct {
303304
QuerierQueryable prom_storage.SampleAndChunkQueryable
304305
ExemplarQueryable prom_storage.ExemplarQueryable
305306
QuerierEngine *promql.Engine
306-
QueryFrontendTripperware queryrange.Tripperware
307+
QueryFrontendTripperware tripperware.Tripperware
307308

308309
Ruler *ruler.Ruler
309310
RulerStorage rulestore.RuleStore

pkg/cortex/modules.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,9 @@ import (
3232
"github.com/cortexproject/cortex/pkg/frontend/transport"
3333
"github.com/cortexproject/cortex/pkg/ingester"
3434
"github.com/cortexproject/cortex/pkg/querier"
35-
"github.com/cortexproject/cortex/pkg/querier/queryrange"
3635
"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
36+
"github.com/cortexproject/cortex/pkg/querier/tripperware"
37+
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
3738
querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
3839
"github.com/cortexproject/cortex/pkg/ring"
3940
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
@@ -434,7 +435,7 @@ func (t *Cortex) initDeleteRequestsStore() (serv services.Service, err error) {
434435
// initQueryFrontendTripperware instantiates the tripperware used by the query frontend
435436
// to optimize Prometheus query requests.
436437
func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err error) {
437-
tripperware, cache, err := queryrange.NewTripperware(
438+
queryRangeMiddlewares, cache, err := queryrange.Middlewares(
438439
t.Cfg.QueryRange,
439440
util_log.Logger,
440441
t.Overrides,
@@ -448,7 +449,12 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
448449
return nil, err
449450
}
450451

451-
t.QueryFrontendTripperware = tripperware
452+
t.QueryFrontendTripperware = tripperware.NewQueryTripperware(util_log.Logger,
453+
prometheus.DefaultRegisterer,
454+
t.Cfg.QueryRange.ForwardHeaders,
455+
queryRangeMiddlewares,
456+
queryrange.PrometheusCodec,
457+
)
452458

453459
return services.NewIdleService(nil, func(_ error) error {
454460
if cache != nil {

pkg/querier/queryrange/instrumentation.go renamed to pkg/querier/tripperware/instrumentation.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package queryrange
1+
package tripperware
22

33
import (
44
"context"

pkg/querier/tripperware/limits.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package tripperware
2+
3+
import "time"
4+
5+
// Limits allows us to specify per-tenant runtime limits on the behavior of
6+
// the query handling code.
7+
type Limits interface {
8+
// MaxQueryLookback returns the max lookback period of queries.
9+
MaxQueryLookback(userID string) time.Duration
10+
11+
// MaxQueryLength returns the limit of the length (in time) of a query.
12+
MaxQueryLength(string) time.Duration
13+
14+
// MaxQueryParallelism returns the limit to the number of split queries the
15+
// frontend will process in parallel.
16+
MaxQueryParallelism(string) int
17+
18+
// MaxCacheFreshness returns the period after which results are cacheable,
19+
// to prevent caching of very recent results.
20+
MaxCacheFreshness(string) time.Duration
21+
}

pkg/querier/tripperware/query.go

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package tripperware
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"net/http"
7+
"time"
8+
"unsafe"
9+
10+
"github.com/gogo/protobuf/proto"
11+
jsoniter "github.com/json-iterator/go"
12+
"github.com/opentracing/opentracing-go"
13+
"github.com/prometheus/common/model"
14+
15+
"github.com/cortexproject/cortex/pkg/cortexpb"
16+
)
17+
18+
// Codec is used to encode/decode query range requests and responses so they can be passed down to middlewares.
19+
type Codec interface {
20+
Merger
21+
// DecodeRequest decodes a Request from an http request.
22+
DecodeRequest(_ context.Context, request *http.Request, forwardHeaders []string) (Request, error)
23+
// DecodeResponse decodes a Response from an http response.
24+
// The original request is also passed as a parameter this is useful for implementation that needs the request
25+
// to merge result or build the result correctly.
26+
DecodeResponse(context.Context, *http.Response, Request) (Response, error)
27+
// EncodeRequest encodes a Request into an http request.
28+
EncodeRequest(context.Context, Request) (*http.Request, error)
29+
// EncodeResponse encodes a Response into an http response.
30+
EncodeResponse(context.Context, Response) (*http.Response, error)
31+
}
32+
33+
// Merger is used by middlewares making multiple requests to merge back all responses into a single one.
34+
type Merger interface {
35+
// MergeResponse merges responses from multiple requests into a single Response
36+
MergeResponse(...Response) (Response, error)
37+
}
38+
39+
// Response represents a query range response.
40+
type Response interface {
41+
proto.Message
42+
// HTTPHeaders returns the HTTP headers in the response.
43+
HTTPHeaders() map[string][]string
44+
}
45+
46+
// Request represents a query range request that can be process by middlewares.
47+
type Request interface {
48+
// GetStart returns the start timestamp of the request in milliseconds.
49+
GetStart() int64
50+
// GetEnd returns the end timestamp of the request in milliseconds.
51+
GetEnd() int64
52+
// GetStep returns the step of the request in milliseconds.
53+
GetStep() int64
54+
// GetQuery returns the query of the request.
55+
GetQuery() string
56+
// WithStartEnd clone the current request with different start and end timestamp.
57+
WithStartEnd(startTime int64, endTime int64) Request
58+
// WithQuery clone the current request with a different query.
59+
WithQuery(string) Request
60+
proto.Message
61+
// LogToSpan writes information about this request to an OpenTracing span
62+
LogToSpan(opentracing.Span)
63+
// GetStats returns the stats of the request.
64+
GetStats() string
65+
// WithStats clones the current `PrometheusRequest` with a new stats.
66+
WithStats(stats string) Request
67+
}
68+
69+
// UnmarshalJSON implements json.Unmarshaler.
70+
func (s *SampleStream) UnmarshalJSON(data []byte) error {
71+
var stream struct {
72+
Metric model.Metric `json:"metric"`
73+
Values []cortexpb.Sample `json:"values"`
74+
}
75+
if err := json.Unmarshal(data, &stream); err != nil {
76+
return err
77+
}
78+
s.Labels = cortexpb.FromMetricsToLabelAdapters(stream.Metric)
79+
s.Samples = stream.Values
80+
return nil
81+
}
82+
83+
// MarshalJSON implements json.Marshaler.
84+
func (s *SampleStream) MarshalJSON() ([]byte, error) {
85+
stream := struct {
86+
Metric model.Metric `json:"metric"`
87+
Values []cortexpb.Sample `json:"values"`
88+
}{
89+
Metric: cortexpb.FromLabelAdaptersToMetric(s.Labels),
90+
Values: s.Samples,
91+
}
92+
return json.Marshal(stream)
93+
}
94+
95+
func PrometheusResponseQueryableSamplesStatsPerStepJsoniterDecode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
96+
if !iter.ReadArray() {
97+
iter.ReportError("tripperware.PrometheusResponseQueryableSamplesStatsPerStep", "expected [")
98+
return
99+
}
100+
101+
t := model.Time(iter.ReadFloat64() * float64(time.Second/time.Millisecond))
102+
103+
if !iter.ReadArray() {
104+
iter.ReportError("tripperware.PrometheusResponseQueryableSamplesStatsPerStep", "expected ,")
105+
return
106+
}
107+
v := iter.ReadInt64()
108+
109+
if iter.ReadArray() {
110+
iter.ReportError("tripperware.PrometheusResponseQueryableSamplesStatsPerStep", "expected ]")
111+
}
112+
113+
*(*PrometheusResponseQueryableSamplesStatsPerStep)(ptr) = PrometheusResponseQueryableSamplesStatsPerStep{
114+
TimestampMs: int64(t),
115+
Value: v,
116+
}
117+
}
118+
119+
func PrometheusResponseQueryableSamplesStatsPerStepJsoniterEncode(ptr unsafe.Pointer, stream *jsoniter.Stream) {
120+
stats := (*PrometheusResponseQueryableSamplesStatsPerStep)(ptr)
121+
stream.WriteArrayStart()
122+
stream.WriteFloat64(float64(stats.TimestampMs) / float64(time.Second/time.Millisecond))
123+
stream.WriteMore()
124+
stream.WriteInt64(stats.Value)
125+
stream.WriteArrayEnd()
126+
}
127+
128+
func init() {
129+
jsoniter.RegisterTypeEncoderFunc("tripperware.PrometheusResponseQueryableSamplesStatsPerStep", PrometheusResponseQueryableSamplesStatsPerStepJsoniterEncode, func(unsafe.Pointer) bool { return false })
130+
jsoniter.RegisterTypeDecoderFunc("tripperware.PrometheusResponseQueryableSamplesStatsPerStep", PrometheusResponseQueryableSamplesStatsPerStepJsoniterDecode)
131+
}

0 commit comments

Comments
 (0)