Skip to content

Commit 6365787

Browse files
tomwilkiepracucci
andauthored
Add basic query stats collection & logging. (#3539)
* Add basic query stats collection & logging. Current collects & logs wall clock time and number of series in the queries. TODO: - collection number of samples - propagate & aggregate in the query frontend - add some metrics. Signed-off-by: Tom Wilkie <[email protected]> * Use time.Since() Signed-off-by: Tom Wilkie <[email protected]> * Make Stats a proto so we can propagate it over gRPC. Signed-off-by: Tom Wilkie <[email protected]> * Fixed series tracker Signed-off-by: Marco Pracucci <[email protected]> * Track number of samples too Signed-off-by: Marco Pracucci <[email protected]> * Propagate the stats via gRPC Only partially done, still need to merge & record the results in the query frontend. Signed-off-by: Tom Wilkie <[email protected]> * Plugged in stats tracker in the query-frontend Signed-off-by: Marco Pracucci <[email protected]> * Removed series tracking Signed-off-by: Marco Pracucci <[email protected]> * Fixed tests Signed-off-by: Marco Pracucci <[email protected]> * Fixed wall time tracking Signed-off-by: Marco Pracucci <[email protected]> * Added method and path to log Signed-off-by: Marco Pracucci <[email protected]> * Introduced a CLI flag to enable query stats Signed-off-by: Marco Pracucci <[email protected]> * Fixed TODOs and linter Signed-off-by: Marco Pracucci <[email protected]> * Updated doc and CHANGELOG Signed-off-by: Marco Pracucci <[email protected]> * Rolledback samples tracking Signed-off-by: Marco Pracucci <[email protected]> * Addressed easy comments Signed-off-by: Marco Pracucci <[email protected]> * Moved query stats reporter to frontend transport.Handler Signed-off-by: Marco Pracucci <[email protected]> * Renamed query stats log fields Signed-off-by: Marco Pracucci <[email protected]> * Improved log message Signed-off-by: Marco Pracucci <[email protected]> * Updated CHANGELOG Signed-off-by: Marco Pracucci <[email protected]> Co-authored-by: Marco Pracucci <[email protected]>
1 parent da4cf56 commit 6365787

29 files changed

+945
-106
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
- limit for outgoing gRPC messages has changed from 2147483647 to 16777216 bytes
1010
- limit for incoming gRPC messages has changed from 4194304 to 104857600 bytes
1111
* [FEATURE] Distributor/Ingester: Provide ability to not overflow writes in the presence of a leaving or unhealthy ingester. This allows for more efficient ingester rolling restarts. #3305
12+
* [FEATURE] Query-frontend: introduced query statistics logged in the query-frontend when enabled via `-frontend.query-stats-enabled=true`. When enabled, the metric `cortex_query_seconds_total` is tracked, counting the sum of the wall time spent across all queriers while running queries (on a per-tenant basis). The metrics `cortex_request_duration_seconds` and `cortex_query_seconds_total` are different: the first one tracks the request duration (eg. HTTP request from the client), while the latter tracks the sum of the wall time on all queriers involved executing the query. #3539
1213
* [ENHANCEMENT] API: Add GZIP HTTP compression to the API responses. Compression can be enabled via `-api.response-compression-enabled`. #3536
1314
* [ENHANCEMENT] Added zone-awareness support on queries. When zone-awareness is enabled, queries will still succeed if all ingesters in a single zone will fail. #3414
1415
* [ENHANCEMENT] Blocks storage ingester: exported more TSDB-related metrics. #3412

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ pkg/ring/ring.pb.go: pkg/ring/ring.proto
8181
pkg/frontend/v1/frontendv1pb/frontend.pb.go: pkg/frontend/v1/frontendv1pb/frontend.proto
8282
pkg/frontend/v2/frontendv2pb/frontend.pb.go: pkg/frontend/v2/frontendv2pb/frontend.proto
8383
pkg/querier/queryrange/queryrange.pb.go: pkg/querier/queryrange/queryrange.proto
84+
pkg/querier/stats/stats.pb.go: pkg/querier/stats/stats.proto
8485
pkg/chunk/storage/caching_index_client.pb.go: pkg/chunk/storage/caching_index_client.proto
8586
pkg/distributor/ha_tracker.pb.go: pkg/distributor/ha_tracker.proto
8687
pkg/ruler/rules/rules.pb.go: pkg/ruler/rules/rules.proto

development/tsdb-blocks-storage-s3/config/cortex.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,9 @@ store_gateway:
118118
consul:
119119
host: consul:8500
120120

121+
frontend:
122+
query_stats_enabled: true
123+
121124
frontend_worker:
122125
frontend_address: "query-frontend:9007"
123126
match_max_concurrent: true

docs/configuration/config-file-reference.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -874,6 +874,12 @@ The `query_frontend_config` configures the Cortex query-frontend.
874874
# CLI flag: -frontend.max-body-size
875875
[max_body_size: <int> | default = 10485760]
876876
877+
# True to enable query statistics tracking. When enabled, a message with some
878+
# statistics is logged for every query. This configuration option must be set
879+
# both on query-frontend and querier.
880+
# CLI flag: -frontend.query-stats-enabled
881+
[query_stats_enabled: <boolean> | default = false]
882+
877883
# Maximum number of outstanding requests per tenant per frontend; requests
878884
# beyond this error with HTTP 429.
879885
# CLI flag: -querier.max-outstanding-requests-per-tenant

docs/configuration/v1-guarantees.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,4 @@ Currently experimental features are:
6262
- Distributor: do not extend writes on unhealthy ingesters (`-distributor.extend-writes=false`)
6363
- Ingester: close idle TSDB and remove them from local disk (`-blocks-storage.tsdb.close-idle-tsdb-timeout`)
6464
- Tenant Deletion in Purger, for blocks storage.
65+
- Query-frontend: query stats tracking (`-frontend.query-stats-enabled`)

integration/query_frontend_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"fmt"
99
"os"
1010
"path/filepath"
11+
"strconv"
1112
"sync"
1213
"testing"
1314
"time"
@@ -28,6 +29,7 @@ import (
2829
type queryFrontendTestConfig struct {
2930
testMissingMetricName bool
3031
querySchedulerEnabled bool
32+
queryStatsEnabled bool
3133
setup func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string)
3234
}
3335

@@ -45,6 +47,21 @@ func TestQueryFrontendWithBlocksStorageViaFlags(t *testing.T) {
4547
})
4648
}
4749

50+
func TestQueryFrontendWithBlocksStorageViaFlagsAndQueryStatsEnabled(t *testing.T) {
51+
runQueryFrontendTest(t, queryFrontendTestConfig{
52+
testMissingMetricName: false,
53+
queryStatsEnabled: true,
54+
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
55+
flags = BlocksStorageFlags()
56+
57+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
58+
require.NoError(t, s.StartAndWaitReady(minio))
59+
60+
return "", flags
61+
},
62+
})
63+
}
64+
4865
func TestQueryFrontendWithBlocksStorageViaFlagsAndWithQueryScheduler(t *testing.T) {
4966
runQueryFrontendTest(t, queryFrontendTestConfig{
5067
testMissingMetricName: false,
@@ -60,6 +77,22 @@ func TestQueryFrontendWithBlocksStorageViaFlagsAndWithQueryScheduler(t *testing.
6077
})
6178
}
6279

80+
func TestQueryFrontendWithBlocksStorageViaFlagsAndWithQuerySchedulerAndQueryStatsEnabled(t *testing.T) {
81+
runQueryFrontendTest(t, queryFrontendTestConfig{
82+
testMissingMetricName: false,
83+
querySchedulerEnabled: true,
84+
queryStatsEnabled: true,
85+
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
86+
flags = BlocksStorageFlags()
87+
88+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
89+
require.NoError(t, s.StartAndWaitReady(minio))
90+
91+
return "", flags
92+
},
93+
})
94+
}
95+
6396
func TestQueryFrontendWithBlocksStorageViaConfigFile(t *testing.T) {
6497
runQueryFrontendTest(t, queryFrontendTestConfig{
6598
testMissingMetricName: false,
@@ -183,6 +216,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
183216
"-querier.split-queries-by-interval": "24h",
184217
"-querier.query-ingesters-within": "12h", // Required by the test on query /series out of ingesters time range
185218
"-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
219+
"-frontend.query-stats-enabled": strconv.FormatBool(cfg.queryStatsEnabled),
186220
})
187221

188222
// Start the query-scheduler if enabled.
@@ -306,6 +340,16 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
306340
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_request_duration_seconds"}, e2e.WithMetricCount))
307341
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_querier_request_duration_seconds"}, e2e.WithMetricCount))
308342

343+
// Ensure query stats metrics are tracked only when enabled.
344+
if cfg.queryStatsEnabled {
345+
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(
346+
e2e.Greater(0),
347+
[]string{"cortex_query_seconds_total"},
348+
e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "user", "user-1"))))
349+
} else {
350+
require.NoError(t, queryFrontend.WaitRemovedMetric("cortex_query_seconds_total"))
351+
}
352+
309353
// Ensure no service-specific metrics prefix is used by the wrong service.
310354
assertServiceMetricsPrefixes(t, Distributor, distributor)
311355
assertServiceMetricsPrefixes(t, Ingester, ingester)

pkg/api/handlers.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/cortexproject/cortex/pkg/chunk/purger"
3030
"github.com/cortexproject/cortex/pkg/distributor"
3131
"github.com/cortexproject/cortex/pkg/querier"
32+
"github.com/cortexproject/cortex/pkg/querier/stats"
3233
"github.com/cortexproject/cortex/pkg/util"
3334
)
3435

@@ -77,7 +78,7 @@ func (pc *IndexPageContent) GetContent() map[string]map[string]string {
7778
return result
7879
}
7980

80-
var indexPageTemplate = `
81+
var indexPageTemplate = `
8182
<!DOCTYPE html>
8283
<html>
8384
<head>
@@ -242,7 +243,10 @@ func NewQuerierHandler(
242243
router.Path(legacyPrefix + "/api/v1/metadata").Methods("GET").Handler(legacyPromRouter)
243244

244245
// Add a middleware to extract the trace context and add a header.
245-
return nethttp.MiddlewareFunc(opentracing.GlobalTracer(), router.ServeHTTP, nethttp.OperationNameFunc(func(r *http.Request) string {
246+
handler := nethttp.MiddlewareFunc(opentracing.GlobalTracer(), router.ServeHTTP, nethttp.OperationNameFunc(func(r *http.Request) string {
246247
return "internalQuerier"
247248
}))
249+
250+
// Track execution time.
251+
return stats.NewWallTimeMiddleware().Wrap(handler)
248252
}

pkg/cortex/modules.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,7 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
306306
}
307307

308308
t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent
309+
t.Cfg.Worker.QueryStatsEnabled = t.Cfg.Frontend.Handler.QueryStatsEnabled
309310
return querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(internalQuerierRouter), util.Logger, prometheus.DefaultRegisterer)
310311
}
311312

@@ -520,7 +521,7 @@ func (t *Cortex) initQueryFrontend() (serv services.Service, err error) {
520521
// Wrap roundtripper into Tripperware.
521522
roundTripper = t.QueryFrontendTripperware(roundTripper)
522523

523-
handler := transport.NewHandler(t.Cfg.Frontend.Handler, roundTripper, util.Logger)
524+
handler := transport.NewHandler(t.Cfg.Frontend.Handler, roundTripper, util.Logger, prometheus.DefaultRegisterer)
524525
if t.Cfg.Frontend.CompressResponses {
525526
handler = gziphandler.GzipHandler(handler)
526527
}

pkg/frontend/frontend_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ func testFrontend(t *testing.T, config CombinedFrontendConfig, handler http.Hand
248248
r.PathPrefix("/").Handler(middleware.Merge(
249249
middleware.AuthenticateUser,
250250
middleware.Tracer{},
251-
).Wrap(transport.NewHandler(config.Handler, rt, logger)))
251+
).Wrap(transport.NewHandler(config.Handler, rt, logger, nil)))
252252

253253
httpServer := http.Server{
254254
Handler: r,

pkg/frontend/transport/handler.go

Lines changed: 83 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,19 @@ import (
88
"io"
99
"io/ioutil"
1010
"net/http"
11+
"net/url"
1112
"strings"
1213
"time"
1314

1415
"github.com/go-kit/kit/log"
1516
"github.com/go-kit/kit/log/level"
17+
"github.com/prometheus/client_golang/prometheus"
18+
"github.com/prometheus/client_golang/prometheus/promauto"
1619
"github.com/weaveworks/common/httpgrpc"
1720
"github.com/weaveworks/common/httpgrpc/server"
1821

22+
querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
23+
"github.com/cortexproject/cortex/pkg/tenant"
1924
"github.com/cortexproject/cortex/pkg/util"
2025
)
2126

@@ -34,11 +39,13 @@ var (
3439
type HandlerConfig struct {
3540
LogQueriesLongerThan time.Duration `yaml:"log_queries_longer_than"`
3641
MaxBodySize int64 `yaml:"max_body_size"`
42+
QueryStatsEnabled bool `yaml:"query_stats_enabled"`
3743
}
3844

3945
func (cfg *HandlerConfig) RegisterFlags(f *flag.FlagSet) {
4046
f.DurationVar(&cfg.LogQueriesLongerThan, "frontend.log-queries-longer-than", 0, "Log queries that are slower than the specified duration. Set to 0 to disable. Set to < 0 to enable on all queries.")
4147
f.Int64Var(&cfg.MaxBodySize, "frontend.max-body-size", 10*1024*1024, "Max body size for downstream prometheus.")
48+
f.BoolVar(&cfg.QueryStatsEnabled, "frontend.query-stats-enabled", false, "True to enable query statistics tracking. When enabled, a message with some statistics is logged for every query. This configuration option must be set both on query-frontend and querier.")
4249
}
4350

4451
// Handler accepts queries and forwards them to RoundTripper. It can log slow queries,
@@ -47,18 +54,43 @@ type Handler struct {
4754
cfg HandlerConfig
4855
log log.Logger
4956
roundTripper http.RoundTripper
57+
58+
// Metrics.
59+
querySeconds *prometheus.CounterVec
5060
}
5161

5262
// New creates a new frontend handler.
53-
func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger) http.Handler {
54-
return &Handler{
63+
func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger, reg prometheus.Registerer) http.Handler {
64+
h := &Handler{
5565
cfg: cfg,
5666
log: log,
5767
roundTripper: roundTripper,
5868
}
69+
70+
if cfg.QueryStatsEnabled {
71+
h.querySeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
72+
Name: "cortex_query_seconds_total",
73+
Help: "Total amount of wall clock time spend processing queries.",
74+
}, []string{"user"})
75+
}
76+
77+
return h
5978
}
6079

6180
func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
81+
var (
82+
stats *querier_stats.Stats
83+
queryString url.Values
84+
)
85+
86+
// Initialise the stats in the context and make sure it's propagated
87+
// down the request chain.
88+
if f.cfg.QueryStatsEnabled {
89+
var ctx context.Context
90+
stats, ctx = querier_stats.ContextWithEmptyStats(r.Context())
91+
r = r.WithContext(ctx)
92+
}
93+
6294
defer func() {
6395
_ = r.Body.Close()
6496
}()
@@ -86,38 +118,73 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
86118
// we don't check for copy error as there is no much we can do at this point
87119
_, _ = io.Copy(w, resp.Body)
88120

89-
f.reportSlowQuery(queryResponseTime, r, buf)
90-
}
121+
// Check whether we should parse the query string.
122+
shouldReportSlowQuery := f.cfg.LogQueriesLongerThan > 0 && queryResponseTime > f.cfg.LogQueriesLongerThan
123+
if shouldReportSlowQuery || f.cfg.QueryStatsEnabled {
124+
queryString = f.parseRequestQueryString(r, buf)
125+
}
91126

92-
// reportSlowQuery reports slow queries if LogQueriesLongerThan is set to <0, where 0 disables logging
93-
func (f *Handler) reportSlowQuery(queryResponseTime time.Duration, r *http.Request, bodyBuf bytes.Buffer) {
94-
if f.cfg.LogQueriesLongerThan == 0 || queryResponseTime <= f.cfg.LogQueriesLongerThan {
95-
return
127+
if shouldReportSlowQuery {
128+
f.reportSlowQuery(r, queryString, queryResponseTime)
96129
}
130+
if f.cfg.QueryStatsEnabled {
131+
f.reportQueryStats(r, queryString, queryResponseTime, stats)
132+
}
133+
}
97134

98-
logMessage := []interface{}{
135+
// reportSlowQuery reports slow queries.
136+
func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, queryResponseTime time.Duration) {
137+
logMessage := append([]interface{}{
99138
"msg", "slow query detected",
100139
"method", r.Method,
101140
"host", r.Host,
102141
"path", r.URL.Path,
103142
"time_taken", queryResponseTime.String(),
143+
}, formatQueryString(queryString)...)
144+
145+
level.Info(util.WithContext(r.Context(), f.log)).Log(logMessage...)
146+
}
147+
148+
func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.Stats) {
149+
userID, err := tenant.TenantID(r.Context())
150+
if err != nil {
151+
return
104152
}
105153

106-
// use previously buffered body
154+
// Track stats.
155+
f.querySeconds.WithLabelValues(userID).Add(float64(stats.LoadWallTime()))
156+
157+
// Log stats.
158+
logMessage := append([]interface{}{
159+
"msg", "query stats",
160+
"method", r.Method,
161+
"path", r.URL.Path,
162+
"response_time", queryResponseTime,
163+
"query_wall_time", stats.LoadWallTime(),
164+
}, formatQueryString(queryString)...)
165+
166+
level.Info(util.WithContext(r.Context(), f.log)).Log(logMessage...)
167+
}
168+
169+
func (f *Handler) parseRequestQueryString(r *http.Request, bodyBuf bytes.Buffer) url.Values {
170+
// Use previously buffered body.
107171
r.Body = ioutil.NopCloser(&bodyBuf)
108172

109173
// Ensure the form has been parsed so all the parameters are present
110174
err := r.ParseForm()
111175
if err != nil {
112-
level.Warn(util.WithContext(r.Context(), f.log)).Log("msg", "unable to parse form for request", "err", err)
176+
level.Warn(util.WithContext(r.Context(), f.log)).Log("msg", "unable to parse request form", "err", err)
177+
return nil
113178
}
114179

115-
// Attempt to iterate through the Form to log any filled in values
116-
for k, v := range r.Form {
117-
logMessage = append(logMessage, fmt.Sprintf("param_%s", k), strings.Join(v, ","))
118-
}
180+
return r.Form
181+
}
119182

120-
level.Info(util.WithContext(r.Context(), f.log)).Log(logMessage...)
183+
func formatQueryString(queryString url.Values) (fields []interface{}) {
184+
for k, v := range queryString {
185+
fields = append(fields, fmt.Sprintf("param_%s", k), strings.Join(v, ","))
186+
}
187+
return fields
121188
}
122189

123190
func writeError(w http.ResponseWriter, err error) {

pkg/frontend/v1/frontend.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/weaveworks/common/httpgrpc"
1717

1818
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
19+
"github.com/cortexproject/cortex/pkg/querier/stats"
1920
"github.com/cortexproject/cortex/pkg/scheduler/queue"
2021
"github.com/cortexproject/cortex/pkg/tenant"
2122
"github.com/cortexproject/cortex/pkg/util/grpcutil"
@@ -185,7 +186,7 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error {
185186

186187
// Handle the stream sending & receiving on a goroutine so we can
187188
// monitoring the contexts in a select and cancel things appropriately.
188-
resps := make(chan *httpgrpc.HTTPResponse, 1)
189+
resps := make(chan *frontendv1pb.ClientToFrontend, 1)
189190
errs := make(chan error, 1)
190191
go func() {
191192
err = server.Send(&frontendv1pb.FrontendToClient{
@@ -203,7 +204,7 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error {
203204
return
204205
}
205206

206-
resps <- resp.HttpResponse
207+
resps <- resp
207208
}()
208209

209210
select {
@@ -219,9 +220,14 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error {
219220
req.err <- err
220221
return err
221222

222-
// Happy path: propagate the response.
223+
// Happy path: merge the stats and propagate the response.
223224
case resp := <-resps:
224-
req.response <- resp
225+
if stats.ShouldTrackHTTPGRPCResponse(resp.HttpResponse) {
226+
stats := stats.FromContext(req.originalCtx)
227+
stats.Merge(resp.Stats) // Safe if stats is nil.
228+
}
229+
230+
req.response <- resp.HttpResponse
225231
}
226232
}
227233
}

0 commit comments

Comments
 (0)