Skip to content

Commit 74d33df

Browse files
authored
Fix sorted queries do not produce sorted results for shardable queries (#5148)
1 parent e704858 commit 74d33df

File tree

10 files changed

+111
-21
lines changed

10 files changed

+111
-21
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
* [BUGFIX] Ingester: Ingesters returning empty response for metadata APIs. #5081
3939
* [BUGFIX] Ingester: Fix panic when querying metadata from blocks that are being deleted. #5119
4040
* [BUGFIX] Ring: Fix case when dynamodb kv reaches the limit of 25 actions per batch call. #5136
41+
* [BUGFIX] Query-frontend: Fix sorted queries do not produce sorted results for shardable queries. #5148
4142
* [FEATURE] Alertmanager: Add support for time_intervals. #5102
4243

4344
## 1.14.0 2022-12-02

pkg/querier/tripperware/instantquery/instant_query.go

Lines changed: 65 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"github.com/weaveworks/common/httpgrpc"
2323
"google.golang.org/grpc/status"
2424

25+
promqlparser "github.com/prometheus/prometheus/promql/parser"
26+
2527
"github.com/cortexproject/cortex/pkg/cortexpb"
2628
"github.com/cortexproject/cortex/pkg/querier/tripperware"
2729
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
@@ -245,7 +247,7 @@ func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Res
245247
return &resp, nil
246248
}
247249

248-
func (instantQueryCodec) MergeResponse(ctx context.Context, responses ...tripperware.Response) (tripperware.Response, error) {
250+
func (instantQueryCodec) MergeResponse(ctx context.Context, req tripperware.Request, responses ...tripperware.Response) (tripperware.Response, error) {
249251
sp, _ := opentracing.StartSpanFromContext(ctx, "PrometheusInstantQueryResponse.MergeResponse")
250252
sp.SetTag("response_count", len(responses))
251253
defer sp.Finish()
@@ -265,11 +267,15 @@ func (instantQueryCodec) MergeResponse(ctx context.Context, responses ...tripper
265267
// For now, we only shard queries that returns a vector.
266268
switch promResponses[0].Data.ResultType {
267269
case model.ValVector.String():
270+
v, err := vectorMerge(req, promResponses)
271+
if err != nil {
272+
return nil, err
273+
}
268274
data = PrometheusInstantQueryData{
269275
ResultType: model.ValVector.String(),
270276
Result: PrometheusInstantQueryResult{
271277
Result: &PrometheusInstantQueryResult_Vector{
272-
Vector: vectorMerge(promResponses),
278+
Vector: v,
273279
},
274280
},
275281
Stats: statsMerge(promResponses),
@@ -297,8 +303,12 @@ func (instantQueryCodec) MergeResponse(ctx context.Context, responses ...tripper
297303
return res, nil
298304
}
299305

300-
func vectorMerge(resps []*PrometheusInstantQueryResponse) *Vector {
306+
func vectorMerge(req tripperware.Request, resps []*PrometheusInstantQueryResponse) (*Vector, error) {
301307
output := map[string]*Sample{}
308+
sortAsc, sortDesc, err := parseQueryForSort(req.GetQuery())
309+
if err != nil {
310+
return nil, err
311+
}
302312
buf := make([]byte, 0, 1024)
303313
for _, resp := range resps {
304314
if resp == nil {
@@ -327,22 +337,66 @@ func vectorMerge(resps []*PrometheusInstantQueryResponse) *Vector {
327337
if len(output) == 0 {
328338
return &Vector{
329339
Samples: make([]*Sample, 0),
330-
}
340+
}, nil
331341
}
332342

333-
keys := make([]string, 0, len(output))
334-
for key := range output {
335-
keys = append(keys, key)
343+
type pair struct {
344+
metric string
345+
s *Sample
346+
}
347+
348+
samples := make([]*pair, 0, len(output))
349+
for k, v := range output {
350+
samples = append(samples, &pair{
351+
metric: k,
352+
s: v,
353+
})
336354
}
337-
sort.Strings(keys)
338355

356+
sort.Slice(samples, func(i, j int) bool {
357+
// Order is determined by the sortFn in the query.
358+
if sortAsc {
359+
return samples[i].s.Sample.Value < samples[j].s.Sample.Value
360+
} else if sortDesc {
361+
return samples[i].s.Sample.Value > samples[j].s.Sample.Value
362+
} else {
363+
// Fallback on sorting by labels.
364+
return samples[i].metric < samples[j].metric
365+
}
366+
})
339367
result := &Vector{
340368
Samples: make([]*Sample, 0, len(output)),
341369
}
342-
for _, key := range keys {
343-
result.Samples = append(result.Samples, output[key])
370+
for _, p := range samples {
371+
result.Samples = append(result.Samples, p.s)
344372
}
345-
return result
373+
return result, nil
374+
}
375+
376+
func parseQueryForSort(q string) (bool, bool, error) {
377+
expr, err := promqlparser.ParseExpr(q)
378+
if err != nil {
379+
return false, false, err
380+
}
381+
var sortAsc bool = false
382+
var sortDesc bool = false
383+
done := errors.New("done")
384+
promqlparser.Inspect(expr, func(n promqlparser.Node, _ []promqlparser.Node) error {
385+
if n, ok := n.(*promqlparser.Call); ok {
386+
if n.Func != nil {
387+
if n.Func.Name == "sort" {
388+
sortAsc = true
389+
return done
390+
}
391+
if n.Func.Name == "sort_desc" {
392+
sortDesc = true
393+
return done
394+
}
395+
}
396+
}
397+
return nil
398+
})
399+
return sortAsc, sortDesc, nil
346400
}
347401

348402
func matrixMerge(resps []*PrometheusInstantQueryResponse) []tripperware.SampleStream {

pkg/querier/tripperware/instantquery/instant_query_test.go

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,34 +198,43 @@ func TestResponse(t *testing.T) {
198198
}
199199

200200
func TestMergeResponse(t *testing.T) {
201+
defaultReq := &PrometheusRequest{
202+
Query: "sum(up)",
203+
}
201204
for _, tc := range []struct {
202205
name string
206+
req tripperware.Request
203207
resps []string
204208
expectedResp string
205209
expectedErr error
206210
}{
207211
{
208212
name: "empty response",
213+
req: defaultReq,
209214
resps: []string{`{"status":"success","data":{"resultType":"vector","result":[]}}`},
210215
expectedResp: `{"status":"success","data":{"resultType":"vector","result":[]}}`,
211216
},
212217
{
213218
name: "empty response with stats",
219+
req: defaultReq,
214220
resps: []string{`{"status":"success","data":{"resultType":"vector","result":[],"stats":{"samples":{"totalQueryableSamples":0,"totalQueryableSamplesPerStep":[]}}}}`},
215221
expectedResp: `{"status":"success","data":{"resultType":"vector","result":[],"stats":{"samples":{"totalQueryableSamples":0,"totalQueryableSamplesPerStep":[]}}}}`,
216222
},
217223
{
218224
name: "single response",
225+
req: defaultReq,
219226
resps: []string{`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up"},"value":[1,"1"]}]}}`},
220227
expectedResp: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up"},"value":[1,"1"]}]}}`,
221228
},
222229
{
223230
name: "single response with stats",
231+
req: defaultReq,
224232
resps: []string{`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up"},"value":[1,"1"]}],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1,10]]}}}}`},
225233
expectedResp: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up"},"value":[1,"1"]}],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1,10]]}}}}`,
226234
},
227235
{
228236
name: "duplicated response",
237+
req: defaultReq,
229238
resps: []string{
230239
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up"},"value":[1,"1"]}]}}`,
231240
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up"},"value":[1,"1"]}]}}`,
@@ -234,6 +243,7 @@ func TestMergeResponse(t *testing.T) {
234243
},
235244
{
236245
name: "duplicated response with stats",
246+
req: defaultReq,
237247
resps: []string{
238248
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up"},"value":[1,"1"]}],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1,10]]}}}}`,
239249
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up"},"value":[1,"1"]}],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1,10]]}}}}`,
@@ -242,14 +252,34 @@ func TestMergeResponse(t *testing.T) {
242252
},
243253
{
244254
name: "merge two responses",
255+
req: defaultReq,
245256
resps: []string{
246257
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]}]}}`,
247258
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"bar"},"value":[2,"2"]}]}}`,
248259
},
249260
expectedResp: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"bar"},"value":[2,"2"]},{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]}]}}`,
250261
},
262+
{
263+
name: "merge two responses with sort",
264+
req: &PrometheusRequest{Query: "sort(up)"},
265+
resps: []string{
266+
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]}]}}`,
267+
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"bar"},"value":[1,"2"]}]}}`,
268+
},
269+
expectedResp: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]},{"metric":{"__name__":"up","job":"bar"},"value":[1,"2"]}]}}`,
270+
},
271+
{
272+
name: "merge two responses with sort_desc",
273+
req: &PrometheusRequest{Query: "sort_desc(up)"},
274+
resps: []string{
275+
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]}]}}`,
276+
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"bar"},"value":[1,"2"]}]}}`,
277+
},
278+
expectedResp: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"bar"},"value":[1,"2"]},{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]}]}}`,
279+
},
251280
{
252281
name: "merge two responses with stats",
282+
req: defaultReq,
253283
resps: []string{
254284
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]}],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1,10]]}}}}`,
255285
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"bar"},"value":[2,"2"]}],"stats":{"samples":{"totalQueryableSamples":10,"totalQueryableSamplesPerStep":[[1,10]]}}}}`,
@@ -258,6 +288,7 @@ func TestMergeResponse(t *testing.T) {
258288
},
259289
{
260290
name: "responses don't contain vector, should return an error",
291+
req: defaultReq,
261292
resps: []string{
262293
`{"status":"success","data":{"resultType":"string","result":[1662682521.409,"foo"]}}`,
263294
`{"status":"success","data":{"resultType":"string","result":[1662682521.409,"foo"]}}`,
@@ -266,13 +297,15 @@ func TestMergeResponse(t *testing.T) {
266297
},
267298
{
268299
name: "single matrix response",
300+
req: defaultReq,
269301
resps: []string{
270302
`{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"up"},"values":[[1,"1"],[2,"2"]]}]}}`,
271303
},
272304
expectedResp: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"up"},"values":[[1,"1"],[2,"2"]]}]}}`,
273305
},
274306
{
275307
name: "multiple matrix responses without duplicated series",
308+
req: defaultReq,
276309
resps: []string{
277310
`{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"bar"},"values":[[1,"1"],[2,"2"]]}]}}`,
278311
`{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"foo"},"values":[[3,"3"],[4,"4"]]}]}}`,
@@ -281,6 +314,7 @@ func TestMergeResponse(t *testing.T) {
281314
},
282315
{
283316
name: "multiple matrix responses with duplicated series, but not same samples",
317+
req: defaultReq,
284318
resps: []string{
285319
`{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"bar"},"values":[[1,"1"],[2,"2"]]}]}}`,
286320
`{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"bar"},"values":[[3,"3"]]}]}}`,
@@ -289,6 +323,7 @@ func TestMergeResponse(t *testing.T) {
289323
},
290324
{
291325
name: "multiple matrix responses with duplicated series and same samples",
326+
req: defaultReq,
292327
resps: []string{
293328
`{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"bar"},"values":[[1,"1"],[2,"2"]]}]}}`,
294329
`{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"bar"},"values":[[1,"1"],[2,"2"],[3,"3"]]}]}}`,
@@ -308,7 +343,7 @@ func TestMergeResponse(t *testing.T) {
308343
require.NoError(t, err)
309344
resps = append(resps, dr)
310345
}
311-
resp, err := InstantQueryCodec.MergeResponse(context.Background(), resps...)
346+
resp, err := InstantQueryCodec.MergeResponse(context.Background(), tc.req, resps...)
312347
assert.Equal(t, err, tc.expectedErr)
313348
if err != nil {
314349
return

pkg/querier/tripperware/query.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ type Codec interface {
5050
// Merger is used by middlewares making multiple requests to merge back all responses into a single one.
5151
type Merger interface {
5252
// MergeResponse merges responses from multiple requests into a single Response
53-
MergeResponse(context.Context, ...Response) (Response, error)
53+
MergeResponse(context.Context, Request, ...Response) (Response, error)
5454
}
5555

5656
// Response represents a query range response.

pkg/querier/tripperware/queryrange/query_range.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ func NewEmptyPrometheusResponse() *PrometheusResponse {
127127
}
128128
}
129129

130-
func (c prometheusCodec) MergeResponse(ctx context.Context, responses ...tripperware.Response) (tripperware.Response, error) {
130+
func (c prometheusCodec) MergeResponse(ctx context.Context, _ tripperware.Request, responses ...tripperware.Response) (tripperware.Response, error) {
131131
sp, _ := opentracing.StartSpanFromContext(ctx, "QueryRangeResponse.MergeResponse")
132132
sp.SetTag("response_count", len(responses))
133133
defer sp.Finish()

pkg/querier/tripperware/queryrange/query_range_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -652,7 +652,7 @@ func TestMergeAPIResponses(t *testing.T) {
652652
},
653653
}} {
654654
t.Run(tc.name, func(t *testing.T) {
655-
output, err := PrometheusCodec.MergeResponse(context.Background(), tc.input...)
655+
output, err := PrometheusCodec.MergeResponse(context.Background(), nil, tc.input...)
656656
require.NoError(t, err)
657657
require.Equal(t, tc.expected, output)
658658
})

pkg/querier/tripperware/queryrange/results_cache.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,7 @@ func (s resultsCache) handleHit(ctx context.Context, r tripperware.Request, exte
407407
return nil, nil, err
408408
}
409409
if len(requests) == 0 {
410-
response, err := s.merger.MergeResponse(context.Background(), responses...)
410+
response, err := s.merger.MergeResponse(context.Background(), r, responses...)
411411
// No downstream requests so no need to write back to the cache.
412412
return response, nil, err
413413
}
@@ -469,7 +469,7 @@ func (s resultsCache) handleHit(ctx context.Context, r tripperware.Request, exte
469469
if err != nil {
470470
return nil, nil, err
471471
}
472-
merged, err := s.merger.MergeResponse(ctx, accumulator.Response, currentRes)
472+
merged, err := s.merger.MergeResponse(ctx, r, accumulator.Response, currentRes)
473473
if err != nil {
474474
return nil, nil, err
475475
}
@@ -481,7 +481,7 @@ func (s resultsCache) handleHit(ctx context.Context, r tripperware.Request, exte
481481
return nil, nil, err
482482
}
483483

484-
response, err := s.merger.MergeResponse(ctx, responses...)
484+
response, err := s.merger.MergeResponse(ctx, r, responses...)
485485
return response, mergedExtents, err
486486
}
487487

pkg/querier/tripperware/queryrange/split_by_interval.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func (s splitByInterval) Do(ctx context.Context, r tripperware.Request) (tripper
6161
resps = append(resps, reqResp.Response)
6262
}
6363

64-
response, err := s.merger.MergeResponse(ctx, resps...)
64+
response, err := s.merger.MergeResponse(ctx, nil, resps...)
6565
if err != nil {
6666
return nil, err
6767
}

pkg/querier/tripperware/queryrange/split_by_interval_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ func TestSplitQuery(t *testing.T) {
266266
}
267267

268268
func TestSplitByDay(t *testing.T) {
269-
mergedResponse, err := PrometheusCodec.MergeResponse(context.Background(), parsedResponse, parsedResponse)
269+
mergedResponse, err := PrometheusCodec.MergeResponse(context.Background(), nil, parsedResponse, parsedResponse)
270270
require.NoError(t, err)
271271

272272
mergedHTTPResponse, err := PrometheusCodec.EncodeResponse(context.Background(), mergedResponse)

pkg/querier/tripperware/shard_by.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func (s shardBy) Do(ctx context.Context, r Request) (Response, error) {
7979
resps = append(resps, reqResp.Response)
8080
}
8181

82-
return s.merger.MergeResponse(ctx, resps...)
82+
return s.merger.MergeResponse(ctx, r, resps...)
8383
}
8484

8585
func (s shardBy) shardQuery(l log.Logger, numShards int, r Request, analysis querysharding.QueryAnalysis) []Request {

0 commit comments

Comments
 (0)